From a1789ddde42033f1b05cc4929491214ee6e79383 Mon Sep 17 00:00:00 2001 From: Pierre Schmitz Date: Thu, 17 Dec 2015 09:15:42 +0100 Subject: Update to MediaWiki 1.26.0 --- vendor/nmred/kafka-php/src/Kafka/Client.php | 290 +++++++++ .../nmred/kafka-php/src/Kafka/ClusterMetaData.php | 53 ++ vendor/nmred/kafka-php/src/Kafka/Consumer.php | 378 ++++++++++++ vendor/nmred/kafka-php/src/Kafka/Exception.php | 31 + .../kafka-php/src/Kafka/Exception/NotSupported.php | 33 ++ .../kafka-php/src/Kafka/Exception/OutOfRange.php | 33 ++ .../kafka-php/src/Kafka/Exception/Protocol.php | 33 ++ .../nmred/kafka-php/src/Kafka/Exception/Socket.php | 33 ++ .../src/Kafka/Exception/SocketConnect.php | 33 ++ .../kafka-php/src/Kafka/Exception/SocketEOF.php | 33 ++ .../src/Kafka/Exception/SocketTimeout.php | 33 ++ vendor/nmred/kafka-php/src/Kafka/Log.php | 78 +++ .../kafka-php/src/Kafka/MetaDataFromKafka.php | 200 +++++++ vendor/nmred/kafka-php/src/Kafka/Offset.php | 305 ++++++++++ vendor/nmred/kafka-php/src/Kafka/Produce.php | 337 +++++++++++ .../nmred/kafka-php/src/Kafka/Protocol/Decoder.php | 430 ++++++++++++++ .../nmred/kafka-php/src/Kafka/Protocol/Encoder.php | 652 +++++++++++++++++++++ .../Kafka/Protocol/Fetch/Helper/CommitOffset.php | 119 ++++ .../src/Kafka/Protocol/Fetch/Helper/Consumer.php | 39 ++ .../src/Kafka/Protocol/Fetch/Helper/FreeStream.php | 117 ++++ .../src/Kafka/Protocol/Fetch/Helper/Helper.php | 160 +++++ .../Kafka/Protocol/Fetch/Helper/HelperAbstract.php | 71 +++ .../kafka-php/src/Kafka/Protocol/Fetch/Message.php | 175 ++++++ .../src/Kafka/Protocol/Fetch/MessageSet.php | 269 +++++++++ .../src/Kafka/Protocol/Fetch/Partition.php | 375 ++++++++++++ .../kafka-php/src/Kafka/Protocol/Fetch/Topic.php | 345 +++++++++++ .../kafka-php/src/Kafka/Protocol/Protocol.php | 230 ++++++++ vendor/nmred/kafka-php/src/Kafka/Socket.php | 365 ++++++++++++ vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php | 364 ++++++++++++ 29 files changed, 5614 insertions(+) create mode 100644 vendor/nmred/kafka-php/src/Kafka/Client.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Consumer.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Exception.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Exception/NotSupported.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Exception/OutOfRange.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Exception/Protocol.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Exception/Socket.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Exception/SocketConnect.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Exception/SocketEOF.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Exception/SocketTimeout.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Log.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Offset.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Produce.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/Socket.php create mode 100644 vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php (limited to 'vendor/nmred/kafka-php/src') diff --git a/vendor/nmred/kafka-php/src/Kafka/Client.php b/vendor/nmred/kafka-php/src/Kafka/Client.php new file mode 100644 index 00000000..a38e705b --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Client.php @@ -0,0 +1,290 @@ + 0, + 'RecvTimeoutUsec' => 750000, + 'SendTimeoutSec' => 0, + 'SendTimeoutUsec' => 100000, + ); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct(ClusterMetaData $metadata) + { + $this->metadata = $metadata; + if (method_exists($metadata, 'setClient')) { + $this->metadata->setClient($this); + } + } + + /** + * update stream options + * + * @param array $options + */ + public function setStreamOptions($options = array()) + { + // Merge the arrays + $this->streamOptions = array_merge($this->streamOptions, $options); + $this->updateStreamOptions(); + } + + /** + * @access public + * @param $name - name of stream option + * @param $value - value for option + */ + public function setStreamOption($name, $value) + { + $this->streamOptions[$name] = $value; + $this->updateStreamOptions(); + } + + /** + * @access public + * @param $name - name of option + * @return mixed + */ + public function getStreamOption($name) + { + if (array_key_exists($name, $this->streamOptions)) { + return $this->streamOptions[$name]; + } + return null; + } + + /** + * @access private + */ + private function updateStreamOptions() + { + // Loop thru each stream + foreach (self::$stream as $host => $streams) { + foreach ($streams as $key => $info) { + // Update options + if (isset($info['stream'])) { + /** @var \Kafka\Socket $stream */ + $stream = $info['stream']; + $stream->setRecvTimeoutSec($this->streamOptions['RecvTimeoutSec']); + $stream->setRecvTimeoutUsec($this->streamOptions['SendTimeoutUsec']); + $stream->setSendTimeoutSec($this->streamOptions['SendTimeoutSec']); + $stream->setSendTimeoutUsec($this->streamOptions['SendTimeoutUsec']); + } + } + } + } + + // }}} + // {{{ public function getBrokers() + + /** + * get broker server + * + * @access public + * @return void + */ + public function getBrokers() + { + if (empty($this->hostList)) { + $brokerList = $this->metadata->listBrokers(); + foreach ($brokerList as $brokerId => $info) { + if (!isset($info['host']) || !isset($info['port'])) { + continue; + } + $this->hostList[$brokerId] = $info['host'] . ':' . $info['port']; + } + } + + return $this->hostList; + } + + // }}} + // {{{ public function getHostByPartition() + + /** + * get broker host by topic partition + * + * @param string $topicName + * @param int $partitionId + * @access public + * @return string + */ + public function getHostByPartition($topicName, $partitionId = 0) + { + $partitionInfo = $this->metadata->getPartitionState($topicName, $partitionId); + if (!$partitionInfo) { + throw new \Kafka\Exception('topic:' . $topicName . ', partition id: ' . $partitionId . ' is not exists.'); + } + + $hostList = $this->getBrokers(); + if (isset($partitionInfo['leader']) && isset($hostList[$partitionInfo['leader']])) { + return $hostList[$partitionInfo['leader']]; + } else { + throw new \Kafka\Exception('can\'t find broker host.'); + } + } + + // }}} + // {{{ public function getZooKeeper() + + /** + * get kafka zookeeper object + * + * @access public + * @return \Kafka\ZooKeeper + */ + public function getZooKeeper() + { + if ($this->metadata instanceof \Kafka\ZooKeeper) { + return $this->metadata; + } else { + throw new \Kafka\Exception( 'ZooKeeper was not provided' ); + } + } + + // }}} + // {{{ public function getStream() + + /** + * get broker broker connect + * + * @param string $host + * @access private + * @return void + */ + public function getStream($host, $lockKey = null) + { + if (!$lockKey) { + $lockKey = uniqid($host); + } + + list($hostname, $port) = explode(':', $host); + // find unlock stream + if (isset(self::$stream[$host])) { + foreach (self::$stream[$host] as $key => $info) { + if ($info['locked']) { + continue; + } else { + self::$stream[$host][$key]['locked'] = true; + $info['stream']->connect(); + return array('key' => $key, 'stream' => $info['stream']); + } + } + } + + // no idle stream + $stream = new \Kafka\Socket($hostname, $port, $this->getStreamOption('RecvTimeoutSec'), $this->getStreamOption('RecvTimeoutUsec'), $this->getStreamOption('SendTimeoutSec'), $this->getStreamOption('SendTimeoutUsec')); + $stream->connect(); + self::$stream[$host][$lockKey] = array( + 'locked' => true, + 'stream' => $stream, + ); + return array('key' => $lockKey, 'stream' => $stream); + } + + // }}} + // {{{ public function freeStream() + + /** + * free stream pool + * + * @param string $key + * @access public + * @return void + */ + public function freeStream($key) + { + foreach (self::$stream as $host => $values) { + if (isset($values[$key])) { + self::$stream[$host][$key]['locked'] = false; + } + } + } + + // }}} + // {{{ public function getTopicDetail() + + /** + * get topic detail info + * + * @param string $topicName + * @return array + */ + public function getTopicDetail($topicName) + { + return $this->metadata->getTopicDetail($topicName); + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php b/vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php new file mode 100644 index 00000000..e9b3d064 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php @@ -0,0 +1,53 @@ +client = new \Kafka\Client($zookeeper); + } + + // }}} + // {{{ public function clearPayload() + + /** + * clearPayload + * + * @access public + * @return void + */ + public function clearPayload() + { + $this->payload = array(); + } + + // }}} + // {{{ public function setTopic() + + /** + * set topic name + * + * @access public + * @return void + */ + public function setTopic($topicName, $defaultOffset = null) + { + $parts = $this->client->getTopicDetail($topicName); + if (!isset($parts['partitions']) || empty($parts['partitions'])) { + // set topic fail. + return $this; + } + + foreach ($parts['partitions'] as $partId => $info) { + $this->setPartition($topicName, $partId, $defaultOffset); + } + + return $this; + } + + // }}} + // {{{ public function setPartition() + + /** + * set topic partition + * + * @access public + * @return void + */ + public function setPartition($topicName, $partitionId = 0, $offset = null) + { + if (is_null($offset)) { + if ($this->fromOffset) { + $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId); + $offset = $offsetObject->getOffset($this->offsetStrategy); + \Kafka\Log::log('topic name:' . $topicName . ', part:' . $partitionId . 'get offset from kafka server, offet:' . $offset, LOG_DEBUG); + } else { + $offset = 0; + } + } + $this->payload[$topicName][$partitionId] = $offset; + + return $this; + } + + // }}} + // {{{ public function setFromOffset() + + /** + * set whether starting offset fetch + * + * @param boolean $fromOffset + * @access public + * @return void + */ + public function setFromOffset($fromOffset) + { + $this->fromOffset = (boolean) $fromOffset; + } + + // }}} + // {{{ public function setMaxBytes() + + /** + * set fetch message max bytes + * + * @param int $maxSize + * @access public + * @return void + */ + public function setMaxBytes($maxSize) + { + $this->maxSize = $maxSize; + } + + // }}} + // {{{ public function setGroup() + + /** + * set consumer group + * + * @param string $group + * @access public + * @return void + */ + public function setGroup($group) + { + $this->group = (string) $group; + return $this; + } + + // }}} + // {{{ public function fetch() + + /** + * fetch message to broker + * + * @access public + * @return void + */ + public function fetch() + { + $data = $this->_formatPayload(); + if (empty($data)) { + return false; + } + + $responseData = array(); + $streams = array(); + foreach ($data as $host => $requestData) { + $connArr = $this->client->getStream($host); + $conn = $connArr['stream']; + $encoder = new \Kafka\Protocol\Encoder($conn); + $encoder->fetchRequest($requestData); + $streams[$connArr['key']] = $conn; + } + + $fetch = new \Kafka\Protocol\Fetch\Topic($streams, $data); + + // register fetch helper + $freeStream = new \Kafka\Protocol\Fetch\Helper\FreeStream($this->client); + $freeStream->setStreams($streams); + \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('freeStream', $freeStream); + + // register partition commit offset + $commitOffset = new \Kafka\Protocol\Fetch\Helper\CommitOffset($this->client); + $commitOffset->setGroup($this->group); + \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('commitOffset', $commitOffset); + + $updateConsumer = new \Kafka\Protocol\Fetch\Helper\Consumer($this); + \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('updateConsumer', $updateConsumer); + + return $fetch; + } + + // }}} + // {{{ public function getClient() + + /** + * get client object + * + * @access public + * @return void + */ + public function getClient() + { + return $this->client; + } + + /** + * passthru method to client for setting stream options + * + * @param array $options + */ + public function setStreamOptions($options = array()) + { + $this->client->setStreamOptions($options); + } + + // }}} + // {{{ private function _formatPayload() + + /** + * format payload array + * + * @access private + * @return array + */ + private function _formatPayload() + { + if (empty($this->payload)) { + return array(); + } + + $data = array(); + foreach ($this->payload as $topicName => $partitions) { + foreach ($partitions as $partitionId => $offset) { + $host = $this->client->getHostByPartition($topicName, $partitionId); + $data[$host][$topicName][$partitionId] = $offset; + } + } + + $requestData = array(); + foreach ($data as $host => $info) { + $topicData = array(); + foreach ($info as $topicName => $partitions) { + $partitionData = array(); + foreach ($partitions as $partitionId => $offset) { + $partitionData[] = array( + 'partition_id' => $partitionId, + 'offset' => $offset, + 'max_bytes' => $this->maxSize, + ); + } + $topicData[] = array( + 'topic_name' => $topicName, + 'partitions' => $partitionData, + ); + } + + $requestData[$host] = array( + 'data' => $topicData, + ); + } + + return $requestData; + } + + /** + * const LAST_OFFSET = -1; + * const EARLIEST_OFFSET = -2; + * const DEFAULT_LAST = -2; + * const DEFAULT_EARLY = -1; + * @param type $offsetStrategy + */ + public function setOffsetStrategy($offsetStrategy) + { + $this->offsetStrategy = $offsetStrategy; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception.php b/vendor/nmred/kafka-php/src/Kafka/Exception.php new file mode 100644 index 00000000..f336f1c3 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Exception.php @@ -0,0 +1,31 @@ +log($message, $level); + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php b/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php new file mode 100644 index 00000000..9d2c613e --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php @@ -0,0 +1,200 @@ +hostList = explode(',', $hostList); + } else { + $this->hostList = (array)$hostList; + } + // randomize the order of servers we collect metadata from + shuffle($this->hostList); + } + + // }}} + // {{{ public function setClient() + + /** + * @var \Kafka\Client $client + * @access public + * @return void + */ + public function setClient(\Kafka\Client $client) + { + $this->client = $client; + } + + // }}} + // {{{ public function listBrokers() + + /** + * get broker list from kafka metadata + * + * @access public + * @return array + */ + public function listBrokers() + { + if ($this->brokers === null) { + $this->loadBrokers(); + } + return $this->brokers; + } + + // }}} + // {{{ public function getPartitionState() + + public function getPartitionState($topicName, $partitionId = 0) + { + if (!isset( $this->topics[$topicName] ) ) { + $this->loadTopicDetail(array($topicName)); + } + if ( isset( $this->topics[$topicName]['partitions'][$partitionId] ) ) { + return $this->topics[$topicName]['partitions'][$partitionId]; + } else { + return null; + } + } + + // }}} + // {{{ public function getTopicDetail() + + /** + * + * @param string $topicName + * @access public + * @return array + */ + public function getTopicDetail($topicName) + { + if (!isset( $this->topics[$topicName] ) ) { + $this->loadTopicDetail(array($topicName)); + } + if (isset( $this->topics[$topicName] ) ) { + return $this->topics[$topicName]; + } else { + return array(); + } + } + + // }}} + // {{{ private function loadBrokers() + + private function loadBrokers() + { + $this->brokers = array(); + // not sure how to ask for only the brokers without a topic... + // just ask for a topic we don't care about + $this->loadTopicDetail(array('test')); + } + + // }}} + // {{{ private function loadTopicDetail() + + private function loadTopicDetail(array $topics) + { + if ($this->client === null) { + throw new \Kafka\Exception('client was not provided'); + } + $response = null; + foreach ($this->hostList as $host) { + try { + $response = null; + $stream = $this->client->getStream($host); + $conn = $stream['stream']; + $encoder = new \Kafka\Protocol\Encoder($conn); + $encoder->metadataRequest($topics); + $decoder = new \Kafka\Protocol\Decoder($conn); + $response = $decoder->metadataResponse(); + $this->client->freeStream($stream['key']); + break; + } catch (\Kafka\Exception $e) { + // keep trying + } + } + if ($response) { + // Merge arrays using "+" operator to preserve key (which are broker IDs) + // instead of array_merge (which reindex numeric keys) + $this->brokers = $response['brokers'] + $this->brokers; + $this->topics = array_merge($response['topics'], $this->topics); + } else { + throw new \Kafka\Exception('Could not connect to any kafka brokers'); + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Offset.php b/vendor/nmred/kafka-php/src/Kafka/Offset.php new file mode 100644 index 00000000..7ad3f9d8 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Offset.php @@ -0,0 +1,305 @@ +client = $client; + $this->groupId = $groupId; + $this->topicName = $topicName; + $this->partitionId = $partitionId; + + $host = $this->client->getHostByPartition($topicName, $partitionId); + $stream = $this->client->getStream($host); + $conn = $stream['stream']; + $this->streamKey = $stream['key']; + $this->encoder = new \Kafka\Protocol\Encoder($conn); + $this->decoder = new \Kafka\Protocol\Decoder($conn); + } + + // }}} + // {{{ public function setOffset() + + /** + * set consumer offset + * + * @param integer $offset + * @access public + * @return void + */ + public function setOffset($offset) + { + $maxOffset = $this->getProduceOffset(); + if ($offset > $maxOffset) { + throw new \Kafka\Exception('this offset is invalid. must less than max offset:' . $maxOffset); + } + + $data = array( + 'group_id' => $this->groupId, + 'data' => array( + array( + 'topic_name' => $this->topicName, + 'partitions' => array( + array( + 'partition_id' => $this->partitionId, + 'offset' => $offset, + ), + ), + ), + ), + ); + + $topicName = $this->topicName; + $partitionId = $this->partitionId; + + $this->encoder->commitOffsetRequest($data); + $result = $this->decoder->commitOffsetResponse(); + $this->client->freeStream($this->streamKey); + if (!isset($result[$topicName][$partitionId]['errCode'])) { + throw new \Kafka\Exception('commit topic offset failed.'); + } + if ($result[$topicName][$partitionId]['errCode'] != 0) { + throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode'])); + } + } + + // }}} + // {{{ public function getOffset() + + /** + * get consumer offset + * + * @param integer $defaultOffset + * if defaultOffset -1 instead of early offset + * if defaultOffset -2 instead of last offset + * @access public + * @return void + */ + public function getOffset($defaultOffset = self::DEFAULT_LAST) + { + $maxOffset = $this->getProduceOffset(self::LAST_OFFSET); + $minOffset = $this->getProduceOffset(self::EARLIEST_OFFSET); + $data = array( + 'group_id' => $this->groupId, + 'data' => array( + array( + 'topic_name' => $this->topicName, + 'partitions' => array( + array( + 'partition_id' => $this->partitionId, + ), + ), + ), + ), + ); + + $this->encoder->fetchOffsetRequest($data); + $result = $this->decoder->fetchOffsetResponse(); + $this->client->freeStream($this->streamKey); + + $topicName = $this->topicName; + $partitionId = $this->partitionId; + if (!isset($result[$topicName][$partitionId]['errCode'])) { + throw new \Kafka\Exception('fetch topic offset failed.'); + } + if ($result[$topicName][$partitionId]['errCode'] == 3) { + switch ($defaultOffset) { + case self::DEFAULT_LAST: + return $maxOffset; + Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default last.", LOG_INFO); + case self::DEFAULT_EARLY: + Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default early.", LOG_INFO); + return $minOffset; + default: + $this->setOffset($defaultOffset); + Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default $defaultOffset.", LOG_INFO); + return $defaultOffset; + } + if ($defaultOffset) { + $this->setOffset($defaultOffset); + return $defaultOffset; + } + } elseif ($result[$topicName][$partitionId]['errCode'] == 0) { + $offset = $result[$topicName][$partitionId]['offset']; + if ($offset > $maxOffset || $offset < $minOffset) { + if ($defaultOffset == self::DEFAULT_EARLY) { + $offset = $minOffset; + } else { + $offset = $maxOffset; + } + } + Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is $offset.", LOG_INFO); + + return $offset; + } else { + throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode'])); + } + } + + // }}} + // {{{ public function getProduceOffset() + + /** + * get produce server offset + * + * @param string $topicName + * @param integer $partitionId + * @access public + * @return int + */ + public function getProduceOffset($timeLine = self::LAST_OFFSET) + { + $topicName = $this->topicName; + $partitionId = $this->partitionId; + + $requestData = array( + 'data' => array( + array( + 'topic_name' => $this->topicName, + 'partitions' => array( + array( + 'partition_id' => $this->partitionId, + 'time' => $timeLine, + 'max_offset' => 1, + ), + ), + ), + ), + ); + $this->encoder->offsetRequest($requestData); + $result = $this->decoder->offsetResponse(); + $this->client->freeStream($this->streamKey); + + if (!isset($result[$topicName][$partitionId]['offset'])) { + if (isset($result[$topicName][$partitionId]['errCode'])) { + throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode'])); + } else { + throw new \Kafka\Exception('get offset failed. topic name:' . $this->topicName . ' partitionId: ' . $this->partitionId); + } + } + + return array_shift($result[$topicName][$partitionId]['offset']); + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Produce.php b/vendor/nmred/kafka-php/src/Kafka/Produce.php new file mode 100644 index 00000000..2b9e6cd3 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Produce.php @@ -0,0 +1,337 @@ +client = new \Kafka\Client($metadata); + } + + // }}} + // {{{ public function setMessages() + + /** + * set send messages + * + * @access public + * @return void + */ + public function setMessages($topicName, $partitionId = 0, $messages = array()) + { + if (isset($this->payload[$topicName][$partitionId])) { + $this->payload[$topicName][$partitionId] = + array_merge($this->payload[$topicName][$partitionId], $messages); + } else { + $this->payload[$topicName][$partitionId] = $messages; + } + + return $this; + } + + // }}} + // {{{ public function setRequireAck() + + /** + * set request mode + * This field indicates how many acknowledgements the servers should receive + * before responding to the request. If it is 0 the server will not send any + * response (this is the only case where the server will not reply to a + * request). If it is 1, the server will wait the data is written to the + * local log before sending a response. If it is -1 the server will block + * until the message is committed by all in sync replicas before sending a + * response. For any number > 1 the server will block waiting for this + * number of acknowledgements to occur (but the server will never wait for + * more acknowledgements than there are in-sync replicas). + * + * @param int $ack + * @access public + * @return void + */ + public function setRequireAck($ack = 0) + { + if ($ack >= -1) { + $this->requiredAck = (int) $ack; + } + + return $this; + } + + // }}} + // {{{ public function setTimeOut() + + /** + * set request timeout + * + * @param int $timeout + * @access public + * @return void + */ + public function setTimeOut($timeout = 100) + { + if ((int) $timeout) { + $this->timeout = (int) $timeout; + } + return $this; + } + + // }}} + // {{{ public function send() + + /** + * send message to broker + * + * @access public + * @return void + */ + public function send() + { + $data = $this->_formatPayload(); + if (empty($data)) { + return false; + } + + $responseData = array(); + foreach ($data as $host => $requestData) { + $stream = $this->client->getStream($host); + $conn = $stream['stream']; + $encoder = new \Kafka\Protocol\Encoder($conn); + $encoder->produceRequest($requestData); + if ((int) $this->requiredAck !== 0) { // get broker response + $decoder = new \Kafka\Protocol\Decoder($conn); + $response = $decoder->produceResponse(); + foreach ($response as $topicName => $info) { + if (!isset($responseData[$topicName])) { + $responseData[$topicName] = $info; + } else { + $responseData[$topicName] = array_merge($info, $responseData[$topicName]); + } + } + } + + $this->client->freeStream($stream['key']); + } + + $this->payload = array(); + return $responseData; + } + + // }}} + // {{{ public function getClient() + + /** + * get client object + * + * @access public + * @return void + */ + public function getClient() + { + return $this->client; + } + + /** + * passthru method to client for setting stream options + * + * @access public + * @param array $options + */ + public function setStreamOptions($options = array()) + { + $this->client->setStreamOptions($options); + } + + // }}} + // {{{ public function getAvailablePartitions() + + /** + * get available partition + * + * @access public + * @return array + */ + public function getAvailablePartitions($topicName) + { + $topicDetail = $this->client->getTopicDetail($topicName); + if (is_array($topicDetail) && isset($topicDetail['partitions'])) { + $topicPartitiions = array_keys($topicDetail['partitions']); + } else { + $topicPartitiions = array(); + } + + return $topicPartitiions; + } + + // }}} + // {{{ private function _formatPayload() + + /** + * format payload array + * + * @access private + * @return array + */ + private function _formatPayload() + { + if (empty($this->payload)) { + return array(); + } + + $data = array(); + foreach ($this->payload as $topicName => $partitions) { + foreach ($partitions as $partitionId => $messages) { + $host = $this->client->getHostByPartition($topicName, $partitionId); + $data[$host][$topicName][$partitionId] = $messages; + } + } + + $requestData = array(); + foreach ($data as $host => $info) { + $topicData = array(); + foreach ($info as $topicName => $partitions) { + $partitionData = array(); + foreach ($partitions as $partitionId => $messages) { + $partitionData[] = array( + 'partition_id' => $partitionId, + 'messages' => $messages, + ); + } + $topicData[] = array( + 'topic_name' => $topicName, + 'partitions' => $partitionData, + ); + } + + $requestData[$host] = array( + 'required_ack' => $this->requiredAck, + 'timeout' => $this->timeout, + 'data' => $topicData, + ); + } + + return $requestData; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php new file mode 100644 index 00000000..f1e4b496 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php @@ -0,0 +1,430 @@ +stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('produce response invalid.'); + } + $data = $this->stream->read($dataLen, true); + + // parse data struct + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $topicCount = array_shift($topicCount); + $offset += 4; + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8)); + $offset += 8; + $result[$topicName][$partitionId[1]] = array( + 'errCode' => $errCode[1], + 'offset' => $partitionOffset + ); + } + } + + return $result; + } + + // }}} + // {{{ public function fetchResponse() + + /** + * decode fetch response + * + * @param string $data + * @access public + * @return Iterator + */ + public function fetchResponse() + { + return new \Kafka\Protocol\Fetch\Topic($this->stream); + } + + // }}} + // {{{ public function metadataResponse() + + /** + * decode metadata response + * + * @param string $data + * @access public + * @return array + */ + public function metadataResponse() + { + $result = array(); + $broker = array(); + $topic = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('metaData response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $brokerCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $brokerCount = isset($brokerCount[1]) ? $brokerCount[1] : 0; + for ($i = 0; $i < $brokerCount; $i++) { + $nodeId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $nodeId = $nodeId[1]; + $offset += 4; + $hostNameLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 host name length + $hostNameLen = isset($hostNameLen[1]) ? $hostNameLen[1] : 0; + $offset += 2; + $hostName = substr($data, $offset, $hostNameLen); + $offset += $hostNameLen; + $port = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $broker[$nodeId] = array( + 'host' => $hostName, + 'port' => $port[1], + ); + } + + $topicMetaCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicMetaCount = isset($topicMetaCount[1]) ? $topicMetaCount[1] : 0; + for ($i = 0; $i < $topicMetaCount; $i++) { + $topicErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $topicName = substr($data, $offset, $topicLen[1]); + $offset += $topicLen[1]; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $topic[$topicName]['errCode'] = $topicErrCode[1]; + $partitions = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionId = isset($partitionId[1]) ? $partitionId[1] : 0; + $offset += 4; + $leaderId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $repliasCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $repliasCount = isset($repliasCount[1]) ? $repliasCount[1] : 0; + $replias = array(); + for ($z = 0; $z < $repliasCount; $z++) { + $repliaId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $replias[] = $repliaId[1]; + } + $isrCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $isrCount = isset($isrCount[1]) ? $isrCount[1] : 0; + $isrs = array(); + for ($z = 0; $z < $isrCount; $z++) { + $isrId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $isrs[] = $isrId[1]; + } + + $partitions[$partitionId] = array( + 'errCode' => $partitionErrCode[1], + 'leader' => $leaderId[1], + 'replicas' => $replias, + 'isr' => $isrs, + ); + } + $topic[$topicName]['partitions'] = $partitions; + } + + $result = array( + 'brokers' => $broker, + 'topics' => $topic, + ); + return $result; + } + + // }}} + // {{{ public function offsetResponse() + + /** + * decode offset response + * + * @param string $data + * @access public + * @return array + */ + public function offsetResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('offset response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicCount = array_shift($topicCount); + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $offsetCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $offsetCount = array_shift($offsetCount); + $offsetArr = array(); + for ($z = 0; $z < $offsetCount; $z++) { + $offsetArr[] = self::unpack(self::BIT_B64, substr($data, $offset, 8)); + $offset += 8; + } + $result[$topicName][$partitionId[1]] = array( + 'errCode' => $errCode[1], + 'offset' => $offsetArr + ); + } + } + return $result; + } + + // }}} + // {{{ public function commitOffsetResponse() + + /** + * decode commit offset response + * + * @param string $data + * @access public + * @return array + */ + public function commitOffsetResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('commit offset response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicCount = array_shift($topicCount); + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $result[$topicName][$partitionId[1]] = array( + 'errCode' => $errCode[1], + ); + } + } + return $result; + } + + // }}} + // {{{ public function fetchOffsetResponse() + + /** + * decode fetch offset response + * + * @param string $data + * @access public + * @return array + */ + public function fetchOffsetResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('fetch offset response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicCount = array_shift($topicCount); + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8)); + $offset += 8; + $metaLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $metaLen = array_shift($metaLen); + $offset += 2; + $metaData = ''; + if ($metaLen) { + $metaData = substr($data, $offset, $metaLen); + $offset += $metaLen; + } + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $result[$topicName][$partitionId[1]] = array( + 'offset' => $partitionOffset, + 'metadata' => $metaData, + 'errCode' => $errCode[1], + ); + } + } + return $result; + } + + // }}} + // {{{ public static function getError() + + /** + * get error + * + * @param integer $errCode + * @static + * @access public + * @return string + */ + public static function getError($errCode) + { + $error = ''; + switch($errCode) { + case 0: + $error = 'No error--it worked!'; + break; + case -1: + $error = 'An unexpected server error'; + break; + case 1: + $error = 'The requested offset is outside the range of offsets maintained by the server for the given topic/partition.'; + break; + case 2: + $error = 'This indicates that a message contents does not match its CRC'; + break; + case 3: + $error = 'This request is for a topic or partition that does not exist on this broker.'; + break; + case 4: + $error = 'The message has a negative size'; + break; + case 5: + $error = 'This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes'; + break; + case 6: + $error = 'This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.'; + break; + case 7: + $error = 'This error is thrown if the request exceeds the user-specified time limit in the request.'; + break; + case 8: + $error = 'This is not a client facing error and is used only internally by intra-cluster broker communication.'; + break; + case 10: + $error = 'The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum.'; + break; + case 11: + $error = 'Internal error code for broker-to-broker communication.'; + break; + case 12: + $error = 'If you specify a string larger than configured maximum for offset metadata'; + break; + case 14: + $error = 'The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).'; + break; + case 15: + $error = 'The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.'; + break; + case 16: + $error = 'The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.'; + break; + default: + $error = 'Unknown error'; + } + + return $error; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php new file mode 100644 index 00000000..7d36e10f --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php @@ -0,0 +1,652 @@ +stream->write($data); + } + + // }}} + // {{{ public function metadataRequest() + + /** + * build metadata request protocol + * + * @param array $topics + * @access public + * @return string + */ + public function metadataRequest($topics) + { + if (!is_array($topics)) { + $topics = array($topics); + } + + foreach ($topics as $topic) { + if (!is_string($topic)) { + throw new \Kafka\Exception\Protocol('request metadata topic array have invalid value. '); + } + } + + $header = self::requestHeader('kafka-php', 0, self::METADATA_REQUEST); + $data = self::encodeArray($topics, array(__CLASS__, 'encodeString'), self::PACK_INT16); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function fetchRequest() + + /** + * build fetch request + * + * @param array $payloads + * @access public + * @return string + */ + public function fetchRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given fetch kafka data invalid. `data` is undefined.'); + } + + if (!isset($payloads['replica_id'])) { + $payloads['replica_id'] = -1; + } + + if (!isset($payloads['max_wait_time'])) { + $payloads['max_wait_time'] = 100; // default timeout 100ms + } + + if (!isset($payloads['min_bytes'])) { + $payloads['min_bytes'] = 64 * 1024; // 64k + } + + $header = self::requestHeader('kafka-php', 0, self::FETCH_REQUEST); + $data = self::pack(self::BIT_B32, $payloads['replica_id']); + $data .= self::pack(self::BIT_B32, $payloads['max_wait_time']); + $data .= self::pack(self::BIT_B32, $payloads['min_bytes']); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchTopic')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function offsetRequest() + + /** + * build offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function offsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['replica_id'])) { + $payloads['replica_id'] = -1; + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_REQUEST); + $data = self::pack(self::BIT_B32, $payloads['replica_id']); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeOffsetTopic')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function commitOffsetRequest() + + /** + * build consumer commit offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function commitOffsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['group_id'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `group_id` is undefined.'); + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_COMMIT_REQUEST); + $data = self::encodeString($payloads['group_id'], self::PACK_INT16); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeCommitOffset')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function fetchOffsetRequest() + + /** + * build consumer fetch offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function fetchOffsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['group_id'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `group_id` is undefined.'); + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_FETCH_REQUEST); + $data = self::encodeString($payloads['group_id'], self::PACK_INT16); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchOffset')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public static function encodeString() + + /** + * encode pack string type + * + * @param string $string + * @param int $bytes self::PACK_INT32: int32 big endian order. self::PACK_INT16: int16 big endian order. + * @static + * @access public + * @return string + */ + public static function encodeString($string, $bytes, $compression = self::COMPRESSION_NONE) + { + $packLen = ($bytes == self::PACK_INT32) ? self::BIT_B32 : self::BIT_B16; + switch ($compression) { + case self::COMPRESSION_NONE: + break; + case self::COMPRESSION_GZIP: + $string = gzencode($string); + break; + case self::COMPRESSION_SNAPPY: + throw new \Kafka\Exception\NotSupported('SNAPPY compression not yet implemented'); + default: + throw new \Kafka\Exception\NotSupported('Unknown compression flag: ' . $compression); + } + return self::pack($packLen, strlen($string)) . $string; + } + + // }}} + // {{{ public static function encodeArray() + + /** + * encode key array + * + * @param array $array + * @param Callable $func + * @static + * @access public + * @return string + */ + public static function encodeArray(array $array, $func, $options = null) + { + if (!is_callable($func, false)) { + throw new \Kafka\Exception\Protocol('Encode array failed, given function is not callable.'); + } + + $arrayCount = count($array); + + $body = ''; + foreach ($array as $value) { + if (!is_null($options)) { + $body .= call_user_func($func, $value, $options); + } else { + $body .= call_user_func($func, $value); + } + } + + return self::pack(self::BIT_B32, $arrayCount) . $body; + } + + // }}} + // {{{ public static function encodeMessageSet() + + /** + * encode message set + * N.B., MessageSets are not preceded by an int32 like other array elements + * in the protocol. + * + * @param array $messages + * @static + * @access public + * @return string + */ + public static function encodeMessageSet($messages, $compression = self::COMPRESSION_NONE) + { + if (!is_array($messages)) { + $messages = array($messages); + } + + $data = ''; + foreach ($messages as $message) { + $tmpMessage = self::_encodeMessage($message, $compression); + + // int64 -- message offset Message + $data .= self::pack(self::BIT_B64, 0) . self::encodeString($tmpMessage, self::PACK_INT32); + } + return $data; + } + + // }}} + // {{{ public static function requestHeader() + + /** + * get request header + * + * @param string $clientId + * @param integer $correlationId + * @param integer $apiKey + * @static + * @access public + * @return void + */ + public static function requestHeader($clientId, $correlationId, $apiKey) + { + // int16 -- apiKey int16 -- apiVersion int32 correlationId + $binData = self::pack(self::BIT_B16, $apiKey); + $binData .= self::pack(self::BIT_B16, self::API_VERSION); + $binData .= self::pack(self::BIT_B32, $correlationId); + + // concat client id + $binData .= self::encodeString($clientId, self::PACK_INT16); + + return $binData; + } + + // }}} + // {{{ protected static function _encodeMessage() + + /** + * encode signal message + * + * @param string $message + * @static + * @access protected + * @return string + */ + protected static function _encodeMessage($message, $compression = self::COMPRESSION_NONE) + { + // int8 -- magic int8 -- attribute + $data = self::pack(self::BIT_B8, self::MESSAGE_MAGIC); + $data .= self::pack(self::BIT_B8, $compression); + + // message key + $data .= self::encodeString('', self::PACK_INT32); + + // message value + $data .= self::encodeString($message, self::PACK_INT32, $compression); + + $crc = crc32($data); + + // int32 -- crc code string data + $message = self::pack(self::BIT_B32, $crc) . $data; + + return $message; + } + + // }}} + // {{{ protected static function _encodeProcudePartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeProcudePartion($values, $compression) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['messages']) || empty($values['messages'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `messages` is undefined.'); + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::encodeString(self::encodeMessageSet($values['messages'], $compression), self::PACK_INT32); + + return $data; + } + + // }}} + // {{{ protected static function _encodeProcudeTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeProcudeTopic($values, $compression) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeProcudePartion'), $compression); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeFetchPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['offset'])) { + $values['offset'] = 0; + } + + if (!isset($values['max_bytes'])) { + $values['max_bytes'] = 100 * 1024 * 1024; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['offset']); + $data .= self::pack(self::BIT_B32, $values['max_bytes']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeFetchTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchTopic($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['time'])) { + $values['time'] = -1; // -1 + } + + if (!isset($values['max_offset'])) { + $values['max_offset'] = 100000; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['time']); + $data .= self::pack(self::BIT_B32, $values['max_offset']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeOffsetTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeOffsetTopic($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeCommitOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeCommitOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['offset'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `offset` is undefined.'); + } + + if (!isset($values['time'])) { + $values['time'] = -1; + } + + if (!isset($values['metadata'])) { + $values['metadata'] = 'm'; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['offset']); + $data .= self::pack(self::BIT_B64, $values['time']); + $data .= self::encodeString($values['metadata'], self::PACK_INT16); + + return $data; + } + + // }}} + // {{{ protected static function _encodeCommitOffset() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeCommitOffset($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeCommitOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeFetchOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partition_id` is undefined.'); + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeFetchOffset() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchOffset($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php new file mode 100644 index 00000000..8424cf78 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php @@ -0,0 +1,119 @@ +client = $client; + } + + // }}} + // {{{ public function setGroup() + + /** + * set consumer group + * + * @access public + * @return void + */ + public function setGroup($group) + { + $this->group = $group; + } + + // }}} + // {{{ public function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @access public + * @return void + */ + public function onStreamEof($streamKey) + { + } + + // }}} + // {{{ public function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @access public + * @return void + */ + public function onTopicEof($topicName) + { + } + + // }}} + // {{{ public function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + public function onPartitionEof($partition) + { + $partitionId = $partition->key(); + $topicName = $partition->getTopicName(); + $offset = $partition->getMessageOffset(); + $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId); + $offsetObject->setOffset($offset); + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php new file mode 100644 index 00000000..acf0223e --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php @@ -0,0 +1,39 @@ +consumer = $consumer; + } + + + public function onPartitionEof($partition) + { + $partitionId = $partition->key(); + $topicName = $partition->getTopicName(); + $offset = $partition->getMessageOffset(); + $this->consumer->setFromOffset(true); + $this->consumer->setPartition($topicName, $partitionId, ($offset +1)); + } + + public function onStreamEof($streamKey) + { + + } + + public function onTopicEof($topicName) + { + + } +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php new file mode 100644 index 00000000..bba38dd6 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php @@ -0,0 +1,117 @@ +client = $client; + } + + // }}} + // {{{ public function setStreams() + + /** + * set streams + * + * @access public + * @return void + */ + public function setStreams($streams) + { + $this->streams = $streams; + } + + // }}} + // {{{ public function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @access public + * @return void + */ + public function onStreamEof($streamKey) + { + if (isset($this->streams[$streamKey])) { + $this->client->freeStream($streamKey); + } + } + + // }}} + // {{{ public function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @access public + * @return void + */ + public function onTopicEof($topicName) + { + } + + // }}} + // {{{ public function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + public function onPartitionEof($partition) + { + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php new file mode 100644 index 00000000..4ec23926 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php @@ -0,0 +1,160 @@ + $helper) { + if (method_exists($helper, 'onStreamEof')) { + $helper->onStreamEof($streamKey); + } + } + } + + // }}} + // {{{ public static function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @static + * @access public + * @return void + */ + public static function onTopicEof($topicName) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onTopicEof')) { + $helper->onStreamEof($topicName); + } + } + } + + // }}} + // {{{ public static function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @static + * @access public + * @return void + */ + public static function onPartitionEof($partition) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onPartitionEof')) { + $helper->onPartitionEof($partition); + } + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php new file mode 100644 index 00000000..476f3da1 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php @@ -0,0 +1,71 @@ +crc = array_shift($crc); + $magic = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1)); + $this->magic = array_shift($magic); + $offset += 1; + $attr = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1)); + $this->attribute = array_shift($attr); + $offset += 1; + $keyLen = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $keyLen = array_shift($keyLen); + $offset += 4; + if ($keyLen > 0 && $keyLen != 0xFFFFFFFF) { + $this->key = substr($msg, $offset, $keyLen); + $offset += $keyLen; + } + $messageSize = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $messageSize = array_shift($messageSize); + $offset += 4; + if ($messageSize) { + $this->value = substr($msg, $offset, $messageSize); + } + } + + // }}} + // {{{ public function getMessage() + + /** + * get message data + * + * @access public + * @return string (raw) + */ + public function getMessage() + { + return $this->value; + } + + // }}} + // {{{ public function getMessageKey() + + /** + * get message key + * + * @access public + * @return string (raw) + */ + public function getMessageKey() + { + return $this->key; + } + + // }}} + // {{{ public function __toString() + + /** + * __toString + * + * @access public + * @return void + */ + public function __toString() + { + return $this->value; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php new file mode 100644 index 00000000..50413b67 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php @@ -0,0 +1,269 @@ +stream = $partition->getStream(); + $this->partition = $partition; + $this->context = $context; + $this->messageSetSize = $this->getMessageSetSize(); + \Kafka\Log::log("messageSetSize: {$this->messageSetSize}", LOG_INFO); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->validByteCount; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextMessage(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + if (!$this->valid) { + $this->partition->setMessageOffset($this->offset); + + // one partition iterator end + \Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof($this->partition); + } + + return $this->valid; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextMessage(); + } + + // }}} + // {{{ protected function getMessageSetSize() + + /** + * get message set size + * + * @access protected + * @return integer + */ + protected function getMessageSetSize() + { + // read message size + $data = $this->stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $size = array_shift($data); + if ($size <= 0) { + throw new \Kafka\Exception\OutOfRange($size . ' is not a valid message size'); + } + + return $size; + } + + // }}} + // {{{ public function loadNextMessage() + + /** + * load next message + * + * @access public + * @return void + */ + public function loadNextMessage() + { + if ($this->validByteCount >= $this->messageSetSize) { + return false; + } + + try { + if ($this->validByteCount + 12 > $this->messageSetSize) { + // read socket buffer dirty data + $this->stream->read($this->messageSetSize - $this->validByteCount); + return false; + } + $offset = $this->stream->read(8, true); + $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset); + $messageSize = $this->stream->read(4, true); + $messageSize = Decoder::unpack(Decoder::BIT_B32, $messageSize); + $messageSize = array_shift($messageSize); + $this->validByteCount += 12; + if (($this->validByteCount + $messageSize) > $this->messageSetSize) { + // read socket buffer dirty data + $this->stream->read($this->messageSetSize - $this->validByteCount); + return false; + } + $msg = $this->stream->read($messageSize, true); + $this->current = new Message($msg); + } catch (\Kafka\Exception $e) { + \Kafka\Log::log("already fetch: {$this->validByteCount}, {$e->getMessage()}", LOG_INFO); + return false; + } + + $this->validByteCount += $messageSize; + + return true; + } + + // }}} + // {{{ public function messageOffset() + + /** + * current message offset in producer + * + * @return void + */ + public function messageOffset() + { + return $this->offset; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php new file mode 100644 index 00000000..9f8578d5 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php @@ -0,0 +1,375 @@ +stream = $topic->getStream(); + $this->topicName = $topic->key(); + $this->context = $context; + $this->partitionCount = $this->getPartitionCount(); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->key; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextPartition(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + return $this->valid && $this->validCount <= $this->partitionCount; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextPartition(); + } + + // }}} + // {{{ public function count() + + /** + * implements Countable function + * + * @access public + * @return integer + */ + public function count() + { + return $this->partitionCount; + } + + // }}} + // {{{ public function getErrCode() + + /** + * get partition errcode + * + * @access public + * @return void + */ + public function getErrCode() + { + return $this->errCode; + } + + // }}} + // {{{ public function getHighOffset() + + /** + * get partition high offset + * + * @access public + * @return void + */ + public function getHighOffset() + { + return $this->offset; + } + + // }}} + // {{{ public function getTopicName() + + /** + * get partition topic name + * + * @access public + * @return void + */ + public function getTopicName() + { + return $this->topicName; + } + + // }}} + // {{{ public function getStream() + + /** + * get current stream + * + * @access public + * @return \Kafka\Socket + */ + public function getStream() + { + return $this->stream; + } + + // }}} + // {{{ protected function getPartitionCount() + + /** + * get message size + * only use to object init + * + * @access protected + * @return integer + */ + protected function getPartitionCount() + { + // read topic count + $data = $this->stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $count = array_shift($data); + if ($count <= 0) { + throw new \Kafka\Exception\OutOfRange($size . ' is not a valid partition count'); + } + + return $count; + } + + // }}} + // {{{ public function loadNextPartition() + + /** + * load next partition + * + * @access public + * @return void + */ + public function loadNextPartition() + { + if ($this->validCount >= $this->partitionCount) { + return false; + } + + try { + $partitionId = $this->stream->read(4, true); + $partitionId = Decoder::unpack(Decoder::BIT_B32, $partitionId); + $partitionId = array_shift($partitionId); + \Kafka\Log::log("kafka client:fetch partition:" . $partitionId, LOG_INFO); + + $errCode = $this->stream->read(2, true); + $errCode = Decoder::unpack(Decoder::BIT_B16, $errCode); + $this->errCode = array_shift($errCode); + if ($this->errCode != 0) { + throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($this->errCode)); + } + $offset = $this->stream->read(8, true); + $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset); + + $this->key = $partitionId; + $this->current = new MessageSet($this, $this->context); + } catch (\Kafka\Exception $e) { + return false; + } + + $this->validCount++; + return true; + } + + // }}} + // {{{ public function setMessageOffset() + + /** + * set messageSet fetch offset current + * + * @param intger $offset + * @return void + */ + public function setMessageOffset($offset) + { + $this->currentOffset = $offset; + } + + // }}} + // {{{ public function getMessageOffset() + + /** + * get messageSet fetch offset current + * + * @return int + */ + public function getMessageOffset() + { + return $this->currentOffset; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php new file mode 100644 index 00000000..500e6b1f --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php @@ -0,0 +1,345 @@ +streams = $streams; + $topicInfos = array(); + foreach ($context as $values) { + if (!isset($values['data'])) { + continue; + } + + foreach ($values['data'] as $value) { + if (!isset($value['topic_name']) || !isset($value['partitions'])) { + continue; + } + + $topicName = $value['topic_name']; + foreach ($value['partitions'] as $part) { + $topicInfos[$topicName][$part['partition_id']] = array( + 'offset' => $part['offset'], + ); + } + } + } + $this->context = $topicInfos; + $this->topicCount = $this->getTopicCount(); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->key; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextTopic(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + return $this->valid; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextTopic(); + } + + // }}} + // {{{ public function count() + + /** + * implements Countable function + * + * @access public + * @return integer + */ + public function count() + { + return $this->topicCount; + } + + // }}} + // {{{ protected function getTopicCount() + + /** + * get message size + * only use to object init + * + * @access protected + * @return integer + */ + protected function getTopicCount() + { + $count = 0; + foreach (array_values($this->streams) as $key => $stream) { + // read topic count + $stream->read(8, true); + $data = $stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $topicCount = array_shift($data); + $count += $topicCount; + $this->topicCounts[$key] = $topicCount; + if ($count <= 0) { + throw new \Kafka\Exception\OutOfRange($count . ' is not a valid topic count'); + } + } + + return $count; + } + + // }}} + // {{{ public function loadNextTopic() + + /** + * load next topic + * + * @access public + * @return void + */ + public function loadNextTopic() + { + if ($this->validCount >= $this->topicCount) { + \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey); + return false; + } + + if ($this->currentStreamCount >= $this->topicCounts[$this->currentStreamKey]) { + \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey); + $this->currentStreamKey++; + } + + $lockKeys = array_keys($this->streams); + $streams = array_values($this->streams); + if (!isset($streams[$this->currentStreamKey])) { + return false; + } + + $stream = $streams[$this->currentStreamKey]; + $this->currentStreamLockKey = $lockKeys[$this->currentStreamKey]; + + try { + $topicLen = $stream->read(2, true); + $topicLen = Decoder::unpack(Decoder::BIT_B16, $topicLen); + $topicLen = array_shift($topicLen); + if ($topicLen <= 0) { + return false; + } + + // topic name + $this->key = $stream->read($topicLen, true); + $this->current = new Partition($this, $this->context); + } catch (\Kafka\Exception $e) { + return false; + } + + $this->validCount++; + $this->currentStreamCount++; + + return true; + } + + // }}} + // {{{ public function getStream() + + /** + * get current stream + * + * @access public + * @return \Kafka\Socket + */ + public function getStream() + { + $streams = array_values($this->streams); + return $streams[$this->currentStreamKey]; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php new file mode 100644 index 00000000..a31067b5 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php @@ -0,0 +1,230 @@ +stream = $stream; + } + + // }}} + // {{{ public static function Khex2bin() + + /** + * hex to bin + * + * @param string $string + * @static + * @access protected + * @return string (raw) + */ + public static function Khex2bin($string) + { + if (function_exists('\hex2bin')) { + return \hex2bin($string); + } else { + $bin = ''; + $len = strlen($string); + for ($i = 0; $i < $len; $i += 2) { + $bin .= pack('H*', substr($string, $i, 2)); + } + + return $bin; + } + } + + // }}} + // {{{ public static function unpack() + + /** + * Unpack a bit integer as big endian long + * + * @static + * @access public + * @return integer + */ + public static function unpack($type, $bytes) + { + self::checkLen($type, $bytes); + if ($type == self::BIT_B64) { + $set = unpack($type, $bytes); + $original = ($set[1] & 0xFFFFFFFF) << 32 | ($set[2] & 0xFFFFFFFF); + return $original; + } else { + return unpack($type, $bytes); + } + } + + // }}} + // {{{ public static function pack() + + /** + * pack a bit integer as big endian long + * + * @static + * @access public + * @return integer + */ + public static function pack($type, $data) + { + if ($type == self::BIT_B64) { + if ($data == -1) { // -1L + $data = self::Khex2bin('ffffffffffffffff'); + } elseif ($data == -2) { // -2L + $data = self::Khex2bin('fffffffffffffffe'); + } else { + $left = 0xffffffff00000000; + $right = 0x00000000ffffffff; + + $l = ($data & $left) >> 32; + $r = $data & $right; + $data = pack($type, $l, $r); + } + } else { + $data = pack($type, $data); + } + + return $data; + } + + // }}} + // {{{ protected static function checkLen() + + /** + * check unpack bit is valid + * + * @param string $type + * @param string(raw) $bytes + * @static + * @access protected + * @return void + */ + protected static function checkLen($type, $bytes) + { + $len = 0; + switch($type) { + case self::BIT_B64: + $len = 8; + break; + case self::BIT_B32: + $len = 4; + break; + case self::BIT_B16: + $len = 2; + break; + case self::BIT_B8: + $len = 1; + break; + } + + if (strlen($bytes) != $len) { + throw new \Kafka\Exception\Protocol('unpack failed. string(raw) length is ' . strlen($bytes) . ' , TO ' . $type); + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Socket.php b/vendor/nmred/kafka-php/src/Kafka/Socket.php new file mode 100644 index 00000000..be7321f3 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Socket.php @@ -0,0 +1,365 @@ +host = $host; + $this->port = $port; + $this->setRecvTimeoutSec($recvTimeoutSec); + $this->setRecvTimeoutUsec($recvTimeoutUsec); + $this->setSendTimeoutSec($sendTimeoutSec); + $this->setSendTimeoutUsec($sendTimeoutUsec); + } + + /** + * @param float $sendTimeoutSec + */ + public function setSendTimeoutSec($sendTimeoutSec) + { + $this->sendTimeoutSec = $sendTimeoutSec; + } + + /** + * @param float $sendTimeoutUsec + */ + public function setSendTimeoutUsec($sendTimeoutUsec) + { + $this->sendTimeoutUsec = $sendTimeoutUsec; + } + + /** + * @param float $recvTimeoutSec + */ + public function setRecvTimeoutSec($recvTimeoutSec) + { + $this->recvTimeoutSec = $recvTimeoutSec; + } + + /** + * @param float $recvTimeoutUsec + */ + public function setRecvTimeoutUsec($recvTimeoutUsec) + { + $this->recvTimeoutUsec = $recvTimeoutUsec; + } + + + + // }}} + // {{{ public static function createFromStream() + + /** + * Optional method to set the internal stream handle + * + * @static + * @access public + * @return void + */ + public static function createFromStream($stream) + { + $socket = new self('localhost', 0); + $socket->setStream($stream); + return $socket; + } + + // }}} + // {{{ public function setStream() + + /** + * Optional method to set the internal stream handle + * + * @param mixed $stream + * @access public + * @return void + */ + public function setStream($stream) + { + $this->stream = $stream; + } + + // }}} + // {{{ public function connect() + + /** + * Connects the socket + * + * @access public + * @return void + */ + public function connect() + { + if (is_resource($this->stream)) { + return false; + } + + if (empty($this->host)) { + throw new \Kafka\Exception('Cannot open null host.'); + } + if ($this->port <= 0) { + throw new \Kafka\Exception('Cannot open without port.'); + } + + $this->stream = @fsockopen( + $this->host, + $this->port, + $errno, + $errstr, + $this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000) + ); + + if ($this->stream == false) { + $error = 'Could not connect to ' + . $this->host . ':' . $this->port + . ' ('.$errstr.' ['.$errno.'])'; + throw new \Kafka\Exception\SocketConnect($error); + } + + stream_set_blocking($this->stream, 0); + } + + // }}} + // {{{ public function close() + + /** + * close the socket + * + * @access public + * @return void + */ + public function close() + { + if (is_resource($this->stream)) { + fclose($this->stream); + } + } + + // }}} + // {{{ public function read() + + /** + * Read from the socket at most $len bytes. + * + * This method will not wait for all the requested data, it will return as + * soon as any data is received. + * + * @param integer $len Maximum number of bytes to read. + * @param boolean $verifyExactLength Throw an exception if the number of read bytes is less than $len + * + * @return string Binary data + * @throws Kafka_Exception_Socket + */ + public function read($len, $verifyExactLength = false) + { + if ($len > self::READ_MAX_LEN) { + throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream, length too longer.'); + } + + $null = null; + $read = array($this->stream); + $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec); + if ($readable > 0) { + $remainingBytes = $len; + $data = $chunk = ''; + while ($remainingBytes > 0) { + $chunk = fread($this->stream, $remainingBytes); + if ($chunk === false) { + $this->close(); + throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)'); + } + if (strlen($chunk) === 0) { + // Zero bytes because of EOF? + if (feof($this->stream)) { + $this->close(); + throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)'); + } + // Otherwise wait for bytes + $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec); + if ($readable !== 1) { + throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go'); + } + continue; // attempt another read + } + $data .= $chunk; + $remainingBytes -= strlen($chunk); + } + if ($len === $remainingBytes || ($verifyExactLength && $len !== strlen($data))) { + // couldn't read anything at all OR reached EOF sooner than expected + $this->close(); + throw new \Kafka\Exception\SocketEOF('Read ' . strlen($data) . ' bytes instead of the requested ' . $len . ' bytes'); + } + + return $data; + } + if (false !== $readable) { + $res = stream_get_meta_data($this->stream); + if (!empty($res['timed_out'])) { + $this->close(); + throw new \Kafka\Exception\SocketTimeout('Timed out reading '.$len.' bytes from stream'); + } + } + $this->close(); + throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (not readable)'); + + } + + // }}} + // {{{ public function write() + + /** + * Write to the socket. + * + * @param string $buf The data to write + * + * @return integer + * @throws Kafka_Exception_Socket + */ + public function write($buf) + { + $null = null; + $write = array($this->stream); + + // fwrite to a socket may be partial, so loop until we + // are done with the entire buffer + $written = 0; + $buflen = strlen($buf); + while ( $written < $buflen ) { + // wait for stream to become available for writing + $writable = stream_select($null, $write, $null, $this->sendTimeoutSec, $this->sendTimeoutUsec); + if ($writable > 0) { + // write remaining buffer bytes to stream + $wrote = fwrite($this->stream, substr($buf, $written)); + if ($wrote === -1 || $wrote === false) { + throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream, completed writing only ' . $written . ' bytes'); + } + $written += $wrote; + continue; + } + if (false !== $writable) { + $res = stream_get_meta_data($this->stream); + if (!empty($res['timed_out'])) { + throw new \Kafka\Exception\SocketTimeout('Timed out writing ' . strlen($buf) . ' bytes to stream after writing ' . $written . ' bytes'); + } + } + throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream'); + } + return $written; + } + + // }}} + // {{{ public function rewind() + + /** + * Rewind the stream + * + * @return void + */ + public function rewind() + { + if (is_resource($this->stream)) { + rewind($this->stream); + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php b/vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php new file mode 100644 index 00000000..f48b5cb9 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php @@ -0,0 +1,364 @@ +zookeeper = new \ZooKeeper($hostList, null, $timeout); + } else { + $this->zookeeper = new \ZooKeeper($hostList); + } + } + + // }}} + // {{{ public function listBrokers() + + /** + * get broker list using zookeeper + * + * @access public + * @return array + */ + public function listBrokers() + { + $result = array(); + $lists = $this->zookeeper->getChildren(self::BROKER_PATH); + if (!empty($lists)) { + foreach ($lists as $brokerId) { + $brokerDetail = $this->getBrokerDetail($brokerId); + if (!$brokerDetail) { + continue; + } + $result[$brokerId] = $brokerDetail; + } + } + + return $result; + } + + // }}} + // {{{ public function getBrokerDetail() + + /** + * get broker detail + * + * @param integer $brokerId + * @access public + * @return void + */ + public function getBrokerDetail($brokerId) + { + $result = array(); + $path = sprintf(self::BROKER_DETAIL_PATH, (int) $brokerId); + if ($this->zookeeper->exists($path)) { + $result = $this->zookeeper->get($path); + if (!$result) { + return false; + } + + $result = json_decode($result, true); + } + + return $result; + } + + // }}} + // {{{ public function getTopicDetail() + + /** + * get topic detail + * + * @param string $topicName + * @access public + * @return void + */ + public function getTopicDetail($topicName) + { + $result = array(); + $path = sprintf(self::TOPIC_PATCH, (string) $topicName); + if ($this->zookeeper->exists($path)) { + $result = $this->zookeeper->get($path); + if (!$result) { + return false; + } + $result = json_decode($result, true); + } + + return $result; + } + + // }}} + // {{{ public function getPartitionState() + + /** + * get partition state + * + * @param string $topicName + * @param integer $partitionId + * @access public + * @return void + */ + public function getPartitionState($topicName, $partitionId = 0) + { + $result = array(); + $path = sprintf(self::PARTITION_STATE, (string) $topicName, (int) $partitionId); + if ($this->zookeeper->exists($path)) { + $result = $this->zookeeper->get($path); + if (!$result) { + return false; + } + $result = json_decode($result, true); + } + + return $result; + } + + // }}} + // {{{ public function registerConsumer() + + /** + * register consumer + * + * @param string $topicName + * @param integer $partitionId + * @access public + * @return void + */ + public function registerConsumer($groupId, $consumerId, $topics = array()) + { + if (empty($topics)) { + return true; + } + + $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId); + $subData = array(); + foreach ($topics as $topic) { + $subData[$topic] = 1; + } + $data = array( + 'version' => '1', + 'pattern' => 'white_list', + 'subscription' => $subData, + ); + if (!$this->zookeeper->exists($path)) { + $this->makeZkPath($path); + $this->makeZkNode($path, json_encode($data)); + } else { + $this->zookeeper->set($path, json_encode($data)); + } + } + + // }}} + // {{{ public function listConsumer() + + /** + * list consumer + * + * @param string $groupId + * @access public + * @return void + */ + public function listConsumer($groupId) + { + $path = sprintf(self::LIST_CONSUMER, (string) $groupId); + if (!$this->zookeeper->exists($path)) { + return array(); + } else { + return $this->zookeeper->getChildren($path); + } + } + + // }}} + // {{{ public function getConsumersPerTopic() + + /** + * get consumer per topic + * + * @param string $groupId + * @access public + * @return array + */ + public function getConsumersPerTopic($groupId) + { + $consumers = $this->listConsumer($groupId); + if (empty($consumers)) { + return array(); + } + + $topics = array(); + foreach ($consumers as $consumerId) { + $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId); + if (!$this->zookeeper->exists($path)) { + continue; + } + + $info = $this->zookeeper->get($path); + $info = json_decode($info, true); + $subTopic = isset($info['subscription']) ? $info['subscription'] : array(); + foreach ($subTopic as $topic => $num) { + $topics[$topic] = $consumerId; + } + } + + return $topics; + } + + // }}} + // {{{ public function addPartitionOwner() + + /** + * add partition owner + * + * @param string $groupId + * @param string $topicName + * @param integer $partitionId + * @param string $consumerId + * @access public + * @return void + */ + public function addPartitionOwner($groupId, $topicName, $partitionId, $consumerId) + { + $path = sprintf(self::PARTITION_OWNER, (string) $groupId, $topicName, (string) $partitionId); + if (!$this->zookeeper->exists($path)) { + $this->makeZkPath($path); + $this->makeZkNode($path, $consumerId); + } else { + $this->zookeeper->set($path, $consumerId); + } + } + + // }}} + // {{{ protected function makeZkPath() + + /** + * Equivalent of "mkdir -p" on ZooKeeper + * + * @param string $path The path to the node + * @param mixed $value The value to assign to each new node along the path + * + * @return bool + */ + protected function makeZkPath($path, $value = 0) + { + $parts = explode('/', $path); + $parts = array_filter($parts); + $subpath = ''; + while (count($parts) > 1) { + $subpath .= '/' . array_shift($parts); + if (!$this->zookeeper->exists($subpath)) { + $this->makeZkNode($subpath, $value); + } + } + } + + // }}} + // {{{ protected function makeZkNode() + + /** + * Create a node on ZooKeeper at the given path + * + * @param string $path The path to the node + * @param mixed $value The value to assign to the new node + * + * @return bool + */ + protected function makeZkNode($path, $value) + { + $params = array( + array( + 'perms' => \Zookeeper::PERM_ALL, + 'scheme' => 'world', + 'id' => 'anyone', + ) + ); + return $this->zookeeper->create($path, $value, $params); + } + + // }}} + // }}} +} -- cgit v1.2.3-54-g00ecf