From a1789ddde42033f1b05cc4929491214ee6e79383 Mon Sep 17 00:00:00 2001 From: Pierre Schmitz Date: Thu, 17 Dec 2015 09:15:42 +0100 Subject: Update to MediaWiki 1.26.0 --- includes/debug/logger/monolog/KafkaHandler.php | 224 +++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 includes/debug/logger/monolog/KafkaHandler.php (limited to 'includes/debug/logger/monolog/KafkaHandler.php') diff --git a/includes/debug/logger/monolog/KafkaHandler.php b/includes/debug/logger/monolog/KafkaHandler.php new file mode 100644 index 00000000..59d7764a --- /dev/null +++ b/includes/debug/logger/monolog/KafkaHandler.php @@ -0,0 +1,224 @@ += 1.3.0 + * + * @since 1.26 + * @author Erik Bernhardson + * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation. + */ +class KafkaHandler extends AbstractProcessingHandler { + /** + * @var Produce Sends requests to kafka + */ + protected $produce; + + /** + * @var array Optional handler configuration + */ + protected $options; + + /** + * @var array Map from topic name to partition this request produces to + */ + protected $partitions = array(); + + /** + * @var array defaults for constructor options + */ + private static $defaultOptions = array( + 'alias' => array(), // map from monolog channel to kafka topic + 'swallowExceptions' => false, // swallow exceptions sending records + 'logExceptions' => null, // A PSR3 logger to inform about errors + ); + + /** + * @param Produce $produce Kafka instance to produce through + * @param array $options optional handler configuration + * @param int $level The minimum logging level at which this handler will be triggered + * @param bool $bubble Whether the messages that are handled can bubble up the stack or not + */ + public function __construct( Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true ) { + parent::__construct( $level, $bubble ); + $this->produce = $produce; + $this->options = array_merge( self::$defaultOptions, $options ); + } + + /** + * Constructs the necessary support objects and returns a KafkaHandler + * instance. + * + * @param string[] $kafkaServers + * @param array $options + * @param int $level The minimum logging level at which this handle will be triggered + * @param bool $bubble Whether the messages that are handled can bubble the stack or not + * @return KafkaHandler + */ + public static function factory( $kafkaServers, array $options = array(), $level = Logger::DEBUG, $bubble = true ) { + $metadata = new MetaDataFromKafka( $kafkaServers ); + $produce = new Produce( $metadata ); + if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) { + $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] ); + } + return new self( $produce, $options, $level, $bubble ); + } + + /** + * {@inheritDoc} + */ + protected function write( array $record ) { + if ( $record['formatted'] !== null ) { + $this->addMessages( $record['channel'], array( $record['formatted'] ) ); + $this->send(); + } + } + + /** + * {@inheritDoc} + */ + public function handleBatch( array $batch ) { + $channels = array(); + foreach ( $batch as $record ) { + if ( $record['level'] < $this->level ) { + continue; + } + $channels[$record['channel']][] = $this->processRecord( $record ); + } + + $formatter = $this->getFormatter(); + foreach ( $channels as $channel => $records ) { + $messages = array(); + foreach ( $records as $idx => $record ) { + $message = $formatter->format( $record ); + if ( $message !== null ) { + $messages[] = $message; + } + } + if ( $messages ) { + $this->addMessages($channel, $messages); + } + } + + $this->send(); + } + + /** + * Send any records in the kafka client internal queue. + */ + protected function send() { + try { + $this->produce->send(); + } catch ( \Kafka\Exception $e ) { + $ignore = $this->warning( + 'Error sending records to kafka: {exception}', + array( 'exception' => $e ) ); + if ( !$ignore ) { + throw $e; + } + } + } + + /** + * @param string $topic Name of topic to get partition for + * @return int|null The random partition to produce to for this request, + * or null if a partition could not be determined. + */ + protected function getRandomPartition( $topic ) { + if ( !array_key_exists( $topic, $this->partitions ) ) { + try { + $partitions = $this->produce->getAvailablePartitions( $topic ); + } catch ( \Kafka\Exception $e ) { + $ignore = $this->warning( + 'Error getting metadata for kafka topic {topic}: {exception}', + array( 'topic' => $topic, 'exception' => $e ) ); + if ( $ignore ) { + return null; + } + throw $e; + } + if ( $partitions ) { + $key = array_rand( $partitions ); + $this->partitions[$topic] = $partitions[$key]; + } else { + $details = $this->produce->getClient()->getTopicDetail( $topic ); + $ignore = $this->warning( + 'No partitions available for kafka topic {topic}', + array( 'topic' => $topic, 'kafka' => $details ) + ); + if ( !$ignore ) { + throw new \RuntimeException( "No partitions available for kafka topic $topic" ); + } + $this->partitions[$topic] = null; + } + } + return $this->partitions[$topic]; + } + + /** + * Adds records for a channel to the Kafka client internal queue. + * + * @param string $channel Name of Monolog channel records belong to + * @param array $records List of records to append + */ + protected function addMessages( $channel, array $records ) { + if ( isset( $this->options['alias'][$channel] ) ) { + $topic = $this->options['alias'][$channel]; + } else { + $topic = "monolog_$channel"; + } + $partition = $this->getRandomPartition( $topic ); + if ( $partition !== null ) { + $this->produce->setMessages( $topic, $partition, $records ); + } + } + + /** + * @param string $message PSR3 compatible message string + * @param array $context PSR3 compatible log context + * @return bool true if caller should ignore warning + */ + protected function warning( $message, array $context = array() ) { + if ( $this->options['logExceptions'] instanceof LoggerInterface ) { + $this->options['logExceptions']->warning( $message, $context ); + } + return $this->options['swallowExceptions']; + } +} -- cgit v1.2.3-54-g00ecf