From a1789ddde42033f1b05cc4929491214ee6e79383 Mon Sep 17 00:00:00 2001 From: Pierre Schmitz Date: Thu, 17 Dec 2015 09:15:42 +0100 Subject: Update to MediaWiki 1.26.0 --- .../kafka-php/src/Kafka/MetaDataFromKafka.php | 200 +++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php (limited to 'vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php') diff --git a/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php b/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php new file mode 100644 index 00000000..9d2c613e --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php @@ -0,0 +1,200 @@ +hostList = explode(',', $hostList); + } else { + $this->hostList = (array)$hostList; + } + // randomize the order of servers we collect metadata from + shuffle($this->hostList); + } + + // }}} + // {{{ public function setClient() + + /** + * @var \Kafka\Client $client + * @access public + * @return void + */ + public function setClient(\Kafka\Client $client) + { + $this->client = $client; + } + + // }}} + // {{{ public function listBrokers() + + /** + * get broker list from kafka metadata + * + * @access public + * @return array + */ + public function listBrokers() + { + if ($this->brokers === null) { + $this->loadBrokers(); + } + return $this->brokers; + } + + // }}} + // {{{ public function getPartitionState() + + public function getPartitionState($topicName, $partitionId = 0) + { + if (!isset( $this->topics[$topicName] ) ) { + $this->loadTopicDetail(array($topicName)); + } + if ( isset( $this->topics[$topicName]['partitions'][$partitionId] ) ) { + return $this->topics[$topicName]['partitions'][$partitionId]; + } else { + return null; + } + } + + // }}} + // {{{ public function getTopicDetail() + + /** + * + * @param string $topicName + * @access public + * @return array + */ + public function getTopicDetail($topicName) + { + if (!isset( $this->topics[$topicName] ) ) { + $this->loadTopicDetail(array($topicName)); + } + if (isset( $this->topics[$topicName] ) ) { + return $this->topics[$topicName]; + } else { + return array(); + } + } + + // }}} + // {{{ private function loadBrokers() + + private function loadBrokers() + { + $this->brokers = array(); + // not sure how to ask for only the brokers without a topic... + // just ask for a topic we don't care about + $this->loadTopicDetail(array('test')); + } + + // }}} + // {{{ private function loadTopicDetail() + + private function loadTopicDetail(array $topics) + { + if ($this->client === null) { + throw new \Kafka\Exception('client was not provided'); + } + $response = null; + foreach ($this->hostList as $host) { + try { + $response = null; + $stream = $this->client->getStream($host); + $conn = $stream['stream']; + $encoder = new \Kafka\Protocol\Encoder($conn); + $encoder->metadataRequest($topics); + $decoder = new \Kafka\Protocol\Decoder($conn); + $response = $decoder->metadataResponse(); + $this->client->freeStream($stream['key']); + break; + } catch (\Kafka\Exception $e) { + // keep trying + } + } + if ($response) { + // Merge arrays using "+" operator to preserve key (which are broker IDs) + // instead of array_merge (which reindex numeric keys) + $this->brokers = $response['brokers'] + $this->brokers; + $this->topics = array_merge($response['topics'], $this->topics); + } else { + throw new \Kafka\Exception('Could not connect to any kafka brokers'); + } + } + + // }}} + // }}} +} -- cgit v1.2.3-54-g00ecf