diff options
author | Pierre Schmitz <pierre@archlinux.de> | 2015-12-20 09:00:55 +0100 |
---|---|---|
committer | Pierre Schmitz <pierre@archlinux.de> | 2015-12-20 09:00:55 +0100 |
commit | a2190ac74dd4d7080b12bab90e552d7aa81209ef (patch) | |
tree | 8b31f38de9882d18df54cf8d9e0de74167a094eb /vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch | |
parent | 15e69f7b20b6596b9148030acce5b59993b95a45 (diff) | |
parent | 257401d8b2cf661adf36c84b0e3fd1cf85e33c22 (diff) |
Merge branch 'mw-1.26'
Diffstat (limited to 'vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch')
9 files changed, 1670 insertions, 0 deletions
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php new file mode 100644 index 00000000..8424cf78 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php @@ -0,0 +1,119 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class CommitOffset extends HelperAbstract +{ + // {{{ members + + /** + * consumer group + * + * @var string + * @access protected + */ + protected $group = ''; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct($client) + { + $this->client = $client; + } + + // }}} + // {{{ public function setGroup() + + /** + * set consumer group + * + * @access public + * @return void + */ + public function setGroup($group) + { + $this->group = $group; + } + + // }}} + // {{{ public function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @access public + * @return void + */ + public function onStreamEof($streamKey) + { + } + + // }}} + // {{{ public function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @access public + * @return void + */ + public function onTopicEof($topicName) + { + } + + // }}} + // {{{ public function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + public function onPartitionEof($partition) + { + $partitionId = $partition->key(); + $topicName = $partition->getTopicName(); + $offset = $partition->getMessageOffset(); + $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId); + $offsetObject->setOffset($offset); + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php new file mode 100644 index 00000000..acf0223e --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php @@ -0,0 +1,39 @@ +<?php +namespace Kafka\Protocol\Fetch\Helper; +/** + * Description of Consumer + * + * @author daniel + */ +class Consumer extends HelperAbstract +{ + protected $consumer; + + protected $offsetStrategy; + + + public function __construct(\Kafka\Consumer $consumer) + { + $this->consumer = $consumer; + } + + + public function onPartitionEof($partition) + { + $partitionId = $partition->key(); + $topicName = $partition->getTopicName(); + $offset = $partition->getMessageOffset(); + $this->consumer->setFromOffset(true); + $this->consumer->setPartition($topicName, $partitionId, ($offset +1)); + } + + public function onStreamEof($streamKey) + { + + } + + public function onTopicEof($topicName) + { + + } +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php new file mode 100644 index 00000000..bba38dd6 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php @@ -0,0 +1,117 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class FreeStream extends HelperAbstract +{ + // {{{ members + + /** + * streams + * + * @var array + * @access protected + */ + protected $streams = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct($client) + { + $this->client = $client; + } + + // }}} + // {{{ public function setStreams() + + /** + * set streams + * + * @access public + * @return void + */ + public function setStreams($streams) + { + $this->streams = $streams; + } + + // }}} + // {{{ public function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @access public + * @return void + */ + public function onStreamEof($streamKey) + { + if (isset($this->streams[$streamKey])) { + $this->client->freeStream($streamKey); + } + } + + // }}} + // {{{ public function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @access public + * @return void + */ + public function onTopicEof($topicName) + { + } + + // }}} + // {{{ public function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + public function onPartitionEof($partition) + { + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php new file mode 100644 index 00000000..4ec23926 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php @@ -0,0 +1,160 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Helper +{ + // {{{ members + + /** + * helper object + */ + private static $helpers = array(); + + // }}} + // {{{ functions + // {{{ public staitc function registerHelper() + + /** + * register helper + * + * @param string $key + * @param \Kafka\Protocol\Fetch\Helper\HelperAbstract $helper + * @static + * @access public + * @return void + */ + public static function registerHelper($key, $helper = null) + { + if (is_null($helper)) { + $className = '\\Kafka\\Protocol\\Fetch\\Helper\\' . $key; + if (!class_exists($className)) { + throw new \Kafka\Exception('helper is not exists.'); + } + $helper = new $className(); + } + + if ($helper instanceof \Kafka\Protocol\Fetch\Helper\HelperAbstract) { + self::$helpers[$key] = $helper; + } else { + throw new \Kafka\Exception('this helper not instance of `\Kafka\Protocol\Fetch\Helper\HelperAbstract`'); + } + } + + // }}} + // {{{ public staitc function unRegisterHelper() + + /** + * unregister helper + * + * @param string $key + * @static + * @access public + * @return void + */ + public static function unRegisterHelper($key) + { + if (isset(self::$helpers[$key])) { + unset(self::$helpers[$key]); + } + } + + // }}} + // {{{ public static function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @static + * @access public + * @return void + */ + public static function onStreamEof($streamKey) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onStreamEof')) { + $helper->onStreamEof($streamKey); + } + } + } + + // }}} + // {{{ public static function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @static + * @access public + * @return void + */ + public static function onTopicEof($topicName) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onTopicEof')) { + $helper->onStreamEof($topicName); + } + } + } + + // }}} + // {{{ public static function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @static + * @access public + * @return void + */ + public static function onPartitionEof($partition) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onPartitionEof')) { + $helper->onPartitionEof($partition); + } + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php new file mode 100644 index 00000000..476f3da1 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php @@ -0,0 +1,71 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +abstract class HelperAbstract +{ + // {{{ members + // }}} + // {{{ functions + // {{{ abstract public function onStreamEof() + + /** + * on stream eof + * + * @param string $streamKey + * @access public + * @return void + */ + abstract public function onStreamEof($streamKey); + + // }}} + // {{{ abstract public function onTopicEof() + + /** + * on topic eof + * + * @param string $topicName + * @access public + * @return void + */ + abstract public function onTopicEof($topicName); + + // }}} + // {{{ abstract public function onPartitionEof() + + /** + * on partition eof + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + abstract public function onPartitionEof($partition); + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php new file mode 100644 index 00000000..42d7da1d --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php @@ -0,0 +1,175 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Message +{ + // {{{ members + + /** + * init read bytes + * + * @var float + * @access private + */ + private $initOffset = 0; + + /** + * validByteCount + * + * @var float + * @access private + */ + private $validByteCount = 0; + + /** + * crc32 code + * + * @var float + * @access private + */ + private $crc = 0; + + /** + * This is a version id used to allow backwards compatible evolution of the + * message binary format. + * + * @var float + * @access private + */ + private $magic = 0; + + /** + * The lowest 2 bits contain the compression codec used for the message. The + * other bits should be set to 0. + * + * @var float + * @access private + */ + private $attribute = 0; + + /** + * message key + * + * @var string + * @access private + */ + private $key = ''; + + /** + * message value + * + * @var string + * @access private + */ + private $value = ''; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param string(raw) $msg + * @access public + * @return void + */ + public function __construct($msg) + { + $offset = 0; + $crc = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $offset += 4; + $this->crc = array_shift($crc); + $magic = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1)); + $this->magic = array_shift($magic); + $offset += 1; + $attr = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1)); + $this->attribute = array_shift($attr); + $offset += 1; + $keyLen = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $keyLen = array_shift($keyLen); + $offset += 4; + if ($keyLen > 0 && $keyLen != 0xFFFFFFFF) { + $this->key = substr($msg, $offset, $keyLen); + $offset += $keyLen; + } + $messageSize = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $messageSize = array_shift($messageSize); + $offset += 4; + if ($messageSize) { + $this->value = substr($msg, $offset, $messageSize); + } + } + + // }}} + // {{{ public function getMessage() + + /** + * get message data + * + * @access public + * @return string (raw) + */ + public function getMessage() + { + return $this->value; + } + + // }}} + // {{{ public function getMessageKey() + + /** + * get message key + * + * @access public + * @return string (raw) + */ + public function getMessageKey() + { + return $this->key; + } + + // }}} + // {{{ public function __toString() + + /** + * __toString + * + * @access public + * @return void + */ + public function __toString() + { + return $this->value; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php new file mode 100644 index 00000000..50413b67 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php @@ -0,0 +1,269 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class MessageSet implements \Iterator +{ + // {{{ members + + /** + * kafka socket object + * + * @var mixed + * @access private + */ + private $stream = null; + + /** + * messageSet size + * + * @var float + * @access private + */ + private $messageSetSize = 0; + + /** + * validByteCount + * + * @var float + * @access private + */ + private $validByteCount = 0; + + /** + * messageSet offset + * + * @var float + * @access private + */ + private $offset = 0; + + /** + * valid + * + * @var mixed + * @access private + */ + private $valid = false; + + /** + * partition object + * + * @var \Kafka\Protocol\Fetch\Partition + * @access private + */ + private $partition = null; + + /** + * request fetch context + * + * @var array + */ + private $context = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Socket $stream + * @param int $initOffset + * @access public + * @return void + */ + public function __construct(\Kafka\Protocol\Fetch\Partition $partition, $context = array()) + { + $this->stream = $partition->getStream(); + $this->partition = $partition; + $this->context = $context; + $this->messageSetSize = $this->getMessageSetSize(); + \Kafka\Log::log("messageSetSize: {$this->messageSetSize}", LOG_INFO); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->validByteCount; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextMessage(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + if (!$this->valid) { + $this->partition->setMessageOffset($this->offset); + + // one partition iterator end + \Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof($this->partition); + } + + return $this->valid; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextMessage(); + } + + // }}} + // {{{ protected function getMessageSetSize() + + /** + * get message set size + * + * @access protected + * @return integer + */ + protected function getMessageSetSize() + { + // read message size + $data = $this->stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $size = array_shift($data); + if ($size <= 0) { + throw new \Kafka\Exception\OutOfRange($size . ' is not a valid message size'); + } + + return $size; + } + + // }}} + // {{{ public function loadNextMessage() + + /** + * load next message + * + * @access public + * @return void + */ + public function loadNextMessage() + { + if ($this->validByteCount >= $this->messageSetSize) { + return false; + } + + try { + if ($this->validByteCount + 12 > $this->messageSetSize) { + // read socket buffer dirty data + $this->stream->read($this->messageSetSize - $this->validByteCount); + return false; + } + $offset = $this->stream->read(8, true); + $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset); + $messageSize = $this->stream->read(4, true); + $messageSize = Decoder::unpack(Decoder::BIT_B32, $messageSize); + $messageSize = array_shift($messageSize); + $this->validByteCount += 12; + if (($this->validByteCount + $messageSize) > $this->messageSetSize) { + // read socket buffer dirty data + $this->stream->read($this->messageSetSize - $this->validByteCount); + return false; + } + $msg = $this->stream->read($messageSize, true); + $this->current = new Message($msg); + } catch (\Kafka\Exception $e) { + \Kafka\Log::log("already fetch: {$this->validByteCount}, {$e->getMessage()}", LOG_INFO); + return false; + } + + $this->validByteCount += $messageSize; + + return true; + } + + // }}} + // {{{ public function messageOffset() + + /** + * current message offset in producer + * + * @return void + */ + public function messageOffset() + { + return $this->offset; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php new file mode 100644 index 00000000..9f8578d5 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php @@ -0,0 +1,375 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Partition implements \Iterator, \Countable +{ + // {{{ members + + /** + * kafka socket object + * + * @var mixed + * @access private + */ + private $stream = null; + + /** + * validCount + * + * @var float + * @access private + */ + private $validCount = 0; + + /** + * partitions count + * + * @var float + * @access private + */ + private $partitionCount = false; + + /** + * current topic + * + * @var mixed + * @access private + */ + private $current = null; + + /** + * current iterator key + * partition id + * + * @var string + * @access private + */ + private $key = null; + + /** + * partition errCode + * + * @var float + * @access private + */ + private $errCode = 0; + + /** + * partition offset + * + * @var float + * @access private + */ + private $offset = 0; + + /** + * partition current fetch offset + * + * @var float + * @access private + */ + private $currentOffset = 0; + + /** + * valid + * + * @var mixed + * @access private + */ + private $valid = false; + + /** + * cuerrent topic name + * + * @var string + * @access private + */ + private $topicName = ''; + + /** + * request fetch context + * + * @var array + */ + private $context = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Protocol\Fetch\Topic $topic + * @param int $initOffset + * @access public + * @return void + */ + public function __construct(\Kafka\Protocol\Fetch\Topic $topic, $context = array()) + { + $this->stream = $topic->getStream(); + $this->topicName = $topic->key(); + $this->context = $context; + $this->partitionCount = $this->getPartitionCount(); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->key; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextPartition(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + return $this->valid && $this->validCount <= $this->partitionCount; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextPartition(); + } + + // }}} + // {{{ public function count() + + /** + * implements Countable function + * + * @access public + * @return integer + */ + public function count() + { + return $this->partitionCount; + } + + // }}} + // {{{ public function getErrCode() + + /** + * get partition errcode + * + * @access public + * @return void + */ + public function getErrCode() + { + return $this->errCode; + } + + // }}} + // {{{ public function getHighOffset() + + /** + * get partition high offset + * + * @access public + * @return void + */ + public function getHighOffset() + { + return $this->offset; + } + + // }}} + // {{{ public function getTopicName() + + /** + * get partition topic name + * + * @access public + * @return void + */ + public function getTopicName() + { + return $this->topicName; + } + + // }}} + // {{{ public function getStream() + + /** + * get current stream + * + * @access public + * @return \Kafka\Socket + */ + public function getStream() + { + return $this->stream; + } + + // }}} + // {{{ protected function getPartitionCount() + + /** + * get message size + * only use to object init + * + * @access protected + * @return integer + */ + protected function getPartitionCount() + { + // read topic count + $data = $this->stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $count = array_shift($data); + if ($count <= 0) { + throw new \Kafka\Exception\OutOfRange($size . ' is not a valid partition count'); + } + + return $count; + } + + // }}} + // {{{ public function loadNextPartition() + + /** + * load next partition + * + * @access public + * @return void + */ + public function loadNextPartition() + { + if ($this->validCount >= $this->partitionCount) { + return false; + } + + try { + $partitionId = $this->stream->read(4, true); + $partitionId = Decoder::unpack(Decoder::BIT_B32, $partitionId); + $partitionId = array_shift($partitionId); + \Kafka\Log::log("kafka client:fetch partition:" . $partitionId, LOG_INFO); + + $errCode = $this->stream->read(2, true); + $errCode = Decoder::unpack(Decoder::BIT_B16, $errCode); + $this->errCode = array_shift($errCode); + if ($this->errCode != 0) { + throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($this->errCode)); + } + $offset = $this->stream->read(8, true); + $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset); + + $this->key = $partitionId; + $this->current = new MessageSet($this, $this->context); + } catch (\Kafka\Exception $e) { + return false; + } + + $this->validCount++; + return true; + } + + // }}} + // {{{ public function setMessageOffset() + + /** + * set messageSet fetch offset current + * + * @param intger $offset + * @return void + */ + public function setMessageOffset($offset) + { + $this->currentOffset = $offset; + } + + // }}} + // {{{ public function getMessageOffset() + + /** + * get messageSet fetch offset current + * + * @return int + */ + public function getMessageOffset() + { + return $this->currentOffset; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php new file mode 100644 index 00000000..500e6b1f --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php @@ -0,0 +1,345 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Topic implements \Iterator, \Countable +{ + // {{{ members + + /** + * kafka socket object + * + * @var array + * @access private + */ + private $streams = array(); + + /** + * each topic count + * + * @var array + * @access private + */ + private $topicCounts = array(); + + /** + * current iterator stream + * + * @var mixed + * @access private + */ + private $currentStreamKey = 0; + + /** + * current lock key + * + * @var string + * @access private + */ + private $currentStreamLockKey = ''; + + /** + * currentStreamCount + * + * @var float + * @access private + */ + private $currentStreamCount = 0; + + /** + * validCount + * + * @var float + * @access private + */ + private $validCount = 0; + + /** + * topic count + * + * @var float + * @access private + */ + private $topicCount = false; + + /** + * current topic + * + * @var mixed + * @access private + */ + private $current = null; + + /** + * current iterator key + * topic name + * + * @var string + * @access private + */ + private $key = null; + + /** + * valid + * + * @var mixed + * @access private + */ + private $valid = false; + + /** + * request fetch context + * + * @var array + */ + private $context = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Socket $stream + * @param int $initOffset + * @access public + * @return void + */ + public function __construct($streams, $context = array()) + { + if (!is_array($streams)) { + $streams = array($streams); + } + $this->streams = $streams; + $topicInfos = array(); + foreach ($context as $values) { + if (!isset($values['data'])) { + continue; + } + + foreach ($values['data'] as $value) { + if (!isset($value['topic_name']) || !isset($value['partitions'])) { + continue; + } + + $topicName = $value['topic_name']; + foreach ($value['partitions'] as $part) { + $topicInfos[$topicName][$part['partition_id']] = array( + 'offset' => $part['offset'], + ); + } + } + } + $this->context = $topicInfos; + $this->topicCount = $this->getTopicCount(); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->key; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextTopic(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + return $this->valid; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextTopic(); + } + + // }}} + // {{{ public function count() + + /** + * implements Countable function + * + * @access public + * @return integer + */ + public function count() + { + return $this->topicCount; + } + + // }}} + // {{{ protected function getTopicCount() + + /** + * get message size + * only use to object init + * + * @access protected + * @return integer + */ + protected function getTopicCount() + { + $count = 0; + foreach (array_values($this->streams) as $key => $stream) { + // read topic count + $stream->read(8, true); + $data = $stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $topicCount = array_shift($data); + $count += $topicCount; + $this->topicCounts[$key] = $topicCount; + if ($count <= 0) { + throw new \Kafka\Exception\OutOfRange($count . ' is not a valid topic count'); + } + } + + return $count; + } + + // }}} + // {{{ public function loadNextTopic() + + /** + * load next topic + * + * @access public + * @return void + */ + public function loadNextTopic() + { + if ($this->validCount >= $this->topicCount) { + \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey); + return false; + } + + if ($this->currentStreamCount >= $this->topicCounts[$this->currentStreamKey]) { + \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey); + $this->currentStreamKey++; + } + + $lockKeys = array_keys($this->streams); + $streams = array_values($this->streams); + if (!isset($streams[$this->currentStreamKey])) { + return false; + } + + $stream = $streams[$this->currentStreamKey]; + $this->currentStreamLockKey = $lockKeys[$this->currentStreamKey]; + + try { + $topicLen = $stream->read(2, true); + $topicLen = Decoder::unpack(Decoder::BIT_B16, $topicLen); + $topicLen = array_shift($topicLen); + if ($topicLen <= 0) { + return false; + } + + // topic name + $this->key = $stream->read($topicLen, true); + $this->current = new Partition($this, $this->context); + } catch (\Kafka\Exception $e) { + return false; + } + + $this->validCount++; + $this->currentStreamCount++; + + return true; + } + + // }}} + // {{{ public function getStream() + + /** + * get current stream + * + * @access public + * @return \Kafka\Socket + */ + public function getStream() + { + $streams = array_values($this->streams); + return $streams[$this->currentStreamKey]; + } + + // }}} + // }}} +} |