summaryrefslogtreecommitdiff
path: root/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch
diff options
context:
space:
mode:
authorPierre Schmitz <pierre@archlinux.de>2015-12-20 09:00:55 +0100
committerPierre Schmitz <pierre@archlinux.de>2015-12-20 09:00:55 +0100
commita2190ac74dd4d7080b12bab90e552d7aa81209ef (patch)
tree8b31f38de9882d18df54cf8d9e0de74167a094eb /vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch
parent15e69f7b20b6596b9148030acce5b59993b95a45 (diff)
parent257401d8b2cf661adf36c84b0e3fd1cf85e33c22 (diff)
Merge branch 'mw-1.26'
Diffstat (limited to 'vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch')
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php119
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php39
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php117
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php160
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php71
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php175
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php269
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php375
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php345
9 files changed, 1670 insertions, 0 deletions
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php
new file mode 100644
index 00000000..8424cf78
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php
@@ -0,0 +1,119 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class CommitOffset extends HelperAbstract
+{
+ // {{{ members
+
+ /**
+ * consumer group
+ *
+ * @var string
+ * @access protected
+ */
+ protected $group = '';
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct($client)
+ {
+ $this->client = $client;
+ }
+
+ // }}}
+ // {{{ public function setGroup()
+
+ /**
+ * set consumer group
+ *
+ * @access public
+ * @return void
+ */
+ public function setGroup($group)
+ {
+ $this->group = $group;
+ }
+
+ // }}}
+ // {{{ public function onStreamEof()
+
+ /**
+ * on stream eof call
+ *
+ * @param string $streamKey
+ * @access public
+ * @return void
+ */
+ public function onStreamEof($streamKey)
+ {
+ }
+
+ // }}}
+ // {{{ public function onTopicEof()
+
+ /**
+ * on topic eof call
+ *
+ * @param string $topicName
+ * @access public
+ * @return void
+ */
+ public function onTopicEof($topicName)
+ {
+ }
+
+ // }}}
+ // {{{ public function onPartitionEof()
+
+ /**
+ * on partition eof call
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @access public
+ * @return void
+ */
+ public function onPartitionEof($partition)
+ {
+ $partitionId = $partition->key();
+ $topicName = $partition->getTopicName();
+ $offset = $partition->getMessageOffset();
+ $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId);
+ $offsetObject->setOffset($offset);
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
new file mode 100644
index 00000000..acf0223e
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
@@ -0,0 +1,39 @@
+<?php
+namespace Kafka\Protocol\Fetch\Helper;
+/**
+ * Description of Consumer
+ *
+ * @author daniel
+ */
+class Consumer extends HelperAbstract
+{
+ protected $consumer;
+
+ protected $offsetStrategy;
+
+
+ public function __construct(\Kafka\Consumer $consumer)
+ {
+ $this->consumer = $consumer;
+ }
+
+
+ public function onPartitionEof($partition)
+ {
+ $partitionId = $partition->key();
+ $topicName = $partition->getTopicName();
+ $offset = $partition->getMessageOffset();
+ $this->consumer->setFromOffset(true);
+ $this->consumer->setPartition($topicName, $partitionId, ($offset +1));
+ }
+
+ public function onStreamEof($streamKey)
+ {
+
+ }
+
+ public function onTopicEof($topicName)
+ {
+
+ }
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php
new file mode 100644
index 00000000..bba38dd6
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php
@@ -0,0 +1,117 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class FreeStream extends HelperAbstract
+{
+ // {{{ members
+
+ /**
+ * streams
+ *
+ * @var array
+ * @access protected
+ */
+ protected $streams = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct($client)
+ {
+ $this->client = $client;
+ }
+
+ // }}}
+ // {{{ public function setStreams()
+
+ /**
+ * set streams
+ *
+ * @access public
+ * @return void
+ */
+ public function setStreams($streams)
+ {
+ $this->streams = $streams;
+ }
+
+ // }}}
+ // {{{ public function onStreamEof()
+
+ /**
+ * on stream eof call
+ *
+ * @param string $streamKey
+ * @access public
+ * @return void
+ */
+ public function onStreamEof($streamKey)
+ {
+ if (isset($this->streams[$streamKey])) {
+ $this->client->freeStream($streamKey);
+ }
+ }
+
+ // }}}
+ // {{{ public function onTopicEof()
+
+ /**
+ * on topic eof call
+ *
+ * @param string $topicName
+ * @access public
+ * @return void
+ */
+ public function onTopicEof($topicName)
+ {
+ }
+
+ // }}}
+ // {{{ public function onPartitionEof()
+
+ /**
+ * on partition eof call
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @access public
+ * @return void
+ */
+ public function onPartitionEof($partition)
+ {
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php
new file mode 100644
index 00000000..4ec23926
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php
@@ -0,0 +1,160 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Helper
+{
+ // {{{ members
+
+ /**
+ * helper object
+ */
+ private static $helpers = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public staitc function registerHelper()
+
+ /**
+ * register helper
+ *
+ * @param string $key
+ * @param \Kafka\Protocol\Fetch\Helper\HelperAbstract $helper
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function registerHelper($key, $helper = null)
+ {
+ if (is_null($helper)) {
+ $className = '\\Kafka\\Protocol\\Fetch\\Helper\\' . $key;
+ if (!class_exists($className)) {
+ throw new \Kafka\Exception('helper is not exists.');
+ }
+ $helper = new $className();
+ }
+
+ if ($helper instanceof \Kafka\Protocol\Fetch\Helper\HelperAbstract) {
+ self::$helpers[$key] = $helper;
+ } else {
+ throw new \Kafka\Exception('this helper not instance of `\Kafka\Protocol\Fetch\Helper\HelperAbstract`');
+ }
+ }
+
+ // }}}
+ // {{{ public staitc function unRegisterHelper()
+
+ /**
+ * unregister helper
+ *
+ * @param string $key
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function unRegisterHelper($key)
+ {
+ if (isset(self::$helpers[$key])) {
+ unset(self::$helpers[$key]);
+ }
+ }
+
+ // }}}
+ // {{{ public static function onStreamEof()
+
+ /**
+ * on stream eof call
+ *
+ * @param string $streamKey
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function onStreamEof($streamKey)
+ {
+ if (empty(self::$helpers)) {
+ return false;
+ }
+
+ foreach (self::$helpers as $key => $helper) {
+ if (method_exists($helper, 'onStreamEof')) {
+ $helper->onStreamEof($streamKey);
+ }
+ }
+ }
+
+ // }}}
+ // {{{ public static function onTopicEof()
+
+ /**
+ * on topic eof call
+ *
+ * @param string $topicName
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function onTopicEof($topicName)
+ {
+ if (empty(self::$helpers)) {
+ return false;
+ }
+
+ foreach (self::$helpers as $key => $helper) {
+ if (method_exists($helper, 'onTopicEof')) {
+ $helper->onStreamEof($topicName);
+ }
+ }
+ }
+
+ // }}}
+ // {{{ public static function onPartitionEof()
+
+ /**
+ * on partition eof call
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function onPartitionEof($partition)
+ {
+ if (empty(self::$helpers)) {
+ return false;
+ }
+
+ foreach (self::$helpers as $key => $helper) {
+ if (method_exists($helper, 'onPartitionEof')) {
+ $helper->onPartitionEof($partition);
+ }
+ }
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php
new file mode 100644
index 00000000..476f3da1
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php
@@ -0,0 +1,71 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+abstract class HelperAbstract
+{
+ // {{{ members
+ // }}}
+ // {{{ functions
+ // {{{ abstract public function onStreamEof()
+
+ /**
+ * on stream eof
+ *
+ * @param string $streamKey
+ * @access public
+ * @return void
+ */
+ abstract public function onStreamEof($streamKey);
+
+ // }}}
+ // {{{ abstract public function onTopicEof()
+
+ /**
+ * on topic eof
+ *
+ * @param string $topicName
+ * @access public
+ * @return void
+ */
+ abstract public function onTopicEof($topicName);
+
+ // }}}
+ // {{{ abstract public function onPartitionEof()
+
+ /**
+ * on partition eof
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @access public
+ * @return void
+ */
+ abstract public function onPartitionEof($partition);
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php
new file mode 100644
index 00000000..42d7da1d
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php
@@ -0,0 +1,175 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Message
+{
+ // {{{ members
+
+ /**
+ * init read bytes
+ *
+ * @var float
+ * @access private
+ */
+ private $initOffset = 0;
+
+ /**
+ * validByteCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validByteCount = 0;
+
+ /**
+ * crc32 code
+ *
+ * @var float
+ * @access private
+ */
+ private $crc = 0;
+
+ /**
+ * This is a version id used to allow backwards compatible evolution of the
+ * message binary format.
+ *
+ * @var float
+ * @access private
+ */
+ private $magic = 0;
+
+ /**
+ * The lowest 2 bits contain the compression codec used for the message. The
+ * other bits should be set to 0.
+ *
+ * @var float
+ * @access private
+ */
+ private $attribute = 0;
+
+ /**
+ * message key
+ *
+ * @var string
+ * @access private
+ */
+ private $key = '';
+
+ /**
+ * message value
+ *
+ * @var string
+ * @access private
+ */
+ private $value = '';
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param string(raw) $msg
+ * @access public
+ * @return void
+ */
+ public function __construct($msg)
+ {
+ $offset = 0;
+ $crc = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
+ $offset += 4;
+ $this->crc = array_shift($crc);
+ $magic = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1));
+ $this->magic = array_shift($magic);
+ $offset += 1;
+ $attr = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1));
+ $this->attribute = array_shift($attr);
+ $offset += 1;
+ $keyLen = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
+ $keyLen = array_shift($keyLen);
+ $offset += 4;
+ if ($keyLen > 0 && $keyLen != 0xFFFFFFFF) {
+ $this->key = substr($msg, $offset, $keyLen);
+ $offset += $keyLen;
+ }
+ $messageSize = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
+ $messageSize = array_shift($messageSize);
+ $offset += 4;
+ if ($messageSize) {
+ $this->value = substr($msg, $offset, $messageSize);
+ }
+ }
+
+ // }}}
+ // {{{ public function getMessage()
+
+ /**
+ * get message data
+ *
+ * @access public
+ * @return string (raw)
+ */
+ public function getMessage()
+ {
+ return $this->value;
+ }
+
+ // }}}
+ // {{{ public function getMessageKey()
+
+ /**
+ * get message key
+ *
+ * @access public
+ * @return string (raw)
+ */
+ public function getMessageKey()
+ {
+ return $this->key;
+ }
+
+ // }}}
+ // {{{ public function __toString()
+
+ /**
+ * __toString
+ *
+ * @access public
+ * @return void
+ */
+ public function __toString()
+ {
+ return $this->value;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
new file mode 100644
index 00000000..50413b67
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
@@ -0,0 +1,269 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class MessageSet implements \Iterator
+{
+ // {{{ members
+
+ /**
+ * kafka socket object
+ *
+ * @var mixed
+ * @access private
+ */
+ private $stream = null;
+
+ /**
+ * messageSet size
+ *
+ * @var float
+ * @access private
+ */
+ private $messageSetSize = 0;
+
+ /**
+ * validByteCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validByteCount = 0;
+
+ /**
+ * messageSet offset
+ *
+ * @var float
+ * @access private
+ */
+ private $offset = 0;
+
+ /**
+ * valid
+ *
+ * @var mixed
+ * @access private
+ */
+ private $valid = false;
+
+ /**
+ * partition object
+ *
+ * @var \Kafka\Protocol\Fetch\Partition
+ * @access private
+ */
+ private $partition = null;
+
+ /**
+ * request fetch context
+ *
+ * @var array
+ */
+ private $context = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Socket $stream
+ * @param int $initOffset
+ * @access public
+ * @return void
+ */
+ public function __construct(\Kafka\Protocol\Fetch\Partition $partition, $context = array())
+ {
+ $this->stream = $partition->getStream();
+ $this->partition = $partition;
+ $this->context = $context;
+ $this->messageSetSize = $this->getMessageSetSize();
+ \Kafka\Log::log("messageSetSize: {$this->messageSetSize}", LOG_INFO);
+ }
+
+ // }}}
+ // {{{ public function current()
+
+ /**
+ * current
+ *
+ * @access public
+ * @return void
+ */
+ public function current()
+ {
+ return $this->current;
+ }
+
+ // }}}
+ // {{{ public function key()
+
+ /**
+ * key
+ *
+ * @access public
+ * @return void
+ */
+ public function key()
+ {
+ return $this->validByteCount;
+ }
+
+ // }}}
+ // {{{ public function rewind()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function rewind()
+ {
+ $this->valid = $this->loadNextMessage();
+ }
+
+ // }}}
+ // {{{ public function valid()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function valid()
+ {
+ if (!$this->valid) {
+ $this->partition->setMessageOffset($this->offset);
+
+ // one partition iterator end
+ \Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof($this->partition);
+ }
+
+ return $this->valid;
+ }
+
+ // }}}
+ // {{{ public function next()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function next()
+ {
+ $this->valid = $this->loadNextMessage();
+ }
+
+ // }}}
+ // {{{ protected function getMessageSetSize()
+
+ /**
+ * get message set size
+ *
+ * @access protected
+ * @return integer
+ */
+ protected function getMessageSetSize()
+ {
+ // read message size
+ $data = $this->stream->read(4, true);
+ $data = Decoder::unpack(Decoder::BIT_B32, $data);
+ $size = array_shift($data);
+ if ($size <= 0) {
+ throw new \Kafka\Exception\OutOfRange($size . ' is not a valid message size');
+ }
+
+ return $size;
+ }
+
+ // }}}
+ // {{{ public function loadNextMessage()
+
+ /**
+ * load next message
+ *
+ * @access public
+ * @return void
+ */
+ public function loadNextMessage()
+ {
+ if ($this->validByteCount >= $this->messageSetSize) {
+ return false;
+ }
+
+ try {
+ if ($this->validByteCount + 12 > $this->messageSetSize) {
+ // read socket buffer dirty data
+ $this->stream->read($this->messageSetSize - $this->validByteCount);
+ return false;
+ }
+ $offset = $this->stream->read(8, true);
+ $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset);
+ $messageSize = $this->stream->read(4, true);
+ $messageSize = Decoder::unpack(Decoder::BIT_B32, $messageSize);
+ $messageSize = array_shift($messageSize);
+ $this->validByteCount += 12;
+ if (($this->validByteCount + $messageSize) > $this->messageSetSize) {
+ // read socket buffer dirty data
+ $this->stream->read($this->messageSetSize - $this->validByteCount);
+ return false;
+ }
+ $msg = $this->stream->read($messageSize, true);
+ $this->current = new Message($msg);
+ } catch (\Kafka\Exception $e) {
+ \Kafka\Log::log("already fetch: {$this->validByteCount}, {$e->getMessage()}", LOG_INFO);
+ return false;
+ }
+
+ $this->validByteCount += $messageSize;
+
+ return true;
+ }
+
+ // }}}
+ // {{{ public function messageOffset()
+
+ /**
+ * current message offset in producer
+ *
+ * @return void
+ */
+ public function messageOffset()
+ {
+ return $this->offset;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php
new file mode 100644
index 00000000..9f8578d5
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php
@@ -0,0 +1,375 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Partition implements \Iterator, \Countable
+{
+ // {{{ members
+
+ /**
+ * kafka socket object
+ *
+ * @var mixed
+ * @access private
+ */
+ private $stream = null;
+
+ /**
+ * validCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validCount = 0;
+
+ /**
+ * partitions count
+ *
+ * @var float
+ * @access private
+ */
+ private $partitionCount = false;
+
+ /**
+ * current topic
+ *
+ * @var mixed
+ * @access private
+ */
+ private $current = null;
+
+ /**
+ * current iterator key
+ * partition id
+ *
+ * @var string
+ * @access private
+ */
+ private $key = null;
+
+ /**
+ * partition errCode
+ *
+ * @var float
+ * @access private
+ */
+ private $errCode = 0;
+
+ /**
+ * partition offset
+ *
+ * @var float
+ * @access private
+ */
+ private $offset = 0;
+
+ /**
+ * partition current fetch offset
+ *
+ * @var float
+ * @access private
+ */
+ private $currentOffset = 0;
+
+ /**
+ * valid
+ *
+ * @var mixed
+ * @access private
+ */
+ private $valid = false;
+
+ /**
+ * cuerrent topic name
+ *
+ * @var string
+ * @access private
+ */
+ private $topicName = '';
+
+ /**
+ * request fetch context
+ *
+ * @var array
+ */
+ private $context = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Protocol\Fetch\Topic $topic
+ * @param int $initOffset
+ * @access public
+ * @return void
+ */
+ public function __construct(\Kafka\Protocol\Fetch\Topic $topic, $context = array())
+ {
+ $this->stream = $topic->getStream();
+ $this->topicName = $topic->key();
+ $this->context = $context;
+ $this->partitionCount = $this->getPartitionCount();
+ }
+
+ // }}}
+ // {{{ public function current()
+
+ /**
+ * current
+ *
+ * @access public
+ * @return void
+ */
+ public function current()
+ {
+ return $this->current;
+ }
+
+ // }}}
+ // {{{ public function key()
+
+ /**
+ * key
+ *
+ * @access public
+ * @return void
+ */
+ public function key()
+ {
+ return $this->key;
+ }
+
+ // }}}
+ // {{{ public function rewind()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function rewind()
+ {
+ $this->valid = $this->loadNextPartition();
+ }
+
+ // }}}
+ // {{{ public function valid()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function valid()
+ {
+ return $this->valid && $this->validCount <= $this->partitionCount;
+ }
+
+ // }}}
+ // {{{ public function next()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function next()
+ {
+ $this->valid = $this->loadNextPartition();
+ }
+
+ // }}}
+ // {{{ public function count()
+
+ /**
+ * implements Countable function
+ *
+ * @access public
+ * @return integer
+ */
+ public function count()
+ {
+ return $this->partitionCount;
+ }
+
+ // }}}
+ // {{{ public function getErrCode()
+
+ /**
+ * get partition errcode
+ *
+ * @access public
+ * @return void
+ */
+ public function getErrCode()
+ {
+ return $this->errCode;
+ }
+
+ // }}}
+ // {{{ public function getHighOffset()
+
+ /**
+ * get partition high offset
+ *
+ * @access public
+ * @return void
+ */
+ public function getHighOffset()
+ {
+ return $this->offset;
+ }
+
+ // }}}
+ // {{{ public function getTopicName()
+
+ /**
+ * get partition topic name
+ *
+ * @access public
+ * @return void
+ */
+ public function getTopicName()
+ {
+ return $this->topicName;
+ }
+
+ // }}}
+ // {{{ public function getStream()
+
+ /**
+ * get current stream
+ *
+ * @access public
+ * @return \Kafka\Socket
+ */
+ public function getStream()
+ {
+ return $this->stream;
+ }
+
+ // }}}
+ // {{{ protected function getPartitionCount()
+
+ /**
+ * get message size
+ * only use to object init
+ *
+ * @access protected
+ * @return integer
+ */
+ protected function getPartitionCount()
+ {
+ // read topic count
+ $data = $this->stream->read(4, true);
+ $data = Decoder::unpack(Decoder::BIT_B32, $data);
+ $count = array_shift($data);
+ if ($count <= 0) {
+ throw new \Kafka\Exception\OutOfRange($size . ' is not a valid partition count');
+ }
+
+ return $count;
+ }
+
+ // }}}
+ // {{{ public function loadNextPartition()
+
+ /**
+ * load next partition
+ *
+ * @access public
+ * @return void
+ */
+ public function loadNextPartition()
+ {
+ if ($this->validCount >= $this->partitionCount) {
+ return false;
+ }
+
+ try {
+ $partitionId = $this->stream->read(4, true);
+ $partitionId = Decoder::unpack(Decoder::BIT_B32, $partitionId);
+ $partitionId = array_shift($partitionId);
+ \Kafka\Log::log("kafka client:fetch partition:" . $partitionId, LOG_INFO);
+
+ $errCode = $this->stream->read(2, true);
+ $errCode = Decoder::unpack(Decoder::BIT_B16, $errCode);
+ $this->errCode = array_shift($errCode);
+ if ($this->errCode != 0) {
+ throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($this->errCode));
+ }
+ $offset = $this->stream->read(8, true);
+ $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset);
+
+ $this->key = $partitionId;
+ $this->current = new MessageSet($this, $this->context);
+ } catch (\Kafka\Exception $e) {
+ return false;
+ }
+
+ $this->validCount++;
+ return true;
+ }
+
+ // }}}
+ // {{{ public function setMessageOffset()
+
+ /**
+ * set messageSet fetch offset current
+ *
+ * @param intger $offset
+ * @return void
+ */
+ public function setMessageOffset($offset)
+ {
+ $this->currentOffset = $offset;
+ }
+
+ // }}}
+ // {{{ public function getMessageOffset()
+
+ /**
+ * get messageSet fetch offset current
+ *
+ * @return int
+ */
+ public function getMessageOffset()
+ {
+ return $this->currentOffset;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php
new file mode 100644
index 00000000..500e6b1f
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php
@@ -0,0 +1,345 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Topic implements \Iterator, \Countable
+{
+ // {{{ members
+
+ /**
+ * kafka socket object
+ *
+ * @var array
+ * @access private
+ */
+ private $streams = array();
+
+ /**
+ * each topic count
+ *
+ * @var array
+ * @access private
+ */
+ private $topicCounts = array();
+
+ /**
+ * current iterator stream
+ *
+ * @var mixed
+ * @access private
+ */
+ private $currentStreamKey = 0;
+
+ /**
+ * current lock key
+ *
+ * @var string
+ * @access private
+ */
+ private $currentStreamLockKey = '';
+
+ /**
+ * currentStreamCount
+ *
+ * @var float
+ * @access private
+ */
+ private $currentStreamCount = 0;
+
+ /**
+ * validCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validCount = 0;
+
+ /**
+ * topic count
+ *
+ * @var float
+ * @access private
+ */
+ private $topicCount = false;
+
+ /**
+ * current topic
+ *
+ * @var mixed
+ * @access private
+ */
+ private $current = null;
+
+ /**
+ * current iterator key
+ * topic name
+ *
+ * @var string
+ * @access private
+ */
+ private $key = null;
+
+ /**
+ * valid
+ *
+ * @var mixed
+ * @access private
+ */
+ private $valid = false;
+
+ /**
+ * request fetch context
+ *
+ * @var array
+ */
+ private $context = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Socket $stream
+ * @param int $initOffset
+ * @access public
+ * @return void
+ */
+ public function __construct($streams, $context = array())
+ {
+ if (!is_array($streams)) {
+ $streams = array($streams);
+ }
+ $this->streams = $streams;
+ $topicInfos = array();
+ foreach ($context as $values) {
+ if (!isset($values['data'])) {
+ continue;
+ }
+
+ foreach ($values['data'] as $value) {
+ if (!isset($value['topic_name']) || !isset($value['partitions'])) {
+ continue;
+ }
+
+ $topicName = $value['topic_name'];
+ foreach ($value['partitions'] as $part) {
+ $topicInfos[$topicName][$part['partition_id']] = array(
+ 'offset' => $part['offset'],
+ );
+ }
+ }
+ }
+ $this->context = $topicInfos;
+ $this->topicCount = $this->getTopicCount();
+ }
+
+ // }}}
+ // {{{ public function current()
+
+ /**
+ * current
+ *
+ * @access public
+ * @return void
+ */
+ public function current()
+ {
+ return $this->current;
+ }
+
+ // }}}
+ // {{{ public function key()
+
+ /**
+ * key
+ *
+ * @access public
+ * @return void
+ */
+ public function key()
+ {
+ return $this->key;
+ }
+
+ // }}}
+ // {{{ public function rewind()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function rewind()
+ {
+ $this->valid = $this->loadNextTopic();
+ }
+
+ // }}}
+ // {{{ public function valid()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function valid()
+ {
+ return $this->valid;
+ }
+
+ // }}}
+ // {{{ public function next()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function next()
+ {
+ $this->valid = $this->loadNextTopic();
+ }
+
+ // }}}
+ // {{{ public function count()
+
+ /**
+ * implements Countable function
+ *
+ * @access public
+ * @return integer
+ */
+ public function count()
+ {
+ return $this->topicCount;
+ }
+
+ // }}}
+ // {{{ protected function getTopicCount()
+
+ /**
+ * get message size
+ * only use to object init
+ *
+ * @access protected
+ * @return integer
+ */
+ protected function getTopicCount()
+ {
+ $count = 0;
+ foreach (array_values($this->streams) as $key => $stream) {
+ // read topic count
+ $stream->read(8, true);
+ $data = $stream->read(4, true);
+ $data = Decoder::unpack(Decoder::BIT_B32, $data);
+ $topicCount = array_shift($data);
+ $count += $topicCount;
+ $this->topicCounts[$key] = $topicCount;
+ if ($count <= 0) {
+ throw new \Kafka\Exception\OutOfRange($count . ' is not a valid topic count');
+ }
+ }
+
+ return $count;
+ }
+
+ // }}}
+ // {{{ public function loadNextTopic()
+
+ /**
+ * load next topic
+ *
+ * @access public
+ * @return void
+ */
+ public function loadNextTopic()
+ {
+ if ($this->validCount >= $this->topicCount) {
+ \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey);
+ return false;
+ }
+
+ if ($this->currentStreamCount >= $this->topicCounts[$this->currentStreamKey]) {
+ \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey);
+ $this->currentStreamKey++;
+ }
+
+ $lockKeys = array_keys($this->streams);
+ $streams = array_values($this->streams);
+ if (!isset($streams[$this->currentStreamKey])) {
+ return false;
+ }
+
+ $stream = $streams[$this->currentStreamKey];
+ $this->currentStreamLockKey = $lockKeys[$this->currentStreamKey];
+
+ try {
+ $topicLen = $stream->read(2, true);
+ $topicLen = Decoder::unpack(Decoder::BIT_B16, $topicLen);
+ $topicLen = array_shift($topicLen);
+ if ($topicLen <= 0) {
+ return false;
+ }
+
+ // topic name
+ $this->key = $stream->read($topicLen, true);
+ $this->current = new Partition($this, $this->context);
+ } catch (\Kafka\Exception $e) {
+ return false;
+ }
+
+ $this->validCount++;
+ $this->currentStreamCount++;
+
+ return true;
+ }
+
+ // }}}
+ // {{{ public function getStream()
+
+ /**
+ * get current stream
+ *
+ * @access public
+ * @return \Kafka\Socket
+ */
+ public function getStream()
+ {
+ $streams = array_values($this->streams);
+ return $streams[$this->currentStreamKey];
+ }
+
+ // }}}
+ // }}}
+}