diff options
Diffstat (limited to 'vendor/nmred/kafka-php')
31 files changed, 6137 insertions, 0 deletions
diff --git a/vendor/nmred/kafka-php/LICENSE b/vendor/nmred/kafka-php/LICENSE new file mode 100644 index 00000000..96d08492 --- /dev/null +++ b/vendor/nmred/kafka-php/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2014, SWANSOFT +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the {organization} nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/nmred/kafka-php/README.md b/vendor/nmred/kafka-php/README.md new file mode 100644 index 00000000..5ed12e23 --- /dev/null +++ b/vendor/nmred/kafka-php/README.md @@ -0,0 +1,496 @@ +Kafka-php +========== + +[![Build Status](https://travis-ci.org/nmred/kafka-php.svg?branch=master)](https://travis-ci.org/nmred/Kafka-php) + +Kafka-php is a php client with Zookeeper integration for apache Kafka. It only supports the latest version of Kafka 0.8 which is still under development, so this module is _not production ready_ so far. + +The Zookeeper integration does the following jobs: + +* Loads broker metadata from Zookeeper before we can communicate with the Kafka server +* Watches broker state, if broker changes, the client will refresh broker and topic metadata stored in the client + +## Requirements + +* Minimum PHP version: 5.3.3. +* Apache Kafka 0.8.x +* You need to have access to your Kafka instance and be able to connect through TCP. You can obtain a copy and instructions on how to setup kafka at https://github.com/kafka-dev/kafka [kafka-08-quick-start](https://cwiki.apache.org/KAFKA/kafka-08-quick-start.html) +* The [PHP Zookeeper extension](https://github.com/andreiz/php-zookeeper) is required if you want to use the Zookeeper-based consumer. +* Productor can not dependency zookeeper + +## Installation +Add the lib directory to the PHP include_path and use an autoloader like the one in the examples directory (the code follows the PEAR/Zend one-class-per-file convention). + +## Composer Install + +Simply add a dependency on nmred/kafka-php to your project's composer.json file if you use Composer to manage the dependencies of your project. Here is a minimal example of a composer.json file : + +``` +{ + "require": { + "nmred/kafka-php": "0.1.*" + } +} +``` + +## Produce + +### \Kafka\Produce::getInstance($hostList, $timeout) + +* `hostList` : zookeeper host list , example 127.0.0.1:2181,192.168.1.114:2181 +* `timeout` : zookeeper timeout + +### \Kafka\Produce::setRequireAck($ack = -1) + +* `ack`: This field indicates how many acknowledgements the servers should receive before responding to the request. + +### \Kafka\Produce::setMessages($topicName, $partitionId, $messages) + +* `topicName` : The topic that data is being published to. +* `partitionId` : The partition that data is being published to. +* `messages` : [Array] publish message. + +### \Kafka\Produce::send() + +send message sets to the server. + +### Example + +``` php +$produce = \Kafka\Produce::getInstance('localhost:2181', 3000); + +$produce->setRequireAck(-1); +$produce->setMessages('test', 0, array('test1111111')); +$produce->setMessages('test6', 0, array('test1111111')); +$produce->setMessages('test6', 2, array('test1111111')); +$produce->setMessages('test6', 1, array('test111111111133')); +$result = $produce->send(); +var_dump($result); + +``` + +## Consumer + +### \Kafka\Consumer::getInstance($hostList, $timeout) + +* `hostList` : zookeeper host list , example 127.0.0.1:2181,192.168.1.114:2181 +* `timeout` : zookeeper timeout + +### \Kafka\Consumer::setGroup($groupName) + +* `groupName` : Specify consumer group. + +### \Kafka\Consumer::setPartition($topicName, $partitionId, $offset = 0) + +* `topicName` : The topic that data is being fetch to. +* `partitionId` : The partition that data is being fetch to. +* `offset`: set fetch offset. default `0`. + +### \Kafka\Consumer::fetch() + +return fetch message Iterator. `\Kafka\Protocol\Fetch\Topic` + +### \Kafka\Protocol\Fetch\Topic + +this object is iterator + +`key` : topic name +`value`: `\Kafka\Protocol\Fetch\Partition` + +### \Kafka\Protocol\Fetch\Partition + +this object is iterator. + +`key`: partition id +`value`: messageSet object + +#### \Kafka\Protocol\Fetch\Partition::getErrCode() + +return partition fetch errcode. + +#### \Kafka\Protocol\Fetch\Partition::getHighOffset() + +return partition fetch offset. + +### \Kafka\Protocol\Fetch\MessageSet + +this object is iterator. `\Kafka\Protocol\Fetch\Message` + +### Example + +``` php +$consumer = \Kafka\Consumer::getInstance('localhost:2181'); + +$consumer->setGroup('testgroup'); +$consumer->setPartition('test', 0); +$consumer->setPartition('test6', 2, 10); +$result = $consumer->fetch(); +foreach ($result as $topicName => $topic) { + foreach ($topic as $partId => $partition) { + var_dump($partition->getHighOffset()); + foreach ($partition as $message) { + var_dump((string)$message); + } + } +} +``` + +## Basic Protocol +### Produce API + +The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request. + +\Kafka\Protocol\Encoder::produceRequest + +#### Param struct +``` php +array( + 'required_ack' => 1, + // This field indicates how many acknowledgements the servers should receive before responding to the request. default `0` + // If it is 0 the server will not send any response + // If it is -1 the server will block until the message is committed by all in sync replicas before sending a response + // For any number > 1 the server will block waiting for this number of acknowledgements to occur + 'timeout' => 1000, + // This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks. + 'data' => array( + array( + 'topic_name' => 'testtopic', + // The topic that data is being published to.[String] + 'partitions' => array( + array( + 'partition_id' => 0, + // The partition that data is being published to. + 'messages' => array( + 'message1', + // [String] message + ), + ), + ), + ), + ), +); +``` + +#### Return + +Array + +#### Example + +``` php + +$data = array( + 'required_ack' => 1, + 'timeout' => 1000, + 'data' => array( + array( + 'topic_name' => 'test', + 'partitions' => array( + array( + 'partition_id' => 0, + 'messages' => array( + 'message1', + 'message2', + ), + ), + ), + ), + ), +); + +$conn = new \Kafka\Socket('localhost', '9092'); +$conn->connect(); +$encoder = new \Kafka\Protocol\Encoder($conn); +$encoder->produceRequest($data); + +$decoder = new \Kafka\Protocol\Decoder($conn); +$result = $decoder->produceResponse(); +var_dump($result); + +``` +### Fetch API + +The fetch API is used to fetch a chunk of one or more logs for some topic-partitions. Logically one specifies the topics, partitions, and starting offset at which to begin the fetch and gets back a chunk of messages + +\Kafka\Protocol\Encoder::fetchRequest + +#### Param struct +``` php +array( + 'replica_id' => -1, + // The replica id indicates the node id of the replica initiating this request. default `-1` + 'max_wait_time' => 100, + // The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued. default 100 ms. + 'min_bytes' => 64 * 1024 // 64k + // This is the minimum number of bytes of messages that must be available to give a response. default 64k. + 'data' => array( + array( + 'topic_name' => 'testtopic', + // The topic that data is being published to.[String] + 'partitions' => array( + array( + 'partition_id' => 0, + // The partition that data is being published to. + 'offset' => 0, + // The offset to begin this fetch from. default 0 + 'max_bytes' => 100 * 1024 * 1024, + // This is the minimum number of bytes of messages that must be available to give a response. default 100Mb + ), + ), + ), + ), +); +``` + +#### Return + +\Kafka\Protocol\Fetch\Topic iterator + +#### Example +``` php + +$data = array( + 'data' => array( + array( + 'topic_name' => 'test', + 'partitions' => array( + array( + 'partition_id' => 0, + 'offset' => 0, + ), + ), + ), + ), +); + +$conn = new \Kafka\Socket('localhost', '9092'); +$conn->connect(); +$encoder = new \Kafka\Protocol\Encoder($conn); +$encoder->fetchRequest($data); + +$decoder = new \Kafka\Protocol\Decoder($conn); +$result = $decoder->fetchResponse(); +var_dump($result); + +``` +### Offset API + +This API describes the valid offset range available for a set of topic-partitions. As with the produce and fetch APIs requests must be directed to the broker that is currently the leader for the partitions in question. This can be determined using the metadata API. + +\Kafka\Protocol\Encoder::offsetRequest + +####param struct +``` php +array( + 'replica_id' => -1, + // The replica id indicates the node id of the replica initiating this request. default `-1` + 'data' => array( + array( + 'topic_name' => 'testtopic', + // The topic that data is being published to.[String] + 'partitions' => array( + array( + 'partition_id' => 0, + // The partition that get offset . + 'time' => -1, + // Used to ask for all messages before a certain time (ms). + // Specify -1 to receive the latest offsets + // Specify -2 to receive the earliest available offset. + 'max_offset' => 1, + // max return offset element. default 10000. + ), + ), + ), + ), +); +``` + +#### Return + +Array. + +#### Example + +``` php + +$data = array( + 'data' => array( + array( + 'topic_name' => 'test', + 'partitions' => array( + array( + 'partition_id' => 0, + 'max_offset' => 10, + 'time' => -1, + ), + ), + ), + ), +); + +$conn = new \Kafka\Socket('localhost', '9092'); +$conn->connect(); +$encoder = new \Kafka\Protocol\Encoder($conn); +$encoder->offsetRequest($data); + +$decoder = new \Kafka\Protocol\Decoder($conn); +$result = $decoder->offsetResponse(); +var_dump($result); + +``` +### Metadata API + +The metdata returned is at the partition level, but grouped together by topic for convenience and to avoid redundancy. For each partition the metadata contains the information for the leader as well as for all the replicas and the list of replicas that are currently in-sync. + +\Kafka\Protocol\Encoder::metadataRequest + +####param struct +``` php +array( + 'topic_name1', // topic name +); +``` + +#### Return + +Array. + +#### Example + +``` php + +$data = array( + 'test' +); + +$conn = new \Kafka\Socket('localhost', '9092'); +$conn->connect(); +$encoder = new \Kafka\Protocol\Encoder($conn); +$encoder->metadataRequest($data); + +$decoder = new \Kafka\Protocol\Decoder($conn); +$result = $decoder->metadataResponse(); +var_dump($result); + +``` +### Offset Commit API + +These APIs allow for centralized management of offsets. + +\Kafka\Protocol\Encoder::commitOffsetRequest + +####param struct +``` php +array( + 'group_id' => 'testgroup', + // consumer group + 'data' => array( + array( + 'topic_name' => 'testtopic', + // The topic that data is being published to.[String] + 'partitions' => array( + array( + 'partition_id' => 0, + // The partition that get offset . + 'offset' => 0, + // The offset to begin this fetch from. + 'time' => -1, + // If the time stamp field is set to -1, then the broker sets the time stamp to the receive time before committing the offset. + ), + ), + ), + ), +); +``` + +#### Return + +Array. + +#### Example + +``` php +$data = array( + 'group_id' => 'testgroup', + 'data' => array( + array( + 'topic_name' => 'test', + 'partitions' => array( + array( + 'partition_id' => 0, + 'offset' => 2, + ), + ), + ), + ), +); + + +$conn = new \Kafka\Socket('localhost', '9092'); +$conn->connect(); +$encoder = new \Kafka\Protocol\Encoder($conn); +$encoder->commitOffsetRequest($data); + +$decoder = new \Kafka\Protocol\Decoder($conn); +$result = $decoder->commitOffsetResponse(); +var_dump($result); + +``` +### Offset Fetch API + +These APIs allow for centralized management of offsets. + +\Kafka\Protocol\Encoder::fetchOffsetRequest + +####param struct +``` php +array( + 'group_id' => 'testgroup', + // consumer group + 'data' => array( + array( + 'topic_name' => 'testtopic', + // The topic that data is being published to.[String] + 'partitions' => array( + array( + 'partition_id' => 0, + // The partition that get offset . + ), + ), + ), + ), +); +``` + +#### Return + +Array. + +#### Example + +``` php +$data = array( + 'group_id' => 'testgroup', + 'data' => array( + array( + 'topic_name' => 'test', + 'partitions' => array( + array( + 'partition_id' => 0, + ), + ), + ), + ), +); + + +$conn = new \Kafka\Socket('localhost', '9092'); +$conn->connect(); +$encoder = new \Kafka\Protocol\Encoder($conn); +$encoder->fetchOffsetRequest($data); + +$decoder = new \Kafka\Protocol\Decoder($conn); +$result = $decoder->fetchOffsetResponse(); +var_dump($result); + +``` diff --git a/vendor/nmred/kafka-php/src/Kafka/Client.php b/vendor/nmred/kafka-php/src/Kafka/Client.php new file mode 100644 index 00000000..a38e705b --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Client.php @@ -0,0 +1,290 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Client +{ + // {{{ consts + // }}} + // {{{ members + + /** + * cluster metadata + * + * @var \Kafka\ClusterMetaData + * @access private + */ + private $metadata = null; + + /** + * broker host list + * + * @var array + * @access private + */ + private $hostList = array(); + + /** + * save broker connection + * + * @var array + * @access private + */ + private static $stream = array(); + + /** + * default stream options + * + * @var array + * @access private + */ + private $streamOptions = array( + 'RecvTimeoutSec' => 0, + 'RecvTimeoutUsec' => 750000, + 'SendTimeoutSec' => 0, + 'SendTimeoutUsec' => 100000, + ); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct(ClusterMetaData $metadata) + { + $this->metadata = $metadata; + if (method_exists($metadata, 'setClient')) { + $this->metadata->setClient($this); + } + } + + /** + * update stream options + * + * @param array $options + */ + public function setStreamOptions($options = array()) + { + // Merge the arrays + $this->streamOptions = array_merge($this->streamOptions, $options); + $this->updateStreamOptions(); + } + + /** + * @access public + * @param $name - name of stream option + * @param $value - value for option + */ + public function setStreamOption($name, $value) + { + $this->streamOptions[$name] = $value; + $this->updateStreamOptions(); + } + + /** + * @access public + * @param $name - name of option + * @return mixed + */ + public function getStreamOption($name) + { + if (array_key_exists($name, $this->streamOptions)) { + return $this->streamOptions[$name]; + } + return null; + } + + /** + * @access private + */ + private function updateStreamOptions() + { + // Loop thru each stream + foreach (self::$stream as $host => $streams) { + foreach ($streams as $key => $info) { + // Update options + if (isset($info['stream'])) { + /** @var \Kafka\Socket $stream */ + $stream = $info['stream']; + $stream->setRecvTimeoutSec($this->streamOptions['RecvTimeoutSec']); + $stream->setRecvTimeoutUsec($this->streamOptions['SendTimeoutUsec']); + $stream->setSendTimeoutSec($this->streamOptions['SendTimeoutSec']); + $stream->setSendTimeoutUsec($this->streamOptions['SendTimeoutUsec']); + } + } + } + } + + // }}} + // {{{ public function getBrokers() + + /** + * get broker server + * + * @access public + * @return void + */ + public function getBrokers() + { + if (empty($this->hostList)) { + $brokerList = $this->metadata->listBrokers(); + foreach ($brokerList as $brokerId => $info) { + if (!isset($info['host']) || !isset($info['port'])) { + continue; + } + $this->hostList[$brokerId] = $info['host'] . ':' . $info['port']; + } + } + + return $this->hostList; + } + + // }}} + // {{{ public function getHostByPartition() + + /** + * get broker host by topic partition + * + * @param string $topicName + * @param int $partitionId + * @access public + * @return string + */ + public function getHostByPartition($topicName, $partitionId = 0) + { + $partitionInfo = $this->metadata->getPartitionState($topicName, $partitionId); + if (!$partitionInfo) { + throw new \Kafka\Exception('topic:' . $topicName . ', partition id: ' . $partitionId . ' is not exists.'); + } + + $hostList = $this->getBrokers(); + if (isset($partitionInfo['leader']) && isset($hostList[$partitionInfo['leader']])) { + return $hostList[$partitionInfo['leader']]; + } else { + throw new \Kafka\Exception('can\'t find broker host.'); + } + } + + // }}} + // {{{ public function getZooKeeper() + + /** + * get kafka zookeeper object + * + * @access public + * @return \Kafka\ZooKeeper + */ + public function getZooKeeper() + { + if ($this->metadata instanceof \Kafka\ZooKeeper) { + return $this->metadata; + } else { + throw new \Kafka\Exception( 'ZooKeeper was not provided' ); + } + } + + // }}} + // {{{ public function getStream() + + /** + * get broker broker connect + * + * @param string $host + * @access private + * @return void + */ + public function getStream($host, $lockKey = null) + { + if (!$lockKey) { + $lockKey = uniqid($host); + } + + list($hostname, $port) = explode(':', $host); + // find unlock stream + if (isset(self::$stream[$host])) { + foreach (self::$stream[$host] as $key => $info) { + if ($info['locked']) { + continue; + } else { + self::$stream[$host][$key]['locked'] = true; + $info['stream']->connect(); + return array('key' => $key, 'stream' => $info['stream']); + } + } + } + + // no idle stream + $stream = new \Kafka\Socket($hostname, $port, $this->getStreamOption('RecvTimeoutSec'), $this->getStreamOption('RecvTimeoutUsec'), $this->getStreamOption('SendTimeoutSec'), $this->getStreamOption('SendTimeoutUsec')); + $stream->connect(); + self::$stream[$host][$lockKey] = array( + 'locked' => true, + 'stream' => $stream, + ); + return array('key' => $lockKey, 'stream' => $stream); + } + + // }}} + // {{{ public function freeStream() + + /** + * free stream pool + * + * @param string $key + * @access public + * @return void + */ + public function freeStream($key) + { + foreach (self::$stream as $host => $values) { + if (isset($values[$key])) { + self::$stream[$host][$key]['locked'] = false; + } + } + } + + // }}} + // {{{ public function getTopicDetail() + + /** + * get topic detail info + * + * @param string $topicName + * @return array + */ + public function getTopicDetail($topicName) + { + return $this->metadata->getTopicDetail($topicName); + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php b/vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php new file mode 100644 index 00000000..e9b3d064 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php @@ -0,0 +1,53 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka; + +/** ++------------------------------------------------------------------------------ +* Metadata about the kafka cluster ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author ebernhardson@wikimedia.org ++------------------------------------------------------------------------------ +*/ + +interface ClusterMetaData +{ + /** + * get broker list from kafka metadata + * + * @access public + * @return array + */ + public function listBrokers(); + + /** + * @param string $topicName + * @param integer $partitionId + * @access public + * @return array + */ + public function getPartitionState($topicName, $partitionId = 0); + + /** + * @param string $topicName + * @access public + * @return array + */ + public function getTopicDetail($topicName); +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Consumer.php new file mode 100644 index 00000000..5ff2d43f --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Consumer.php @@ -0,0 +1,378 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Consumer +{ + // {{{ consts + // }}} + // {{{ members + + /** + * client + * + * @var mixed + * @access private + */ + private $client = null; + + /** + * send message options cache + * + * @var array + * @access private + */ + private $payload = array(); + + /** + * consumer group + * + * @var string + * @access private + */ + private $group = ''; + + /** + * from offset + * + * @var mixed + * @access private + */ + private $fromOffset = true; + + /** + * produce instance + * + * @var \Kafka\Produce + * @access private + */ + private static $instance = null; + + /** + * broker host list + * + * @var array + * @access private + */ + private $hostList = array(); + + /** + * save broker connection + * + * @var array + * @access private + */ + private $stream = array(); + + /** + * maxSize + * + * @var integer + */ + private $maxSize = 1048576; + + /** + * offsetStrategy + * @var integer + */ + private $offsetStrategy = \Kafka\Offset::DEFAULT_EARLY; + + // }}} + // {{{ functions + // {{{ public function static getInstance() + + /** + * set send messages + * + * @access public + * @return void + */ + public static function getInstance($hostList, $timeout = null) + { + if (is_null(self::$instance)) { + self::$instance = new self($hostList, $timeout); + } + + return self::$instance; + } + + // }}} + // {{{ private function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + private function __construct($hostList, $timeout = null) + { + $zookeeper = new \Kafka\ZooKeeper($hostList, $timeout); + $this->client = new \Kafka\Client($zookeeper); + } + + // }}} + // {{{ public function clearPayload() + + /** + * clearPayload + * + * @access public + * @return void + */ + public function clearPayload() + { + $this->payload = array(); + } + + // }}} + // {{{ public function setTopic() + + /** + * set topic name + * + * @access public + * @return void + */ + public function setTopic($topicName, $defaultOffset = null) + { + $parts = $this->client->getTopicDetail($topicName); + if (!isset($parts['partitions']) || empty($parts['partitions'])) { + // set topic fail. + return $this; + } + + foreach ($parts['partitions'] as $partId => $info) { + $this->setPartition($topicName, $partId, $defaultOffset); + } + + return $this; + } + + // }}} + // {{{ public function setPartition() + + /** + * set topic partition + * + * @access public + * @return void + */ + public function setPartition($topicName, $partitionId = 0, $offset = null) + { + if (is_null($offset)) { + if ($this->fromOffset) { + $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId); + $offset = $offsetObject->getOffset($this->offsetStrategy); + \Kafka\Log::log('topic name:' . $topicName . ', part:' . $partitionId . 'get offset from kafka server, offet:' . $offset, LOG_DEBUG); + } else { + $offset = 0; + } + } + $this->payload[$topicName][$partitionId] = $offset; + + return $this; + } + + // }}} + // {{{ public function setFromOffset() + + /** + * set whether starting offset fetch + * + * @param boolean $fromOffset + * @access public + * @return void + */ + public function setFromOffset($fromOffset) + { + $this->fromOffset = (boolean) $fromOffset; + } + + // }}} + // {{{ public function setMaxBytes() + + /** + * set fetch message max bytes + * + * @param int $maxSize + * @access public + * @return void + */ + public function setMaxBytes($maxSize) + { + $this->maxSize = $maxSize; + } + + // }}} + // {{{ public function setGroup() + + /** + * set consumer group + * + * @param string $group + * @access public + * @return void + */ + public function setGroup($group) + { + $this->group = (string) $group; + return $this; + } + + // }}} + // {{{ public function fetch() + + /** + * fetch message to broker + * + * @access public + * @return void + */ + public function fetch() + { + $data = $this->_formatPayload(); + if (empty($data)) { + return false; + } + + $responseData = array(); + $streams = array(); + foreach ($data as $host => $requestData) { + $connArr = $this->client->getStream($host); + $conn = $connArr['stream']; + $encoder = new \Kafka\Protocol\Encoder($conn); + $encoder->fetchRequest($requestData); + $streams[$connArr['key']] = $conn; + } + + $fetch = new \Kafka\Protocol\Fetch\Topic($streams, $data); + + // register fetch helper + $freeStream = new \Kafka\Protocol\Fetch\Helper\FreeStream($this->client); + $freeStream->setStreams($streams); + \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('freeStream', $freeStream); + + // register partition commit offset + $commitOffset = new \Kafka\Protocol\Fetch\Helper\CommitOffset($this->client); + $commitOffset->setGroup($this->group); + \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('commitOffset', $commitOffset); + + $updateConsumer = new \Kafka\Protocol\Fetch\Helper\Consumer($this); + \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('updateConsumer', $updateConsumer); + + return $fetch; + } + + // }}} + // {{{ public function getClient() + + /** + * get client object + * + * @access public + * @return void + */ + public function getClient() + { + return $this->client; + } + + /** + * passthru method to client for setting stream options + * + * @param array $options + */ + public function setStreamOptions($options = array()) + { + $this->client->setStreamOptions($options); + } + + // }}} + // {{{ private function _formatPayload() + + /** + * format payload array + * + * @access private + * @return array + */ + private function _formatPayload() + { + if (empty($this->payload)) { + return array(); + } + + $data = array(); + foreach ($this->payload as $topicName => $partitions) { + foreach ($partitions as $partitionId => $offset) { + $host = $this->client->getHostByPartition($topicName, $partitionId); + $data[$host][$topicName][$partitionId] = $offset; + } + } + + $requestData = array(); + foreach ($data as $host => $info) { + $topicData = array(); + foreach ($info as $topicName => $partitions) { + $partitionData = array(); + foreach ($partitions as $partitionId => $offset) { + $partitionData[] = array( + 'partition_id' => $partitionId, + 'offset' => $offset, + 'max_bytes' => $this->maxSize, + ); + } + $topicData[] = array( + 'topic_name' => $topicName, + 'partitions' => $partitionData, + ); + } + + $requestData[$host] = array( + 'data' => $topicData, + ); + } + + return $requestData; + } + + /** + * const LAST_OFFSET = -1; + * const EARLIEST_OFFSET = -2; + * const DEFAULT_LAST = -2; + * const DEFAULT_EARLY = -1; + * @param type $offsetStrategy + */ + public function setOffsetStrategy($offsetStrategy) + { + $this->offsetStrategy = $offsetStrategy; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception.php b/vendor/nmred/kafka-php/src/Kafka/Exception.php new file mode 100644 index 00000000..f336f1c3 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Exception.php @@ -0,0 +1,31 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Exception extends \RuntimeException +{ +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/NotSupported.php b/vendor/nmred/kafka-php/src/Kafka/Exception/NotSupported.php new file mode 100644 index 00000000..011129a2 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Exception/NotSupported.php @@ -0,0 +1,33 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Exception; + +use \Kafka\Exception; + +/** ++------------------------------------------------------------------------------ +* Kafka php client exception ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class NotSupported extends \Exception +{ +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/OutOfRange.php b/vendor/nmred/kafka-php/src/Kafka/Exception/OutOfRange.php new file mode 100644 index 00000000..374d1538 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Exception/OutOfRange.php @@ -0,0 +1,33 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Exception; + +use \Kafka\Exception; + +/** ++------------------------------------------------------------------------------ +* Kafka php client exception ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class OutOfRange extends Exception +{ +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/Protocol.php b/vendor/nmred/kafka-php/src/Kafka/Exception/Protocol.php new file mode 100644 index 00000000..6e213f05 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Exception/Protocol.php @@ -0,0 +1,33 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Exception; + +use \Kafka\Exception; + +/** ++------------------------------------------------------------------------------ +* Kafka php client exception ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Protocol extends \Exception +{ +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/Socket.php b/vendor/nmred/kafka-php/src/Kafka/Exception/Socket.php new file mode 100644 index 00000000..aca93e25 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Exception/Socket.php @@ -0,0 +1,33 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Exception; + +use \Kafka\Exception; + +/** ++------------------------------------------------------------------------------ +* Kafka php client exception ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Socket extends \Exception +{ +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/SocketConnect.php b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketConnect.php new file mode 100644 index 00000000..476b2a48 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketConnect.php @@ -0,0 +1,33 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Exception; + +use \Kafka\Exception; + +/** ++------------------------------------------------------------------------------ + * Kafka php client exception ++------------------------------------------------------------------------------ + * + * @package + * @version $_SWANBR_VERSION_$ + * @copyright Copyleft + * @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ + */ + +class SocketConnect extends Socket +{ +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/SocketEOF.php b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketEOF.php new file mode 100644 index 00000000..35e34e07 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketEOF.php @@ -0,0 +1,33 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Exception; + +use \Kafka\Exception; + +/** ++------------------------------------------------------------------------------ +* Kafka php client exception ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class SocketEOF extends Exception +{ +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/SocketTimeout.php b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketTimeout.php new file mode 100644 index 00000000..b0d38c2c --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketTimeout.php @@ -0,0 +1,33 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Exception; + +use \Kafka\Exception; + +/** ++------------------------------------------------------------------------------ +* Kafka php client exception ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class SocketTimeout extends Exception +{ +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Log.php b/vendor/nmred/kafka-php/src/Kafka/Log.php new file mode 100644 index 00000000..481bfc3d --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Log.php @@ -0,0 +1,78 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Log +{ + // {{{ consts + // }}} + // {{{ members + + /** + * log + * + * @var mixed + * @access private + */ + private static $log = null; + + // }}} + // {{{ functions + // {{{ public static function setLog() + + /** + * setLog + * + * @access public + * @return void + */ + public static function setLog($log) + { + if ($log) { + self::$log = $log; + } + } + + // }}} + // {{{ public static function log() + + /** + * log + * + * @access public + * @return void + */ + public static function log($message, $level = LOG_DEBUG) + { + if (self::$log && method_exists(self::$log, 'log')) { + self::$log->log($message, $level); + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php b/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php new file mode 100644 index 00000000..9d2c613e --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php @@ -0,0 +1,200 @@ +<?php + +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka; + +/** ++------------------------------------------------------------------------------ +* Cluster metadata provided by kafka ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author ebernhardson@wikimedia.org ++------------------------------------------------------------------------------ +*/ + +class MetaDataFromKafka implements ClusterMetaData +{ + // {{{ consts + // }}} + // {{{ members + + /** + * client + * + * @var \Kafka\Client + * @access private + */ + private $client; + + /** + * list of kafka brokers to get metadata from + * + * @var array + * @access private + */ + private $hostList; + + /** + * List of all kafka brokers + * + * @var array + * @access private + */ + private $brokers = array(); + + /** + * List of all loaded topic metadata + * + * @var array + * @access private + */ + private $topics = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * @var string|array $hostList List of kafka brokers to get metadata from + * @access public + */ + public function __construct($hostList) + { + if (is_string($hostList)) { // support host list 127.0.0.1:9092,192.168.2.11:9092 form + $this->hostList = explode(',', $hostList); + } else { + $this->hostList = (array)$hostList; + } + // randomize the order of servers we collect metadata from + shuffle($this->hostList); + } + + // }}} + // {{{ public function setClient() + + /** + * @var \Kafka\Client $client + * @access public + * @return void + */ + public function setClient(\Kafka\Client $client) + { + $this->client = $client; + } + + // }}} + // {{{ public function listBrokers() + + /** + * get broker list from kafka metadata + * + * @access public + * @return array + */ + public function listBrokers() + { + if ($this->brokers === null) { + $this->loadBrokers(); + } + return $this->brokers; + } + + // }}} + // {{{ public function getPartitionState() + + public function getPartitionState($topicName, $partitionId = 0) + { + if (!isset( $this->topics[$topicName] ) ) { + $this->loadTopicDetail(array($topicName)); + } + if ( isset( $this->topics[$topicName]['partitions'][$partitionId] ) ) { + return $this->topics[$topicName]['partitions'][$partitionId]; + } else { + return null; + } + } + + // }}} + // {{{ public function getTopicDetail() + + /** + * + * @param string $topicName + * @access public + * @return array + */ + public function getTopicDetail($topicName) + { + if (!isset( $this->topics[$topicName] ) ) { + $this->loadTopicDetail(array($topicName)); + } + if (isset( $this->topics[$topicName] ) ) { + return $this->topics[$topicName]; + } else { + return array(); + } + } + + // }}} + // {{{ private function loadBrokers() + + private function loadBrokers() + { + $this->brokers = array(); + // not sure how to ask for only the brokers without a topic... + // just ask for a topic we don't care about + $this->loadTopicDetail(array('test')); + } + + // }}} + // {{{ private function loadTopicDetail() + + private function loadTopicDetail(array $topics) + { + if ($this->client === null) { + throw new \Kafka\Exception('client was not provided'); + } + $response = null; + foreach ($this->hostList as $host) { + try { + $response = null; + $stream = $this->client->getStream($host); + $conn = $stream['stream']; + $encoder = new \Kafka\Protocol\Encoder($conn); + $encoder->metadataRequest($topics); + $decoder = new \Kafka\Protocol\Decoder($conn); + $response = $decoder->metadataResponse(); + $this->client->freeStream($stream['key']); + break; + } catch (\Kafka\Exception $e) { + // keep trying + } + } + if ($response) { + // Merge arrays using "+" operator to preserve key (which are broker IDs) + // instead of array_merge (which reindex numeric keys) + $this->brokers = $response['brokers'] + $this->brokers; + $this->topics = array_merge($response['topics'], $this->topics); + } else { + throw new \Kafka\Exception('Could not connect to any kafka brokers'); + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Offset.php b/vendor/nmred/kafka-php/src/Kafka/Offset.php new file mode 100644 index 00000000..7ad3f9d8 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Offset.php @@ -0,0 +1,305 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka; + +use \Kafka\Log; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Offset +{ + // {{{ consts + + /** + * receive the latest offset + */ + const LAST_OFFSET = -1; + + /** + * receive the earliest available offset. + */ + const EARLIEST_OFFSET = -2; + + /** + * function getOffset if read invalid value use latest offset instead of + */ + const DEFAULT_LAST = -2; + + /** + * function getOffset if read invalid value use earliest offset instead of + */ + const DEFAULT_EARLY = -1; + + // }}} + // {{{ members + + /** + * client + * + * @var mixed + * @access private + */ + private $client = null; + + /** + * consumer group + * + * @var string + * @access private + */ + private $groupId = ''; + + /** + * topic name + * + * @var string + * @access private + */ + private $topicName = ''; + + /** + * topic partition id, default 0 + * + * @var float + * @access private + */ + private $partitionId = 0; + + /** + * encoder + * + * @var mixed + * @access private + */ + private $encoder = null; + + /** + * decoder + * + * @var mixed + * @access private + */ + private $decoder = null; + + /** + * streamKey + * + * @var string + * @access private + */ + private $streamKey = ''; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct($client, $groupId, $topicName, $partitionId = 0) + { + $this->client = $client; + $this->groupId = $groupId; + $this->topicName = $topicName; + $this->partitionId = $partitionId; + + $host = $this->client->getHostByPartition($topicName, $partitionId); + $stream = $this->client->getStream($host); + $conn = $stream['stream']; + $this->streamKey = $stream['key']; + $this->encoder = new \Kafka\Protocol\Encoder($conn); + $this->decoder = new \Kafka\Protocol\Decoder($conn); + } + + // }}} + // {{{ public function setOffset() + + /** + * set consumer offset + * + * @param integer $offset + * @access public + * @return void + */ + public function setOffset($offset) + { + $maxOffset = $this->getProduceOffset(); + if ($offset > $maxOffset) { + throw new \Kafka\Exception('this offset is invalid. must less than max offset:' . $maxOffset); + } + + $data = array( + 'group_id' => $this->groupId, + 'data' => array( + array( + 'topic_name' => $this->topicName, + 'partitions' => array( + array( + 'partition_id' => $this->partitionId, + 'offset' => $offset, + ), + ), + ), + ), + ); + + $topicName = $this->topicName; + $partitionId = $this->partitionId; + + $this->encoder->commitOffsetRequest($data); + $result = $this->decoder->commitOffsetResponse(); + $this->client->freeStream($this->streamKey); + if (!isset($result[$topicName][$partitionId]['errCode'])) { + throw new \Kafka\Exception('commit topic offset failed.'); + } + if ($result[$topicName][$partitionId]['errCode'] != 0) { + throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode'])); + } + } + + // }}} + // {{{ public function getOffset() + + /** + * get consumer offset + * + * @param integer $defaultOffset + * if defaultOffset -1 instead of early offset + * if defaultOffset -2 instead of last offset + * @access public + * @return void + */ + public function getOffset($defaultOffset = self::DEFAULT_LAST) + { + $maxOffset = $this->getProduceOffset(self::LAST_OFFSET); + $minOffset = $this->getProduceOffset(self::EARLIEST_OFFSET); + $data = array( + 'group_id' => $this->groupId, + 'data' => array( + array( + 'topic_name' => $this->topicName, + 'partitions' => array( + array( + 'partition_id' => $this->partitionId, + ), + ), + ), + ), + ); + + $this->encoder->fetchOffsetRequest($data); + $result = $this->decoder->fetchOffsetResponse(); + $this->client->freeStream($this->streamKey); + + $topicName = $this->topicName; + $partitionId = $this->partitionId; + if (!isset($result[$topicName][$partitionId]['errCode'])) { + throw new \Kafka\Exception('fetch topic offset failed.'); + } + if ($result[$topicName][$partitionId]['errCode'] == 3) { + switch ($defaultOffset) { + case self::DEFAULT_LAST: + return $maxOffset; + Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default last.", LOG_INFO); + case self::DEFAULT_EARLY: + Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default early.", LOG_INFO); + return $minOffset; + default: + $this->setOffset($defaultOffset); + Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default $defaultOffset.", LOG_INFO); + return $defaultOffset; + } + if ($defaultOffset) { + $this->setOffset($defaultOffset); + return $defaultOffset; + } + } elseif ($result[$topicName][$partitionId]['errCode'] == 0) { + $offset = $result[$topicName][$partitionId]['offset']; + if ($offset > $maxOffset || $offset < $minOffset) { + if ($defaultOffset == self::DEFAULT_EARLY) { + $offset = $minOffset; + } else { + $offset = $maxOffset; + } + } + Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is $offset.", LOG_INFO); + + return $offset; + } else { + throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode'])); + } + } + + // }}} + // {{{ public function getProduceOffset() + + /** + * get produce server offset + * + * @param string $topicName + * @param integer $partitionId + * @access public + * @return int + */ + public function getProduceOffset($timeLine = self::LAST_OFFSET) + { + $topicName = $this->topicName; + $partitionId = $this->partitionId; + + $requestData = array( + 'data' => array( + array( + 'topic_name' => $this->topicName, + 'partitions' => array( + array( + 'partition_id' => $this->partitionId, + 'time' => $timeLine, + 'max_offset' => 1, + ), + ), + ), + ), + ); + $this->encoder->offsetRequest($requestData); + $result = $this->decoder->offsetResponse(); + $this->client->freeStream($this->streamKey); + + if (!isset($result[$topicName][$partitionId]['offset'])) { + if (isset($result[$topicName][$partitionId]['errCode'])) { + throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode'])); + } else { + throw new \Kafka\Exception('get offset failed. topic name:' . $this->topicName . ' partitionId: ' . $this->partitionId); + } + } + + return array_shift($result[$topicName][$partitionId]['offset']); + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Produce.php b/vendor/nmred/kafka-php/src/Kafka/Produce.php new file mode 100644 index 00000000..2b9e6cd3 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Produce.php @@ -0,0 +1,337 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Produce +{ + // {{{ consts + // }}} + // {{{ members + + /** + * client + * + * @var mixed + * @access private + */ + private $client = null; + + /** + * send message options cache + * + * @var array + * @access private + */ + private $payload = array(); + + /** + * default the server will not send any response + * + * @var float + * @access private + */ + private $requiredAck = 0; + + /** + * default timeout is 100ms + * + * @var float + * @access private + */ + private $timeout = 100; + + /** + * produce instance + * + * @var \Kafka\Produce + * @access private + */ + private static $instance = null; + + /** + * broker host list + * + * @var array + * @access private + */ + private $hostList = array(); + + /** + * save broker connection + * + * @var array + * @access private + */ + private $stream = array(); + + // }}} + // {{{ functions + // {{{ public function static getInstance() + + /** + * set send messages + * + * @access public + * @return void + */ + public static function getInstance($hostList, $timeout, $kafkaHostList = null) + { + if (is_null(self::$instance)) { + self::$instance = new self($hostList, $timeout, $kafkaHostList); + } + + return self::$instance; + } + + // }}} + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct($hostList, $timeout = null, $kafkaHostList = null) + { + if ($hostList instanceof \Kafka\ClusterMetaData) { + $metadata = $hostList; + } elseif ( $kafkaHostList !== null ) { + $metadata = new \Kafka\MetaDataFromKafka($kafkaHostList); + } else { + $metadata = new \Kafka\ZooKeeper($hostList, $timeout); + } + $this->client = new \Kafka\Client($metadata); + } + + // }}} + // {{{ public function setMessages() + + /** + * set send messages + * + * @access public + * @return void + */ + public function setMessages($topicName, $partitionId = 0, $messages = array()) + { + if (isset($this->payload[$topicName][$partitionId])) { + $this->payload[$topicName][$partitionId] = + array_merge($this->payload[$topicName][$partitionId], $messages); + } else { + $this->payload[$topicName][$partitionId] = $messages; + } + + return $this; + } + + // }}} + // {{{ public function setRequireAck() + + /** + * set request mode + * This field indicates how many acknowledgements the servers should receive + * before responding to the request. If it is 0 the server will not send any + * response (this is the only case where the server will not reply to a + * request). If it is 1, the server will wait the data is written to the + * local log before sending a response. If it is -1 the server will block + * until the message is committed by all in sync replicas before sending a + * response. For any number > 1 the server will block waiting for this + * number of acknowledgements to occur (but the server will never wait for + * more acknowledgements than there are in-sync replicas). + * + * @param int $ack + * @access public + * @return void + */ + public function setRequireAck($ack = 0) + { + if ($ack >= -1) { + $this->requiredAck = (int) $ack; + } + + return $this; + } + + // }}} + // {{{ public function setTimeOut() + + /** + * set request timeout + * + * @param int $timeout + * @access public + * @return void + */ + public function setTimeOut($timeout = 100) + { + if ((int) $timeout) { + $this->timeout = (int) $timeout; + } + return $this; + } + + // }}} + // {{{ public function send() + + /** + * send message to broker + * + * @access public + * @return void + */ + public function send() + { + $data = $this->_formatPayload(); + if (empty($data)) { + return false; + } + + $responseData = array(); + foreach ($data as $host => $requestData) { + $stream = $this->client->getStream($host); + $conn = $stream['stream']; + $encoder = new \Kafka\Protocol\Encoder($conn); + $encoder->produceRequest($requestData); + if ((int) $this->requiredAck !== 0) { // get broker response + $decoder = new \Kafka\Protocol\Decoder($conn); + $response = $decoder->produceResponse(); + foreach ($response as $topicName => $info) { + if (!isset($responseData[$topicName])) { + $responseData[$topicName] = $info; + } else { + $responseData[$topicName] = array_merge($info, $responseData[$topicName]); + } + } + } + + $this->client->freeStream($stream['key']); + } + + $this->payload = array(); + return $responseData; + } + + // }}} + // {{{ public function getClient() + + /** + * get client object + * + * @access public + * @return void + */ + public function getClient() + { + return $this->client; + } + + /** + * passthru method to client for setting stream options + * + * @access public + * @param array $options + */ + public function setStreamOptions($options = array()) + { + $this->client->setStreamOptions($options); + } + + // }}} + // {{{ public function getAvailablePartitions() + + /** + * get available partition + * + * @access public + * @return array + */ + public function getAvailablePartitions($topicName) + { + $topicDetail = $this->client->getTopicDetail($topicName); + if (is_array($topicDetail) && isset($topicDetail['partitions'])) { + $topicPartitiions = array_keys($topicDetail['partitions']); + } else { + $topicPartitiions = array(); + } + + return $topicPartitiions; + } + + // }}} + // {{{ private function _formatPayload() + + /** + * format payload array + * + * @access private + * @return array + */ + private function _formatPayload() + { + if (empty($this->payload)) { + return array(); + } + + $data = array(); + foreach ($this->payload as $topicName => $partitions) { + foreach ($partitions as $partitionId => $messages) { + $host = $this->client->getHostByPartition($topicName, $partitionId); + $data[$host][$topicName][$partitionId] = $messages; + } + } + + $requestData = array(); + foreach ($data as $host => $info) { + $topicData = array(); + foreach ($info as $topicName => $partitions) { + $partitionData = array(); + foreach ($partitions as $partitionId => $messages) { + $partitionData[] = array( + 'partition_id' => $partitionId, + 'messages' => $messages, + ); + } + $topicData[] = array( + 'topic_name' => $topicName, + 'partitions' => $partitionData, + ); + } + + $requestData[$host] = array( + 'required_ack' => $this->requiredAck, + 'timeout' => $this->timeout, + 'data' => $topicData, + ); + } + + return $requestData; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php new file mode 100644 index 00000000..f1e4b496 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php @@ -0,0 +1,430 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Decoder extends Protocol +{ + // {{{ functions + // {{{ public function produceResponse() + + /** + * decode produce response + * + * @param string $data + * @access public + * @return array + */ + public function produceResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('produce response invalid.'); + } + $data = $this->stream->read($dataLen, true); + + // parse data struct + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $topicCount = array_shift($topicCount); + $offset += 4; + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8)); + $offset += 8; + $result[$topicName][$partitionId[1]] = array( + 'errCode' => $errCode[1], + 'offset' => $partitionOffset + ); + } + } + + return $result; + } + + // }}} + // {{{ public function fetchResponse() + + /** + * decode fetch response + * + * @param string $data + * @access public + * @return Iterator + */ + public function fetchResponse() + { + return new \Kafka\Protocol\Fetch\Topic($this->stream); + } + + // }}} + // {{{ public function metadataResponse() + + /** + * decode metadata response + * + * @param string $data + * @access public + * @return array + */ + public function metadataResponse() + { + $result = array(); + $broker = array(); + $topic = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('metaData response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $brokerCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $brokerCount = isset($brokerCount[1]) ? $brokerCount[1] : 0; + for ($i = 0; $i < $brokerCount; $i++) { + $nodeId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $nodeId = $nodeId[1]; + $offset += 4; + $hostNameLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 host name length + $hostNameLen = isset($hostNameLen[1]) ? $hostNameLen[1] : 0; + $offset += 2; + $hostName = substr($data, $offset, $hostNameLen); + $offset += $hostNameLen; + $port = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $broker[$nodeId] = array( + 'host' => $hostName, + 'port' => $port[1], + ); + } + + $topicMetaCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicMetaCount = isset($topicMetaCount[1]) ? $topicMetaCount[1] : 0; + for ($i = 0; $i < $topicMetaCount; $i++) { + $topicErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $topicName = substr($data, $offset, $topicLen[1]); + $offset += $topicLen[1]; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $topic[$topicName]['errCode'] = $topicErrCode[1]; + $partitions = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionId = isset($partitionId[1]) ? $partitionId[1] : 0; + $offset += 4; + $leaderId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $repliasCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $repliasCount = isset($repliasCount[1]) ? $repliasCount[1] : 0; + $replias = array(); + for ($z = 0; $z < $repliasCount; $z++) { + $repliaId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $replias[] = $repliaId[1]; + } + $isrCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $isrCount = isset($isrCount[1]) ? $isrCount[1] : 0; + $isrs = array(); + for ($z = 0; $z < $isrCount; $z++) { + $isrId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $isrs[] = $isrId[1]; + } + + $partitions[$partitionId] = array( + 'errCode' => $partitionErrCode[1], + 'leader' => $leaderId[1], + 'replicas' => $replias, + 'isr' => $isrs, + ); + } + $topic[$topicName]['partitions'] = $partitions; + } + + $result = array( + 'brokers' => $broker, + 'topics' => $topic, + ); + return $result; + } + + // }}} + // {{{ public function offsetResponse() + + /** + * decode offset response + * + * @param string $data + * @access public + * @return array + */ + public function offsetResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('offset response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicCount = array_shift($topicCount); + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $offsetCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $offsetCount = array_shift($offsetCount); + $offsetArr = array(); + for ($z = 0; $z < $offsetCount; $z++) { + $offsetArr[] = self::unpack(self::BIT_B64, substr($data, $offset, 8)); + $offset += 8; + } + $result[$topicName][$partitionId[1]] = array( + 'errCode' => $errCode[1], + 'offset' => $offsetArr + ); + } + } + return $result; + } + + // }}} + // {{{ public function commitOffsetResponse() + + /** + * decode commit offset response + * + * @param string $data + * @access public + * @return array + */ + public function commitOffsetResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('commit offset response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicCount = array_shift($topicCount); + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $result[$topicName][$partitionId[1]] = array( + 'errCode' => $errCode[1], + ); + } + } + return $result; + } + + // }}} + // {{{ public function fetchOffsetResponse() + + /** + * decode fetch offset response + * + * @param string $data + * @access public + * @return array + */ + public function fetchOffsetResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('fetch offset response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicCount = array_shift($topicCount); + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8)); + $offset += 8; + $metaLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $metaLen = array_shift($metaLen); + $offset += 2; + $metaData = ''; + if ($metaLen) { + $metaData = substr($data, $offset, $metaLen); + $offset += $metaLen; + } + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $result[$topicName][$partitionId[1]] = array( + 'offset' => $partitionOffset, + 'metadata' => $metaData, + 'errCode' => $errCode[1], + ); + } + } + return $result; + } + + // }}} + // {{{ public static function getError() + + /** + * get error + * + * @param integer $errCode + * @static + * @access public + * @return string + */ + public static function getError($errCode) + { + $error = ''; + switch($errCode) { + case 0: + $error = 'No error--it worked!'; + break; + case -1: + $error = 'An unexpected server error'; + break; + case 1: + $error = 'The requested offset is outside the range of offsets maintained by the server for the given topic/partition.'; + break; + case 2: + $error = 'This indicates that a message contents does not match its CRC'; + break; + case 3: + $error = 'This request is for a topic or partition that does not exist on this broker.'; + break; + case 4: + $error = 'The message has a negative size'; + break; + case 5: + $error = 'This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes'; + break; + case 6: + $error = 'This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.'; + break; + case 7: + $error = 'This error is thrown if the request exceeds the user-specified time limit in the request.'; + break; + case 8: + $error = 'This is not a client facing error and is used only internally by intra-cluster broker communication.'; + break; + case 10: + $error = 'The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum.'; + break; + case 11: + $error = 'Internal error code for broker-to-broker communication.'; + break; + case 12: + $error = 'If you specify a string larger than configured maximum for offset metadata'; + break; + case 14: + $error = 'The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).'; + break; + case 15: + $error = 'The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.'; + break; + case 16: + $error = 'The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.'; + break; + default: + $error = 'Unknown error'; + } + + return $error; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php new file mode 100644 index 00000000..7d36e10f --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php @@ -0,0 +1,652 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Encoder extends Protocol +{ + // {{{ functions + // {{{ public function produceRequest() + + /** + * produce request + * + * @param array $payloads + * @static + * @access public + * @return void + */ + public function produceRequest($payloads, $compression = self::COMPRESSION_NONE) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given procude data invalid. `data` is undefined.'); + } + + if (!isset($payloads['required_ack'])) { + // default server will not send any response + // (this is the only case where the server will not reply to a request) + $payloads['required_ack'] = 0; + } + + if (!isset($payloads['timeout'])) { + $payloads['timeout'] = 100; // default timeout 100ms + } + + $header = self::requestHeader('kafka-php', 0, self::PRODUCE_REQUEST); + $data = self::pack(self::BIT_B16, $payloads['required_ack']); + $data .= self::pack(self::BIT_B32, $payloads['timeout']); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeProcudeTopic'), $compression); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function metadataRequest() + + /** + * build metadata request protocol + * + * @param array $topics + * @access public + * @return string + */ + public function metadataRequest($topics) + { + if (!is_array($topics)) { + $topics = array($topics); + } + + foreach ($topics as $topic) { + if (!is_string($topic)) { + throw new \Kafka\Exception\Protocol('request metadata topic array have invalid value. '); + } + } + + $header = self::requestHeader('kafka-php', 0, self::METADATA_REQUEST); + $data = self::encodeArray($topics, array(__CLASS__, 'encodeString'), self::PACK_INT16); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function fetchRequest() + + /** + * build fetch request + * + * @param array $payloads + * @access public + * @return string + */ + public function fetchRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given fetch kafka data invalid. `data` is undefined.'); + } + + if (!isset($payloads['replica_id'])) { + $payloads['replica_id'] = -1; + } + + if (!isset($payloads['max_wait_time'])) { + $payloads['max_wait_time'] = 100; // default timeout 100ms + } + + if (!isset($payloads['min_bytes'])) { + $payloads['min_bytes'] = 64 * 1024; // 64k + } + + $header = self::requestHeader('kafka-php', 0, self::FETCH_REQUEST); + $data = self::pack(self::BIT_B32, $payloads['replica_id']); + $data .= self::pack(self::BIT_B32, $payloads['max_wait_time']); + $data .= self::pack(self::BIT_B32, $payloads['min_bytes']); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchTopic')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function offsetRequest() + + /** + * build offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function offsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['replica_id'])) { + $payloads['replica_id'] = -1; + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_REQUEST); + $data = self::pack(self::BIT_B32, $payloads['replica_id']); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeOffsetTopic')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function commitOffsetRequest() + + /** + * build consumer commit offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function commitOffsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['group_id'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `group_id` is undefined.'); + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_COMMIT_REQUEST); + $data = self::encodeString($payloads['group_id'], self::PACK_INT16); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeCommitOffset')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function fetchOffsetRequest() + + /** + * build consumer fetch offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function fetchOffsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['group_id'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `group_id` is undefined.'); + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_FETCH_REQUEST); + $data = self::encodeString($payloads['group_id'], self::PACK_INT16); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchOffset')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public static function encodeString() + + /** + * encode pack string type + * + * @param string $string + * @param int $bytes self::PACK_INT32: int32 big endian order. self::PACK_INT16: int16 big endian order. + * @static + * @access public + * @return string + */ + public static function encodeString($string, $bytes, $compression = self::COMPRESSION_NONE) + { + $packLen = ($bytes == self::PACK_INT32) ? self::BIT_B32 : self::BIT_B16; + switch ($compression) { + case self::COMPRESSION_NONE: + break; + case self::COMPRESSION_GZIP: + $string = gzencode($string); + break; + case self::COMPRESSION_SNAPPY: + throw new \Kafka\Exception\NotSupported('SNAPPY compression not yet implemented'); + default: + throw new \Kafka\Exception\NotSupported('Unknown compression flag: ' . $compression); + } + return self::pack($packLen, strlen($string)) . $string; + } + + // }}} + // {{{ public static function encodeArray() + + /** + * encode key array + * + * @param array $array + * @param Callable $func + * @static + * @access public + * @return string + */ + public static function encodeArray(array $array, $func, $options = null) + { + if (!is_callable($func, false)) { + throw new \Kafka\Exception\Protocol('Encode array failed, given function is not callable.'); + } + + $arrayCount = count($array); + + $body = ''; + foreach ($array as $value) { + if (!is_null($options)) { + $body .= call_user_func($func, $value, $options); + } else { + $body .= call_user_func($func, $value); + } + } + + return self::pack(self::BIT_B32, $arrayCount) . $body; + } + + // }}} + // {{{ public static function encodeMessageSet() + + /** + * encode message set + * N.B., MessageSets are not preceded by an int32 like other array elements + * in the protocol. + * + * @param array $messages + * @static + * @access public + * @return string + */ + public static function encodeMessageSet($messages, $compression = self::COMPRESSION_NONE) + { + if (!is_array($messages)) { + $messages = array($messages); + } + + $data = ''; + foreach ($messages as $message) { + $tmpMessage = self::_encodeMessage($message, $compression); + + // int64 -- message offset Message + $data .= self::pack(self::BIT_B64, 0) . self::encodeString($tmpMessage, self::PACK_INT32); + } + return $data; + } + + // }}} + // {{{ public static function requestHeader() + + /** + * get request header + * + * @param string $clientId + * @param integer $correlationId + * @param integer $apiKey + * @static + * @access public + * @return void + */ + public static function requestHeader($clientId, $correlationId, $apiKey) + { + // int16 -- apiKey int16 -- apiVersion int32 correlationId + $binData = self::pack(self::BIT_B16, $apiKey); + $binData .= self::pack(self::BIT_B16, self::API_VERSION); + $binData .= self::pack(self::BIT_B32, $correlationId); + + // concat client id + $binData .= self::encodeString($clientId, self::PACK_INT16); + + return $binData; + } + + // }}} + // {{{ protected static function _encodeMessage() + + /** + * encode signal message + * + * @param string $message + * @static + * @access protected + * @return string + */ + protected static function _encodeMessage($message, $compression = self::COMPRESSION_NONE) + { + // int8 -- magic int8 -- attribute + $data = self::pack(self::BIT_B8, self::MESSAGE_MAGIC); + $data .= self::pack(self::BIT_B8, $compression); + + // message key + $data .= self::encodeString('', self::PACK_INT32); + + // message value + $data .= self::encodeString($message, self::PACK_INT32, $compression); + + $crc = crc32($data); + + // int32 -- crc code string data + $message = self::pack(self::BIT_B32, $crc) . $data; + + return $message; + } + + // }}} + // {{{ protected static function _encodeProcudePartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeProcudePartion($values, $compression) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['messages']) || empty($values['messages'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `messages` is undefined.'); + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::encodeString(self::encodeMessageSet($values['messages'], $compression), self::PACK_INT32); + + return $data; + } + + // }}} + // {{{ protected static function _encodeProcudeTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeProcudeTopic($values, $compression) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeProcudePartion'), $compression); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeFetchPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['offset'])) { + $values['offset'] = 0; + } + + if (!isset($values['max_bytes'])) { + $values['max_bytes'] = 100 * 1024 * 1024; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['offset']); + $data .= self::pack(self::BIT_B32, $values['max_bytes']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeFetchTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchTopic($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['time'])) { + $values['time'] = -1; // -1 + } + + if (!isset($values['max_offset'])) { + $values['max_offset'] = 100000; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['time']); + $data .= self::pack(self::BIT_B32, $values['max_offset']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeOffsetTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeOffsetTopic($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeCommitOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeCommitOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['offset'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `offset` is undefined.'); + } + + if (!isset($values['time'])) { + $values['time'] = -1; + } + + if (!isset($values['metadata'])) { + $values['metadata'] = 'm'; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['offset']); + $data .= self::pack(self::BIT_B64, $values['time']); + $data .= self::encodeString($values['metadata'], self::PACK_INT16); + + return $data; + } + + // }}} + // {{{ protected static function _encodeCommitOffset() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeCommitOffset($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeCommitOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeFetchOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partition_id` is undefined.'); + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeFetchOffset() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchOffset($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php new file mode 100644 index 00000000..8424cf78 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php @@ -0,0 +1,119 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class CommitOffset extends HelperAbstract +{ + // {{{ members + + /** + * consumer group + * + * @var string + * @access protected + */ + protected $group = ''; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct($client) + { + $this->client = $client; + } + + // }}} + // {{{ public function setGroup() + + /** + * set consumer group + * + * @access public + * @return void + */ + public function setGroup($group) + { + $this->group = $group; + } + + // }}} + // {{{ public function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @access public + * @return void + */ + public function onStreamEof($streamKey) + { + } + + // }}} + // {{{ public function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @access public + * @return void + */ + public function onTopicEof($topicName) + { + } + + // }}} + // {{{ public function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + public function onPartitionEof($partition) + { + $partitionId = $partition->key(); + $topicName = $partition->getTopicName(); + $offset = $partition->getMessageOffset(); + $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId); + $offsetObject->setOffset($offset); + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php new file mode 100644 index 00000000..acf0223e --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php @@ -0,0 +1,39 @@ +<?php +namespace Kafka\Protocol\Fetch\Helper; +/** + * Description of Consumer + * + * @author daniel + */ +class Consumer extends HelperAbstract +{ + protected $consumer; + + protected $offsetStrategy; + + + public function __construct(\Kafka\Consumer $consumer) + { + $this->consumer = $consumer; + } + + + public function onPartitionEof($partition) + { + $partitionId = $partition->key(); + $topicName = $partition->getTopicName(); + $offset = $partition->getMessageOffset(); + $this->consumer->setFromOffset(true); + $this->consumer->setPartition($topicName, $partitionId, ($offset +1)); + } + + public function onStreamEof($streamKey) + { + + } + + public function onTopicEof($topicName) + { + + } +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php new file mode 100644 index 00000000..bba38dd6 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php @@ -0,0 +1,117 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class FreeStream extends HelperAbstract +{ + // {{{ members + + /** + * streams + * + * @var array + * @access protected + */ + protected $streams = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct($client) + { + $this->client = $client; + } + + // }}} + // {{{ public function setStreams() + + /** + * set streams + * + * @access public + * @return void + */ + public function setStreams($streams) + { + $this->streams = $streams; + } + + // }}} + // {{{ public function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @access public + * @return void + */ + public function onStreamEof($streamKey) + { + if (isset($this->streams[$streamKey])) { + $this->client->freeStream($streamKey); + } + } + + // }}} + // {{{ public function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @access public + * @return void + */ + public function onTopicEof($topicName) + { + } + + // }}} + // {{{ public function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + public function onPartitionEof($partition) + { + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php new file mode 100644 index 00000000..4ec23926 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php @@ -0,0 +1,160 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Helper +{ + // {{{ members + + /** + * helper object + */ + private static $helpers = array(); + + // }}} + // {{{ functions + // {{{ public staitc function registerHelper() + + /** + * register helper + * + * @param string $key + * @param \Kafka\Protocol\Fetch\Helper\HelperAbstract $helper + * @static + * @access public + * @return void + */ + public static function registerHelper($key, $helper = null) + { + if (is_null($helper)) { + $className = '\\Kafka\\Protocol\\Fetch\\Helper\\' . $key; + if (!class_exists($className)) { + throw new \Kafka\Exception('helper is not exists.'); + } + $helper = new $className(); + } + + if ($helper instanceof \Kafka\Protocol\Fetch\Helper\HelperAbstract) { + self::$helpers[$key] = $helper; + } else { + throw new \Kafka\Exception('this helper not instance of `\Kafka\Protocol\Fetch\Helper\HelperAbstract`'); + } + } + + // }}} + // {{{ public staitc function unRegisterHelper() + + /** + * unregister helper + * + * @param string $key + * @static + * @access public + * @return void + */ + public static function unRegisterHelper($key) + { + if (isset(self::$helpers[$key])) { + unset(self::$helpers[$key]); + } + } + + // }}} + // {{{ public static function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @static + * @access public + * @return void + */ + public static function onStreamEof($streamKey) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onStreamEof')) { + $helper->onStreamEof($streamKey); + } + } + } + + // }}} + // {{{ public static function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @static + * @access public + * @return void + */ + public static function onTopicEof($topicName) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onTopicEof')) { + $helper->onStreamEof($topicName); + } + } + } + + // }}} + // {{{ public static function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @static + * @access public + * @return void + */ + public static function onPartitionEof($partition) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onPartitionEof')) { + $helper->onPartitionEof($partition); + } + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php new file mode 100644 index 00000000..476f3da1 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php @@ -0,0 +1,71 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +abstract class HelperAbstract +{ + // {{{ members + // }}} + // {{{ functions + // {{{ abstract public function onStreamEof() + + /** + * on stream eof + * + * @param string $streamKey + * @access public + * @return void + */ + abstract public function onStreamEof($streamKey); + + // }}} + // {{{ abstract public function onTopicEof() + + /** + * on topic eof + * + * @param string $topicName + * @access public + * @return void + */ + abstract public function onTopicEof($topicName); + + // }}} + // {{{ abstract public function onPartitionEof() + + /** + * on partition eof + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + abstract public function onPartitionEof($partition); + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php new file mode 100644 index 00000000..42d7da1d --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php @@ -0,0 +1,175 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Message +{ + // {{{ members + + /** + * init read bytes + * + * @var float + * @access private + */ + private $initOffset = 0; + + /** + * validByteCount + * + * @var float + * @access private + */ + private $validByteCount = 0; + + /** + * crc32 code + * + * @var float + * @access private + */ + private $crc = 0; + + /** + * This is a version id used to allow backwards compatible evolution of the + * message binary format. + * + * @var float + * @access private + */ + private $magic = 0; + + /** + * The lowest 2 bits contain the compression codec used for the message. The + * other bits should be set to 0. + * + * @var float + * @access private + */ + private $attribute = 0; + + /** + * message key + * + * @var string + * @access private + */ + private $key = ''; + + /** + * message value + * + * @var string + * @access private + */ + private $value = ''; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param string(raw) $msg + * @access public + * @return void + */ + public function __construct($msg) + { + $offset = 0; + $crc = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $offset += 4; + $this->crc = array_shift($crc); + $magic = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1)); + $this->magic = array_shift($magic); + $offset += 1; + $attr = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1)); + $this->attribute = array_shift($attr); + $offset += 1; + $keyLen = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $keyLen = array_shift($keyLen); + $offset += 4; + if ($keyLen > 0 && $keyLen != 0xFFFFFFFF) { + $this->key = substr($msg, $offset, $keyLen); + $offset += $keyLen; + } + $messageSize = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $messageSize = array_shift($messageSize); + $offset += 4; + if ($messageSize) { + $this->value = substr($msg, $offset, $messageSize); + } + } + + // }}} + // {{{ public function getMessage() + + /** + * get message data + * + * @access public + * @return string (raw) + */ + public function getMessage() + { + return $this->value; + } + + // }}} + // {{{ public function getMessageKey() + + /** + * get message key + * + * @access public + * @return string (raw) + */ + public function getMessageKey() + { + return $this->key; + } + + // }}} + // {{{ public function __toString() + + /** + * __toString + * + * @access public + * @return void + */ + public function __toString() + { + return $this->value; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php new file mode 100644 index 00000000..50413b67 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php @@ -0,0 +1,269 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class MessageSet implements \Iterator +{ + // {{{ members + + /** + * kafka socket object + * + * @var mixed + * @access private + */ + private $stream = null; + + /** + * messageSet size + * + * @var float + * @access private + */ + private $messageSetSize = 0; + + /** + * validByteCount + * + * @var float + * @access private + */ + private $validByteCount = 0; + + /** + * messageSet offset + * + * @var float + * @access private + */ + private $offset = 0; + + /** + * valid + * + * @var mixed + * @access private + */ + private $valid = false; + + /** + * partition object + * + * @var \Kafka\Protocol\Fetch\Partition + * @access private + */ + private $partition = null; + + /** + * request fetch context + * + * @var array + */ + private $context = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Socket $stream + * @param int $initOffset + * @access public + * @return void + */ + public function __construct(\Kafka\Protocol\Fetch\Partition $partition, $context = array()) + { + $this->stream = $partition->getStream(); + $this->partition = $partition; + $this->context = $context; + $this->messageSetSize = $this->getMessageSetSize(); + \Kafka\Log::log("messageSetSize: {$this->messageSetSize}", LOG_INFO); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->validByteCount; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextMessage(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + if (!$this->valid) { + $this->partition->setMessageOffset($this->offset); + + // one partition iterator end + \Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof($this->partition); + } + + return $this->valid; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextMessage(); + } + + // }}} + // {{{ protected function getMessageSetSize() + + /** + * get message set size + * + * @access protected + * @return integer + */ + protected function getMessageSetSize() + { + // read message size + $data = $this->stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $size = array_shift($data); + if ($size <= 0) { + throw new \Kafka\Exception\OutOfRange($size . ' is not a valid message size'); + } + + return $size; + } + + // }}} + // {{{ public function loadNextMessage() + + /** + * load next message + * + * @access public + * @return void + */ + public function loadNextMessage() + { + if ($this->validByteCount >= $this->messageSetSize) { + return false; + } + + try { + if ($this->validByteCount + 12 > $this->messageSetSize) { + // read socket buffer dirty data + $this->stream->read($this->messageSetSize - $this->validByteCount); + return false; + } + $offset = $this->stream->read(8, true); + $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset); + $messageSize = $this->stream->read(4, true); + $messageSize = Decoder::unpack(Decoder::BIT_B32, $messageSize); + $messageSize = array_shift($messageSize); + $this->validByteCount += 12; + if (($this->validByteCount + $messageSize) > $this->messageSetSize) { + // read socket buffer dirty data + $this->stream->read($this->messageSetSize - $this->validByteCount); + return false; + } + $msg = $this->stream->read($messageSize, true); + $this->current = new Message($msg); + } catch (\Kafka\Exception $e) { + \Kafka\Log::log("already fetch: {$this->validByteCount}, {$e->getMessage()}", LOG_INFO); + return false; + } + + $this->validByteCount += $messageSize; + + return true; + } + + // }}} + // {{{ public function messageOffset() + + /** + * current message offset in producer + * + * @return void + */ + public function messageOffset() + { + return $this->offset; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php new file mode 100644 index 00000000..9f8578d5 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php @@ -0,0 +1,375 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Partition implements \Iterator, \Countable +{ + // {{{ members + + /** + * kafka socket object + * + * @var mixed + * @access private + */ + private $stream = null; + + /** + * validCount + * + * @var float + * @access private + */ + private $validCount = 0; + + /** + * partitions count + * + * @var float + * @access private + */ + private $partitionCount = false; + + /** + * current topic + * + * @var mixed + * @access private + */ + private $current = null; + + /** + * current iterator key + * partition id + * + * @var string + * @access private + */ + private $key = null; + + /** + * partition errCode + * + * @var float + * @access private + */ + private $errCode = 0; + + /** + * partition offset + * + * @var float + * @access private + */ + private $offset = 0; + + /** + * partition current fetch offset + * + * @var float + * @access private + */ + private $currentOffset = 0; + + /** + * valid + * + * @var mixed + * @access private + */ + private $valid = false; + + /** + * cuerrent topic name + * + * @var string + * @access private + */ + private $topicName = ''; + + /** + * request fetch context + * + * @var array + */ + private $context = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Protocol\Fetch\Topic $topic + * @param int $initOffset + * @access public + * @return void + */ + public function __construct(\Kafka\Protocol\Fetch\Topic $topic, $context = array()) + { + $this->stream = $topic->getStream(); + $this->topicName = $topic->key(); + $this->context = $context; + $this->partitionCount = $this->getPartitionCount(); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->key; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextPartition(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + return $this->valid && $this->validCount <= $this->partitionCount; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextPartition(); + } + + // }}} + // {{{ public function count() + + /** + * implements Countable function + * + * @access public + * @return integer + */ + public function count() + { + return $this->partitionCount; + } + + // }}} + // {{{ public function getErrCode() + + /** + * get partition errcode + * + * @access public + * @return void + */ + public function getErrCode() + { + return $this->errCode; + } + + // }}} + // {{{ public function getHighOffset() + + /** + * get partition high offset + * + * @access public + * @return void + */ + public function getHighOffset() + { + return $this->offset; + } + + // }}} + // {{{ public function getTopicName() + + /** + * get partition topic name + * + * @access public + * @return void + */ + public function getTopicName() + { + return $this->topicName; + } + + // }}} + // {{{ public function getStream() + + /** + * get current stream + * + * @access public + * @return \Kafka\Socket + */ + public function getStream() + { + return $this->stream; + } + + // }}} + // {{{ protected function getPartitionCount() + + /** + * get message size + * only use to object init + * + * @access protected + * @return integer + */ + protected function getPartitionCount() + { + // read topic count + $data = $this->stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $count = array_shift($data); + if ($count <= 0) { + throw new \Kafka\Exception\OutOfRange($size . ' is not a valid partition count'); + } + + return $count; + } + + // }}} + // {{{ public function loadNextPartition() + + /** + * load next partition + * + * @access public + * @return void + */ + public function loadNextPartition() + { + if ($this->validCount >= $this->partitionCount) { + return false; + } + + try { + $partitionId = $this->stream->read(4, true); + $partitionId = Decoder::unpack(Decoder::BIT_B32, $partitionId); + $partitionId = array_shift($partitionId); + \Kafka\Log::log("kafka client:fetch partition:" . $partitionId, LOG_INFO); + + $errCode = $this->stream->read(2, true); + $errCode = Decoder::unpack(Decoder::BIT_B16, $errCode); + $this->errCode = array_shift($errCode); + if ($this->errCode != 0) { + throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($this->errCode)); + } + $offset = $this->stream->read(8, true); + $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset); + + $this->key = $partitionId; + $this->current = new MessageSet($this, $this->context); + } catch (\Kafka\Exception $e) { + return false; + } + + $this->validCount++; + return true; + } + + // }}} + // {{{ public function setMessageOffset() + + /** + * set messageSet fetch offset current + * + * @param intger $offset + * @return void + */ + public function setMessageOffset($offset) + { + $this->currentOffset = $offset; + } + + // }}} + // {{{ public function getMessageOffset() + + /** + * get messageSet fetch offset current + * + * @return int + */ + public function getMessageOffset() + { + return $this->currentOffset; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php new file mode 100644 index 00000000..500e6b1f --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php @@ -0,0 +1,345 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Topic implements \Iterator, \Countable +{ + // {{{ members + + /** + * kafka socket object + * + * @var array + * @access private + */ + private $streams = array(); + + /** + * each topic count + * + * @var array + * @access private + */ + private $topicCounts = array(); + + /** + * current iterator stream + * + * @var mixed + * @access private + */ + private $currentStreamKey = 0; + + /** + * current lock key + * + * @var string + * @access private + */ + private $currentStreamLockKey = ''; + + /** + * currentStreamCount + * + * @var float + * @access private + */ + private $currentStreamCount = 0; + + /** + * validCount + * + * @var float + * @access private + */ + private $validCount = 0; + + /** + * topic count + * + * @var float + * @access private + */ + private $topicCount = false; + + /** + * current topic + * + * @var mixed + * @access private + */ + private $current = null; + + /** + * current iterator key + * topic name + * + * @var string + * @access private + */ + private $key = null; + + /** + * valid + * + * @var mixed + * @access private + */ + private $valid = false; + + /** + * request fetch context + * + * @var array + */ + private $context = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Socket $stream + * @param int $initOffset + * @access public + * @return void + */ + public function __construct($streams, $context = array()) + { + if (!is_array($streams)) { + $streams = array($streams); + } + $this->streams = $streams; + $topicInfos = array(); + foreach ($context as $values) { + if (!isset($values['data'])) { + continue; + } + + foreach ($values['data'] as $value) { + if (!isset($value['topic_name']) || !isset($value['partitions'])) { + continue; + } + + $topicName = $value['topic_name']; + foreach ($value['partitions'] as $part) { + $topicInfos[$topicName][$part['partition_id']] = array( + 'offset' => $part['offset'], + ); + } + } + } + $this->context = $topicInfos; + $this->topicCount = $this->getTopicCount(); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->key; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextTopic(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + return $this->valid; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextTopic(); + } + + // }}} + // {{{ public function count() + + /** + * implements Countable function + * + * @access public + * @return integer + */ + public function count() + { + return $this->topicCount; + } + + // }}} + // {{{ protected function getTopicCount() + + /** + * get message size + * only use to object init + * + * @access protected + * @return integer + */ + protected function getTopicCount() + { + $count = 0; + foreach (array_values($this->streams) as $key => $stream) { + // read topic count + $stream->read(8, true); + $data = $stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $topicCount = array_shift($data); + $count += $topicCount; + $this->topicCounts[$key] = $topicCount; + if ($count <= 0) { + throw new \Kafka\Exception\OutOfRange($count . ' is not a valid topic count'); + } + } + + return $count; + } + + // }}} + // {{{ public function loadNextTopic() + + /** + * load next topic + * + * @access public + * @return void + */ + public function loadNextTopic() + { + if ($this->validCount >= $this->topicCount) { + \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey); + return false; + } + + if ($this->currentStreamCount >= $this->topicCounts[$this->currentStreamKey]) { + \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey); + $this->currentStreamKey++; + } + + $lockKeys = array_keys($this->streams); + $streams = array_values($this->streams); + if (!isset($streams[$this->currentStreamKey])) { + return false; + } + + $stream = $streams[$this->currentStreamKey]; + $this->currentStreamLockKey = $lockKeys[$this->currentStreamKey]; + + try { + $topicLen = $stream->read(2, true); + $topicLen = Decoder::unpack(Decoder::BIT_B16, $topicLen); + $topicLen = array_shift($topicLen); + if ($topicLen <= 0) { + return false; + } + + // topic name + $this->key = $stream->read($topicLen, true); + $this->current = new Partition($this, $this->context); + } catch (\Kafka\Exception $e) { + return false; + } + + $this->validCount++; + $this->currentStreamCount++; + + return true; + } + + // }}} + // {{{ public function getStream() + + /** + * get current stream + * + * @access public + * @return \Kafka\Socket + */ + public function getStream() + { + $streams = array_values($this->streams); + return $streams[$this->currentStreamKey]; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php new file mode 100644 index 00000000..a31067b5 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php @@ -0,0 +1,230 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +abstract class Protocol +{ + // {{{ consts + + /** + * Kafka server protocol version + */ + const API_VERSION = 0; + + /** + * use encode message, This is a version id used to allow backwards + * compatible evolution of the message binary format. + */ + const MESSAGE_MAGIC = 0; + + /** + * message no compression + */ + const COMPRESSION_NONE = 0; + + /** + * Message using gzip compression + */ + const COMPRESSION_GZIP = 1; + + /** + * Message using Snappy compression + */ + const COMPRESSION_SNAPPY = 2; + + /** + * pack int32 type + */ + const PACK_INT32 = 0; + + /** + * pack int16 type + */ + const PACK_INT16 = 1; + + /** + * protocol request code + */ + const PRODUCE_REQUEST = 0; + const FETCH_REQUEST = 1; + const OFFSET_REQUEST = 2; + const METADATA_REQUEST = 3; + const OFFSET_COMMIT_REQUEST = 8; + const OFFSET_FETCH_REQUEST = 9; + const CONSUMER_METADATA_REQUEST = 10; + + // unpack/pack bit + const BIT_B64 = 'N2'; + const BIT_B32 = 'N'; + const BIT_B16 = 'n'; + const BIT_B8 = 'C'; + + // }}} + // {{{ members + + /** + * stream + * + * @var mixed + * @access protected + */ + protected $stream = null; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Socket $stream + * @access public + * @return void + */ + public function __construct(\Kafka\Socket $stream) + { + $this->stream = $stream; + } + + // }}} + // {{{ public static function Khex2bin() + + /** + * hex to bin + * + * @param string $string + * @static + * @access protected + * @return string (raw) + */ + public static function Khex2bin($string) + { + if (function_exists('\hex2bin')) { + return \hex2bin($string); + } else { + $bin = ''; + $len = strlen($string); + for ($i = 0; $i < $len; $i += 2) { + $bin .= pack('H*', substr($string, $i, 2)); + } + + return $bin; + } + } + + // }}} + // {{{ public static function unpack() + + /** + * Unpack a bit integer as big endian long + * + * @static + * @access public + * @return integer + */ + public static function unpack($type, $bytes) + { + self::checkLen($type, $bytes); + if ($type == self::BIT_B64) { + $set = unpack($type, $bytes); + $original = ($set[1] & 0xFFFFFFFF) << 32 | ($set[2] & 0xFFFFFFFF); + return $original; + } else { + return unpack($type, $bytes); + } + } + + // }}} + // {{{ public static function pack() + + /** + * pack a bit integer as big endian long + * + * @static + * @access public + * @return integer + */ + public static function pack($type, $data) + { + if ($type == self::BIT_B64) { + if ($data == -1) { // -1L + $data = self::Khex2bin('ffffffffffffffff'); + } elseif ($data == -2) { // -2L + $data = self::Khex2bin('fffffffffffffffe'); + } else { + $left = 0xffffffff00000000; + $right = 0x00000000ffffffff; + + $l = ($data & $left) >> 32; + $r = $data & $right; + $data = pack($type, $l, $r); + } + } else { + $data = pack($type, $data); + } + + return $data; + } + + // }}} + // {{{ protected static function checkLen() + + /** + * check unpack bit is valid + * + * @param string $type + * @param string(raw) $bytes + * @static + * @access protected + * @return void + */ + protected static function checkLen($type, $bytes) + { + $len = 0; + switch($type) { + case self::BIT_B64: + $len = 8; + break; + case self::BIT_B32: + $len = 4; + break; + case self::BIT_B16: + $len = 2; + break; + case self::BIT_B8: + $len = 1; + break; + } + + if (strlen($bytes) != $len) { + throw new \Kafka\Exception\Protocol('unpack failed. string(raw) length is ' . strlen($bytes) . ' , TO ' . $type); + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Socket.php b/vendor/nmred/kafka-php/src/Kafka/Socket.php new file mode 100644 index 00000000..be7321f3 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Socket.php @@ -0,0 +1,365 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Socket +{ + // {{{ consts + + const READ_MAX_LEN = 5242880; // read socket max length 5MB + + // }}} + // {{{ members + + /** + * Send timeout in seconds. + * + * @var float + * @access private + */ + private $sendTimeoutSec = 0; + + /** + * Send timeout in microseconds. + * + * @var float + * @access private + */ + private $sendTimeoutUsec = 100000; + + /** + * Recv timeout in seconds + * + * @var float + * @access private + */ + private $recvTimeoutSec = 0; + + /** + * Recv timeout in microseconds + * + * @var float + * @access private + */ + private $recvTimeoutUsec = 750000; + + /** + * Stream resource + * + * @var mixed + * @access private + */ + private $stream = null; + + /** + * Socket host + * + * @var mixed + * @access private + */ + private $host = null; + + /** + * Socket port + * + * @var mixed + * @access private + */ + private $port = -1; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct($host, $port, $recvTimeoutSec = 0, $recvTimeoutUsec = 750000, $sendTimeoutSec = 0, $sendTimeoutUsec = 100000) + { + $this->host = $host; + $this->port = $port; + $this->setRecvTimeoutSec($recvTimeoutSec); + $this->setRecvTimeoutUsec($recvTimeoutUsec); + $this->setSendTimeoutSec($sendTimeoutSec); + $this->setSendTimeoutUsec($sendTimeoutUsec); + } + + /** + * @param float $sendTimeoutSec + */ + public function setSendTimeoutSec($sendTimeoutSec) + { + $this->sendTimeoutSec = $sendTimeoutSec; + } + + /** + * @param float $sendTimeoutUsec + */ + public function setSendTimeoutUsec($sendTimeoutUsec) + { + $this->sendTimeoutUsec = $sendTimeoutUsec; + } + + /** + * @param float $recvTimeoutSec + */ + public function setRecvTimeoutSec($recvTimeoutSec) + { + $this->recvTimeoutSec = $recvTimeoutSec; + } + + /** + * @param float $recvTimeoutUsec + */ + public function setRecvTimeoutUsec($recvTimeoutUsec) + { + $this->recvTimeoutUsec = $recvTimeoutUsec; + } + + + + // }}} + // {{{ public static function createFromStream() + + /** + * Optional method to set the internal stream handle + * + * @static + * @access public + * @return void + */ + public static function createFromStream($stream) + { + $socket = new self('localhost', 0); + $socket->setStream($stream); + return $socket; + } + + // }}} + // {{{ public function setStream() + + /** + * Optional method to set the internal stream handle + * + * @param mixed $stream + * @access public + * @return void + */ + public function setStream($stream) + { + $this->stream = $stream; + } + + // }}} + // {{{ public function connect() + + /** + * Connects the socket + * + * @access public + * @return void + */ + public function connect() + { + if (is_resource($this->stream)) { + return false; + } + + if (empty($this->host)) { + throw new \Kafka\Exception('Cannot open null host.'); + } + if ($this->port <= 0) { + throw new \Kafka\Exception('Cannot open without port.'); + } + + $this->stream = @fsockopen( + $this->host, + $this->port, + $errno, + $errstr, + $this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000) + ); + + if ($this->stream == false) { + $error = 'Could not connect to ' + . $this->host . ':' . $this->port + . ' ('.$errstr.' ['.$errno.'])'; + throw new \Kafka\Exception\SocketConnect($error); + } + + stream_set_blocking($this->stream, 0); + } + + // }}} + // {{{ public function close() + + /** + * close the socket + * + * @access public + * @return void + */ + public function close() + { + if (is_resource($this->stream)) { + fclose($this->stream); + } + } + + // }}} + // {{{ public function read() + + /** + * Read from the socket at most $len bytes. + * + * This method will not wait for all the requested data, it will return as + * soon as any data is received. + * + * @param integer $len Maximum number of bytes to read. + * @param boolean $verifyExactLength Throw an exception if the number of read bytes is less than $len + * + * @return string Binary data + * @throws Kafka_Exception_Socket + */ + public function read($len, $verifyExactLength = false) + { + if ($len > self::READ_MAX_LEN) { + throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream, length too longer.'); + } + + $null = null; + $read = array($this->stream); + $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec); + if ($readable > 0) { + $remainingBytes = $len; + $data = $chunk = ''; + while ($remainingBytes > 0) { + $chunk = fread($this->stream, $remainingBytes); + if ($chunk === false) { + $this->close(); + throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)'); + } + if (strlen($chunk) === 0) { + // Zero bytes because of EOF? + if (feof($this->stream)) { + $this->close(); + throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)'); + } + // Otherwise wait for bytes + $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec); + if ($readable !== 1) { + throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go'); + } + continue; // attempt another read + } + $data .= $chunk; + $remainingBytes -= strlen($chunk); + } + if ($len === $remainingBytes || ($verifyExactLength && $len !== strlen($data))) { + // couldn't read anything at all OR reached EOF sooner than expected + $this->close(); + throw new \Kafka\Exception\SocketEOF('Read ' . strlen($data) . ' bytes instead of the requested ' . $len . ' bytes'); + } + + return $data; + } + if (false !== $readable) { + $res = stream_get_meta_data($this->stream); + if (!empty($res['timed_out'])) { + $this->close(); + throw new \Kafka\Exception\SocketTimeout('Timed out reading '.$len.' bytes from stream'); + } + } + $this->close(); + throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (not readable)'); + + } + + // }}} + // {{{ public function write() + + /** + * Write to the socket. + * + * @param string $buf The data to write + * + * @return integer + * @throws Kafka_Exception_Socket + */ + public function write($buf) + { + $null = null; + $write = array($this->stream); + + // fwrite to a socket may be partial, so loop until we + // are done with the entire buffer + $written = 0; + $buflen = strlen($buf); + while ( $written < $buflen ) { + // wait for stream to become available for writing + $writable = stream_select($null, $write, $null, $this->sendTimeoutSec, $this->sendTimeoutUsec); + if ($writable > 0) { + // write remaining buffer bytes to stream + $wrote = fwrite($this->stream, substr($buf, $written)); + if ($wrote === -1 || $wrote === false) { + throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream, completed writing only ' . $written . ' bytes'); + } + $written += $wrote; + continue; + } + if (false !== $writable) { + $res = stream_get_meta_data($this->stream); + if (!empty($res['timed_out'])) { + throw new \Kafka\Exception\SocketTimeout('Timed out writing ' . strlen($buf) . ' bytes to stream after writing ' . $written . ' bytes'); + } + } + throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream'); + } + return $written; + } + + // }}} + // {{{ public function rewind() + + /** + * Rewind the stream + * + * @return void + */ + public function rewind() + { + if (is_resource($this->stream)) { + rewind($this->stream); + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php b/vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php new file mode 100644 index 00000000..f48b5cb9 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php @@ -0,0 +1,364 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class ZooKeeper implements \Kafka\ClusterMetaData +{ + // {{{ consts + + /** + * get all broker + */ + const BROKER_PATH = '/brokers/ids'; + + /** + * get broker detail + */ + const BROKER_DETAIL_PATH = '/brokers/ids/%d'; + + /** + * get topic detail + */ + const TOPIC_PATCH = '/brokers/topics/%s'; + + /** + * get partition state + */ + const PARTITION_STATE = '/brokers/topics/%s/partitions/%d/state'; + + /** + * register consumer + */ + const REG_CONSUMER = '/consumers/%s/ids/%s'; + + /** + * list consumer + */ + const LIST_CONSUMER = '/consumers/%s/ids'; + + /** + * partition owner + */ + const PARTITION_OWNER = '/consumers/%s/owners/%s/%d'; + + // }}} + // {{{ members + + /** + * zookeeper + * + * @var mixed + * @access private + */ + private $zookeeper = null; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct($hostList, $timeout = null) + { + if (!is_null($timeout) && is_numeric($timeout)) { + $this->zookeeper = new \ZooKeeper($hostList, null, $timeout); + } else { + $this->zookeeper = new \ZooKeeper($hostList); + } + } + + // }}} + // {{{ public function listBrokers() + + /** + * get broker list using zookeeper + * + * @access public + * @return array + */ + public function listBrokers() + { + $result = array(); + $lists = $this->zookeeper->getChildren(self::BROKER_PATH); + if (!empty($lists)) { + foreach ($lists as $brokerId) { + $brokerDetail = $this->getBrokerDetail($brokerId); + if (!$brokerDetail) { + continue; + } + $result[$brokerId] = $brokerDetail; + } + } + + return $result; + } + + // }}} + // {{{ public function getBrokerDetail() + + /** + * get broker detail + * + * @param integer $brokerId + * @access public + * @return void + */ + public function getBrokerDetail($brokerId) + { + $result = array(); + $path = sprintf(self::BROKER_DETAIL_PATH, (int) $brokerId); + if ($this->zookeeper->exists($path)) { + $result = $this->zookeeper->get($path); + if (!$result) { + return false; + } + + $result = json_decode($result, true); + } + + return $result; + } + + // }}} + // {{{ public function getTopicDetail() + + /** + * get topic detail + * + * @param string $topicName + * @access public + * @return void + */ + public function getTopicDetail($topicName) + { + $result = array(); + $path = sprintf(self::TOPIC_PATCH, (string) $topicName); + if ($this->zookeeper->exists($path)) { + $result = $this->zookeeper->get($path); + if (!$result) { + return false; + } + $result = json_decode($result, true); + } + + return $result; + } + + // }}} + // {{{ public function getPartitionState() + + /** + * get partition state + * + * @param string $topicName + * @param integer $partitionId + * @access public + * @return void + */ + public function getPartitionState($topicName, $partitionId = 0) + { + $result = array(); + $path = sprintf(self::PARTITION_STATE, (string) $topicName, (int) $partitionId); + if ($this->zookeeper->exists($path)) { + $result = $this->zookeeper->get($path); + if (!$result) { + return false; + } + $result = json_decode($result, true); + } + + return $result; + } + + // }}} + // {{{ public function registerConsumer() + + /** + * register consumer + * + * @param string $topicName + * @param integer $partitionId + * @access public + * @return void + */ + public function registerConsumer($groupId, $consumerId, $topics = array()) + { + if (empty($topics)) { + return true; + } + + $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId); + $subData = array(); + foreach ($topics as $topic) { + $subData[$topic] = 1; + } + $data = array( + 'version' => '1', + 'pattern' => 'white_list', + 'subscription' => $subData, + ); + if (!$this->zookeeper->exists($path)) { + $this->makeZkPath($path); + $this->makeZkNode($path, json_encode($data)); + } else { + $this->zookeeper->set($path, json_encode($data)); + } + } + + // }}} + // {{{ public function listConsumer() + + /** + * list consumer + * + * @param string $groupId + * @access public + * @return void + */ + public function listConsumer($groupId) + { + $path = sprintf(self::LIST_CONSUMER, (string) $groupId); + if (!$this->zookeeper->exists($path)) { + return array(); + } else { + return $this->zookeeper->getChildren($path); + } + } + + // }}} + // {{{ public function getConsumersPerTopic() + + /** + * get consumer per topic + * + * @param string $groupId + * @access public + * @return array + */ + public function getConsumersPerTopic($groupId) + { + $consumers = $this->listConsumer($groupId); + if (empty($consumers)) { + return array(); + } + + $topics = array(); + foreach ($consumers as $consumerId) { + $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId); + if (!$this->zookeeper->exists($path)) { + continue; + } + + $info = $this->zookeeper->get($path); + $info = json_decode($info, true); + $subTopic = isset($info['subscription']) ? $info['subscription'] : array(); + foreach ($subTopic as $topic => $num) { + $topics[$topic] = $consumerId; + } + } + + return $topics; + } + + // }}} + // {{{ public function addPartitionOwner() + + /** + * add partition owner + * + * @param string $groupId + * @param string $topicName + * @param integer $partitionId + * @param string $consumerId + * @access public + * @return void + */ + public function addPartitionOwner($groupId, $topicName, $partitionId, $consumerId) + { + $path = sprintf(self::PARTITION_OWNER, (string) $groupId, $topicName, (string) $partitionId); + if (!$this->zookeeper->exists($path)) { + $this->makeZkPath($path); + $this->makeZkNode($path, $consumerId); + } else { + $this->zookeeper->set($path, $consumerId); + } + } + + // }}} + // {{{ protected function makeZkPath() + + /** + * Equivalent of "mkdir -p" on ZooKeeper + * + * @param string $path The path to the node + * @param mixed $value The value to assign to each new node along the path + * + * @return bool + */ + protected function makeZkPath($path, $value = 0) + { + $parts = explode('/', $path); + $parts = array_filter($parts); + $subpath = ''; + while (count($parts) > 1) { + $subpath .= '/' . array_shift($parts); + if (!$this->zookeeper->exists($subpath)) { + $this->makeZkNode($subpath, $value); + } + } + } + + // }}} + // {{{ protected function makeZkNode() + + /** + * Create a node on ZooKeeper at the given path + * + * @param string $path The path to the node + * @param mixed $value The value to assign to the new node + * + * @return bool + */ + protected function makeZkNode($path, $value) + { + $params = array( + array( + 'perms' => \Zookeeper::PERM_ALL, + 'scheme' => 'world', + 'id' => 'anyone', + ) + ); + return $this->zookeeper->create($path, $value, $params); + } + + // }}} + // }}} +} |