summaryrefslogtreecommitdiff
path: root/vendor/nmred/kafka-php/src/Kafka/Protocol
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/nmred/kafka-php/src/Kafka/Protocol')
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php430
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php652
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php119
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php39
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php117
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php160
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php71
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php175
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php269
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php375
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php345
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php230
12 files changed, 2982 insertions, 0 deletions
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
new file mode 100644
index 00000000..f1e4b496
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
@@ -0,0 +1,430 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Decoder extends Protocol
+{
+ // {{{ functions
+ // {{{ public function produceResponse()
+
+ /**
+ * decode produce response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function produceResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('produce response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+
+ // parse data struct
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $topicCount = array_shift($topicCount);
+ $offset += 4;
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
+ $offset += 8;
+ $result[$topicName][$partitionId[1]] = array(
+ 'errCode' => $errCode[1],
+ 'offset' => $partitionOffset
+ );
+ }
+ }
+
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function fetchResponse()
+
+ /**
+ * decode fetch response
+ *
+ * @param string $data
+ * @access public
+ * @return Iterator
+ */
+ public function fetchResponse()
+ {
+ return new \Kafka\Protocol\Fetch\Topic($this->stream);
+ }
+
+ // }}}
+ // {{{ public function metadataResponse()
+
+ /**
+ * decode metadata response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function metadataResponse()
+ {
+ $result = array();
+ $broker = array();
+ $topic = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('metaData response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $brokerCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $brokerCount = isset($brokerCount[1]) ? $brokerCount[1] : 0;
+ for ($i = 0; $i < $brokerCount; $i++) {
+ $nodeId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $nodeId = $nodeId[1];
+ $offset += 4;
+ $hostNameLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 host name length
+ $hostNameLen = isset($hostNameLen[1]) ? $hostNameLen[1] : 0;
+ $offset += 2;
+ $hostName = substr($data, $offset, $hostNameLen);
+ $offset += $hostNameLen;
+ $port = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $broker[$nodeId] = array(
+ 'host' => $hostName,
+ 'port' => $port[1],
+ );
+ }
+
+ $topicMetaCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicMetaCount = isset($topicMetaCount[1]) ? $topicMetaCount[1] : 0;
+ for ($i = 0; $i < $topicMetaCount; $i++) {
+ $topicErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen[1]);
+ $offset += $topicLen[1];
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $topic[$topicName]['errCode'] = $topicErrCode[1];
+ $partitions = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionId = isset($partitionId[1]) ? $partitionId[1] : 0;
+ $offset += 4;
+ $leaderId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $repliasCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $repliasCount = isset($repliasCount[1]) ? $repliasCount[1] : 0;
+ $replias = array();
+ for ($z = 0; $z < $repliasCount; $z++) {
+ $repliaId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $replias[] = $repliaId[1];
+ }
+ $isrCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $isrCount = isset($isrCount[1]) ? $isrCount[1] : 0;
+ $isrs = array();
+ for ($z = 0; $z < $isrCount; $z++) {
+ $isrId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $isrs[] = $isrId[1];
+ }
+
+ $partitions[$partitionId] = array(
+ 'errCode' => $partitionErrCode[1],
+ 'leader' => $leaderId[1],
+ 'replicas' => $replias,
+ 'isr' => $isrs,
+ );
+ }
+ $topic[$topicName]['partitions'] = $partitions;
+ }
+
+ $result = array(
+ 'brokers' => $broker,
+ 'topics' => $topic,
+ );
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function offsetResponse()
+
+ /**
+ * decode offset response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function offsetResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('offset response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicCount = array_shift($topicCount);
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $offsetCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $offsetCount = array_shift($offsetCount);
+ $offsetArr = array();
+ for ($z = 0; $z < $offsetCount; $z++) {
+ $offsetArr[] = self::unpack(self::BIT_B64, substr($data, $offset, 8));
+ $offset += 8;
+ }
+ $result[$topicName][$partitionId[1]] = array(
+ 'errCode' => $errCode[1],
+ 'offset' => $offsetArr
+ );
+ }
+ }
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function commitOffsetResponse()
+
+ /**
+ * decode commit offset response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function commitOffsetResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('commit offset response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicCount = array_shift($topicCount);
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $result[$topicName][$partitionId[1]] = array(
+ 'errCode' => $errCode[1],
+ );
+ }
+ }
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function fetchOffsetResponse()
+
+ /**
+ * decode fetch offset response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function fetchOffsetResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('fetch offset response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicCount = array_shift($topicCount);
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
+ $offset += 8;
+ $metaLen = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $metaLen = array_shift($metaLen);
+ $offset += 2;
+ $metaData = '';
+ if ($metaLen) {
+ $metaData = substr($data, $offset, $metaLen);
+ $offset += $metaLen;
+ }
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $result[$topicName][$partitionId[1]] = array(
+ 'offset' => $partitionOffset,
+ 'metadata' => $metaData,
+ 'errCode' => $errCode[1],
+ );
+ }
+ }
+ return $result;
+ }
+
+ // }}}
+ // {{{ public static function getError()
+
+ /**
+ * get error
+ *
+ * @param integer $errCode
+ * @static
+ * @access public
+ * @return string
+ */
+ public static function getError($errCode)
+ {
+ $error = '';
+ switch($errCode) {
+ case 0:
+ $error = 'No error--it worked!';
+ break;
+ case -1:
+ $error = 'An unexpected server error';
+ break;
+ case 1:
+ $error = 'The requested offset is outside the range of offsets maintained by the server for the given topic/partition.';
+ break;
+ case 2:
+ $error = 'This indicates that a message contents does not match its CRC';
+ break;
+ case 3:
+ $error = 'This request is for a topic or partition that does not exist on this broker.';
+ break;
+ case 4:
+ $error = 'The message has a negative size';
+ break;
+ case 5:
+ $error = 'This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes';
+ break;
+ case 6:
+ $error = 'This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.';
+ break;
+ case 7:
+ $error = 'This error is thrown if the request exceeds the user-specified time limit in the request.';
+ break;
+ case 8:
+ $error = 'This is not a client facing error and is used only internally by intra-cluster broker communication.';
+ break;
+ case 10:
+ $error = 'The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum.';
+ break;
+ case 11:
+ $error = 'Internal error code for broker-to-broker communication.';
+ break;
+ case 12:
+ $error = 'If you specify a string larger than configured maximum for offset metadata';
+ break;
+ case 14:
+ $error = 'The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).';
+ break;
+ case 15:
+ $error = 'The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.';
+ break;
+ case 16:
+ $error = 'The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.';
+ break;
+ default:
+ $error = 'Unknown error';
+ }
+
+ return $error;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php
new file mode 100644
index 00000000..7d36e10f
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php
@@ -0,0 +1,652 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Encoder extends Protocol
+{
+ // {{{ functions
+ // {{{ public function produceRequest()
+
+ /**
+ * produce request
+ *
+ * @param array $payloads
+ * @static
+ * @access public
+ * @return void
+ */
+ public function produceRequest($payloads, $compression = self::COMPRESSION_NONE)
+ {
+ if (!isset($payloads['data'])) {
+ throw new \Kafka\Exception\Protocol('given procude data invalid. `data` is undefined.');
+ }
+
+ if (!isset($payloads['required_ack'])) {
+ // default server will not send any response
+ // (this is the only case where the server will not reply to a request)
+ $payloads['required_ack'] = 0;
+ }
+
+ if (!isset($payloads['timeout'])) {
+ $payloads['timeout'] = 100; // default timeout 100ms
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::PRODUCE_REQUEST);
+ $data = self::pack(self::BIT_B16, $payloads['required_ack']);
+ $data .= self::pack(self::BIT_B32, $payloads['timeout']);
+ $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeProcudeTopic'), $compression);
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public function metadataRequest()
+
+ /**
+ * build metadata request protocol
+ *
+ * @param array $topics
+ * @access public
+ * @return string
+ */
+ public function metadataRequest($topics)
+ {
+ if (!is_array($topics)) {
+ $topics = array($topics);
+ }
+
+ foreach ($topics as $topic) {
+ if (!is_string($topic)) {
+ throw new \Kafka\Exception\Protocol('request metadata topic array have invalid value. ');
+ }
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::METADATA_REQUEST);
+ $data = self::encodeArray($topics, array(__CLASS__, 'encodeString'), self::PACK_INT16);
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public function fetchRequest()
+
+ /**
+ * build fetch request
+ *
+ * @param array $payloads
+ * @access public
+ * @return string
+ */
+ public function fetchRequest($payloads)
+ {
+ if (!isset($payloads['data'])) {
+ throw new \Kafka\Exception\Protocol('given fetch kafka data invalid. `data` is undefined.');
+ }
+
+ if (!isset($payloads['replica_id'])) {
+ $payloads['replica_id'] = -1;
+ }
+
+ if (!isset($payloads['max_wait_time'])) {
+ $payloads['max_wait_time'] = 100; // default timeout 100ms
+ }
+
+ if (!isset($payloads['min_bytes'])) {
+ $payloads['min_bytes'] = 64 * 1024; // 64k
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::FETCH_REQUEST);
+ $data = self::pack(self::BIT_B32, $payloads['replica_id']);
+ $data .= self::pack(self::BIT_B32, $payloads['max_wait_time']);
+ $data .= self::pack(self::BIT_B32, $payloads['min_bytes']);
+ $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchTopic'));
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public function offsetRequest()
+
+ /**
+ * build offset request
+ *
+ * @param array $payloads
+ * @access public
+ * @return string
+ */
+ public function offsetRequest($payloads)
+ {
+ if (!isset($payloads['data'])) {
+ throw new \Kafka\Exception\Protocol('given offset data invalid. `data` is undefined.');
+ }
+
+ if (!isset($payloads['replica_id'])) {
+ $payloads['replica_id'] = -1;
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::OFFSET_REQUEST);
+ $data = self::pack(self::BIT_B32, $payloads['replica_id']);
+ $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeOffsetTopic'));
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public function commitOffsetRequest()
+
+ /**
+ * build consumer commit offset request
+ *
+ * @param array $payloads
+ * @access public
+ * @return string
+ */
+ public function commitOffsetRequest($payloads)
+ {
+ if (!isset($payloads['data'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `data` is undefined.');
+ }
+
+ if (!isset($payloads['group_id'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `group_id` is undefined.');
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::OFFSET_COMMIT_REQUEST);
+ $data = self::encodeString($payloads['group_id'], self::PACK_INT16);
+ $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeCommitOffset'));
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public function fetchOffsetRequest()
+
+ /**
+ * build consumer fetch offset request
+ *
+ * @param array $payloads
+ * @access public
+ * @return string
+ */
+ public function fetchOffsetRequest($payloads)
+ {
+ if (!isset($payloads['data'])) {
+ throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `data` is undefined.');
+ }
+
+ if (!isset($payloads['group_id'])) {
+ throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `group_id` is undefined.');
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::OFFSET_FETCH_REQUEST);
+ $data = self::encodeString($payloads['group_id'], self::PACK_INT16);
+ $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchOffset'));
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public static function encodeString()
+
+ /**
+ * encode pack string type
+ *
+ * @param string $string
+ * @param int $bytes self::PACK_INT32: int32 big endian order. self::PACK_INT16: int16 big endian order.
+ * @static
+ * @access public
+ * @return string
+ */
+ public static function encodeString($string, $bytes, $compression = self::COMPRESSION_NONE)
+ {
+ $packLen = ($bytes == self::PACK_INT32) ? self::BIT_B32 : self::BIT_B16;
+ switch ($compression) {
+ case self::COMPRESSION_NONE:
+ break;
+ case self::COMPRESSION_GZIP:
+ $string = gzencode($string);
+ break;
+ case self::COMPRESSION_SNAPPY:
+ throw new \Kafka\Exception\NotSupported('SNAPPY compression not yet implemented');
+ default:
+ throw new \Kafka\Exception\NotSupported('Unknown compression flag: ' . $compression);
+ }
+ return self::pack($packLen, strlen($string)) . $string;
+ }
+
+ // }}}
+ // {{{ public static function encodeArray()
+
+ /**
+ * encode key array
+ *
+ * @param array $array
+ * @param Callable $func
+ * @static
+ * @access public
+ * @return string
+ */
+ public static function encodeArray(array $array, $func, $options = null)
+ {
+ if (!is_callable($func, false)) {
+ throw new \Kafka\Exception\Protocol('Encode array failed, given function is not callable.');
+ }
+
+ $arrayCount = count($array);
+
+ $body = '';
+ foreach ($array as $value) {
+ if (!is_null($options)) {
+ $body .= call_user_func($func, $value, $options);
+ } else {
+ $body .= call_user_func($func, $value);
+ }
+ }
+
+ return self::pack(self::BIT_B32, $arrayCount) . $body;
+ }
+
+ // }}}
+ // {{{ public static function encodeMessageSet()
+
+ /**
+ * encode message set
+ * N.B., MessageSets are not preceded by an int32 like other array elements
+ * in the protocol.
+ *
+ * @param array $messages
+ * @static
+ * @access public
+ * @return string
+ */
+ public static function encodeMessageSet($messages, $compression = self::COMPRESSION_NONE)
+ {
+ if (!is_array($messages)) {
+ $messages = array($messages);
+ }
+
+ $data = '';
+ foreach ($messages as $message) {
+ $tmpMessage = self::_encodeMessage($message, $compression);
+
+ // int64 -- message offset Message
+ $data .= self::pack(self::BIT_B64, 0) . self::encodeString($tmpMessage, self::PACK_INT32);
+ }
+ return $data;
+ }
+
+ // }}}
+ // {{{ public static function requestHeader()
+
+ /**
+ * get request header
+ *
+ * @param string $clientId
+ * @param integer $correlationId
+ * @param integer $apiKey
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function requestHeader($clientId, $correlationId, $apiKey)
+ {
+ // int16 -- apiKey int16 -- apiVersion int32 correlationId
+ $binData = self::pack(self::BIT_B16, $apiKey);
+ $binData .= self::pack(self::BIT_B16, self::API_VERSION);
+ $binData .= self::pack(self::BIT_B32, $correlationId);
+
+ // concat client id
+ $binData .= self::encodeString($clientId, self::PACK_INT16);
+
+ return $binData;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeMessage()
+
+ /**
+ * encode signal message
+ *
+ * @param string $message
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeMessage($message, $compression = self::COMPRESSION_NONE)
+ {
+ // int8 -- magic int8 -- attribute
+ $data = self::pack(self::BIT_B8, self::MESSAGE_MAGIC);
+ $data .= self::pack(self::BIT_B8, $compression);
+
+ // message key
+ $data .= self::encodeString('', self::PACK_INT32);
+
+ // message value
+ $data .= self::encodeString($message, self::PACK_INT32, $compression);
+
+ $crc = crc32($data);
+
+ // int32 -- crc code string data
+ $message = self::pack(self::BIT_B32, $crc) . $data;
+
+ return $message;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeProcudePartion()
+
+ /**
+ * encode signal part
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeProcudePartion($values, $compression)
+ {
+ if (!isset($values['partition_id'])) {
+ throw new \Kafka\Exception\Protocol('given produce data invalid. `partition_id` is undefined.');
+ }
+
+ if (!isset($values['messages']) || empty($values['messages'])) {
+ throw new \Kafka\Exception\Protocol('given produce data invalid. `messages` is undefined.');
+ }
+
+ $data = self::pack(self::BIT_B32, $values['partition_id']);
+ $data .= self::encodeString(self::encodeMessageSet($values['messages'], $compression), self::PACK_INT32);
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeProcudeTopic()
+
+ /**
+ * encode signal topic
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeProcudeTopic($values, $compression)
+ {
+ if (!isset($values['topic_name'])) {
+ throw new \Kafka\Exception\Protocol('given produce data invalid. `topic_name` is undefined.');
+ }
+
+ if (!isset($values['partitions']) || empty($values['partitions'])) {
+ throw new \Kafka\Exception\Protocol('given produce data invalid. `partitions` is undefined.');
+ }
+
+ $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
+ $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeProcudePartion'), $compression);
+
+ return $topic . $partitions;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeFetchPartion()
+
+ /**
+ * encode signal part
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeFetchPartion($values)
+ {
+ if (!isset($values['partition_id'])) {
+ throw new \Kafka\Exception\Protocol('given fetch data invalid. `partition_id` is undefined.');
+ }
+
+ if (!isset($values['offset'])) {
+ $values['offset'] = 0;
+ }
+
+ if (!isset($values['max_bytes'])) {
+ $values['max_bytes'] = 100 * 1024 * 1024;
+ }
+
+ $data = self::pack(self::BIT_B32, $values['partition_id']);
+ $data .= self::pack(self::BIT_B64, $values['offset']);
+ $data .= self::pack(self::BIT_B32, $values['max_bytes']);
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeFetchTopic()
+
+ /**
+ * encode signal topic
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeFetchTopic($values)
+ {
+ if (!isset($values['topic_name'])) {
+ throw new \Kafka\Exception\Protocol('given fetch data invalid. `topic_name` is undefined.');
+ }
+
+ if (!isset($values['partitions']) || empty($values['partitions'])) {
+ throw new \Kafka\Exception\Protocol('given fetch data invalid. `partitions` is undefined.');
+ }
+
+ $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
+ $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchPartion'));
+
+ return $topic . $partitions;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeOffsetPartion()
+
+ /**
+ * encode signal part
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeOffsetPartion($values)
+ {
+ if (!isset($values['partition_id'])) {
+ throw new \Kafka\Exception\Protocol('given offset data invalid. `partition_id` is undefined.');
+ }
+
+ if (!isset($values['time'])) {
+ $values['time'] = -1; // -1
+ }
+
+ if (!isset($values['max_offset'])) {
+ $values['max_offset'] = 100000;
+ }
+
+ $data = self::pack(self::BIT_B32, $values['partition_id']);
+ $data .= self::pack(self::BIT_B64, $values['time']);
+ $data .= self::pack(self::BIT_B32, $values['max_offset']);
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeOffsetTopic()
+
+ /**
+ * encode signal topic
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeOffsetTopic($values)
+ {
+ if (!isset($values['topic_name'])) {
+ throw new \Kafka\Exception\Protocol('given offset data invalid. `topic_name` is undefined.');
+ }
+
+ if (!isset($values['partitions']) || empty($values['partitions'])) {
+ throw new \Kafka\Exception\Protocol('given offset data invalid. `partitions` is undefined.');
+ }
+
+ $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
+ $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeOffsetPartion'));
+
+ return $topic . $partitions;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeCommitOffsetPartion()
+
+ /**
+ * encode signal part
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeCommitOffsetPartion($values)
+ {
+ if (!isset($values['partition_id'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partition_id` is undefined.');
+ }
+
+ if (!isset($values['offset'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `offset` is undefined.');
+ }
+
+ if (!isset($values['time'])) {
+ $values['time'] = -1;
+ }
+
+ if (!isset($values['metadata'])) {
+ $values['metadata'] = 'm';
+ }
+
+ $data = self::pack(self::BIT_B32, $values['partition_id']);
+ $data .= self::pack(self::BIT_B64, $values['offset']);
+ $data .= self::pack(self::BIT_B64, $values['time']);
+ $data .= self::encodeString($values['metadata'], self::PACK_INT16);
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeCommitOffset()
+
+ /**
+ * encode signal topic
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeCommitOffset($values)
+ {
+ if (!isset($values['topic_name'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `topic_name` is undefined.');
+ }
+
+ if (!isset($values['partitions']) || empty($values['partitions'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partitions` is undefined.');
+ }
+
+ $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
+ $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeCommitOffsetPartion'));
+
+ return $topic . $partitions;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeFetchOffsetPartion()
+
+ /**
+ * encode signal part
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeFetchOffsetPartion($values)
+ {
+ if (!isset($values['partition_id'])) {
+ throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partition_id` is undefined.');
+ }
+
+ $data = self::pack(self::BIT_B32, $values['partition_id']);
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeFetchOffset()
+
+ /**
+ * encode signal topic
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeFetchOffset($values)
+ {
+ if (!isset($values['topic_name'])) {
+ throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `topic_name` is undefined.');
+ }
+
+ if (!isset($values['partitions']) || empty($values['partitions'])) {
+ throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partitions` is undefined.');
+ }
+
+ $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
+ $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchOffsetPartion'));
+
+ return $topic . $partitions;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php
new file mode 100644
index 00000000..8424cf78
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php
@@ -0,0 +1,119 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class CommitOffset extends HelperAbstract
+{
+ // {{{ members
+
+ /**
+ * consumer group
+ *
+ * @var string
+ * @access protected
+ */
+ protected $group = '';
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct($client)
+ {
+ $this->client = $client;
+ }
+
+ // }}}
+ // {{{ public function setGroup()
+
+ /**
+ * set consumer group
+ *
+ * @access public
+ * @return void
+ */
+ public function setGroup($group)
+ {
+ $this->group = $group;
+ }
+
+ // }}}
+ // {{{ public function onStreamEof()
+
+ /**
+ * on stream eof call
+ *
+ * @param string $streamKey
+ * @access public
+ * @return void
+ */
+ public function onStreamEof($streamKey)
+ {
+ }
+
+ // }}}
+ // {{{ public function onTopicEof()
+
+ /**
+ * on topic eof call
+ *
+ * @param string $topicName
+ * @access public
+ * @return void
+ */
+ public function onTopicEof($topicName)
+ {
+ }
+
+ // }}}
+ // {{{ public function onPartitionEof()
+
+ /**
+ * on partition eof call
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @access public
+ * @return void
+ */
+ public function onPartitionEof($partition)
+ {
+ $partitionId = $partition->key();
+ $topicName = $partition->getTopicName();
+ $offset = $partition->getMessageOffset();
+ $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId);
+ $offsetObject->setOffset($offset);
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
new file mode 100644
index 00000000..acf0223e
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
@@ -0,0 +1,39 @@
+<?php
+namespace Kafka\Protocol\Fetch\Helper;
+/**
+ * Description of Consumer
+ *
+ * @author daniel
+ */
+class Consumer extends HelperAbstract
+{
+ protected $consumer;
+
+ protected $offsetStrategy;
+
+
+ public function __construct(\Kafka\Consumer $consumer)
+ {
+ $this->consumer = $consumer;
+ }
+
+
+ public function onPartitionEof($partition)
+ {
+ $partitionId = $partition->key();
+ $topicName = $partition->getTopicName();
+ $offset = $partition->getMessageOffset();
+ $this->consumer->setFromOffset(true);
+ $this->consumer->setPartition($topicName, $partitionId, ($offset +1));
+ }
+
+ public function onStreamEof($streamKey)
+ {
+
+ }
+
+ public function onTopicEof($topicName)
+ {
+
+ }
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php
new file mode 100644
index 00000000..bba38dd6
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php
@@ -0,0 +1,117 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class FreeStream extends HelperAbstract
+{
+ // {{{ members
+
+ /**
+ * streams
+ *
+ * @var array
+ * @access protected
+ */
+ protected $streams = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct($client)
+ {
+ $this->client = $client;
+ }
+
+ // }}}
+ // {{{ public function setStreams()
+
+ /**
+ * set streams
+ *
+ * @access public
+ * @return void
+ */
+ public function setStreams($streams)
+ {
+ $this->streams = $streams;
+ }
+
+ // }}}
+ // {{{ public function onStreamEof()
+
+ /**
+ * on stream eof call
+ *
+ * @param string $streamKey
+ * @access public
+ * @return void
+ */
+ public function onStreamEof($streamKey)
+ {
+ if (isset($this->streams[$streamKey])) {
+ $this->client->freeStream($streamKey);
+ }
+ }
+
+ // }}}
+ // {{{ public function onTopicEof()
+
+ /**
+ * on topic eof call
+ *
+ * @param string $topicName
+ * @access public
+ * @return void
+ */
+ public function onTopicEof($topicName)
+ {
+ }
+
+ // }}}
+ // {{{ public function onPartitionEof()
+
+ /**
+ * on partition eof call
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @access public
+ * @return void
+ */
+ public function onPartitionEof($partition)
+ {
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php
new file mode 100644
index 00000000..4ec23926
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php
@@ -0,0 +1,160 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Helper
+{
+ // {{{ members
+
+ /**
+ * helper object
+ */
+ private static $helpers = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public staitc function registerHelper()
+
+ /**
+ * register helper
+ *
+ * @param string $key
+ * @param \Kafka\Protocol\Fetch\Helper\HelperAbstract $helper
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function registerHelper($key, $helper = null)
+ {
+ if (is_null($helper)) {
+ $className = '\\Kafka\\Protocol\\Fetch\\Helper\\' . $key;
+ if (!class_exists($className)) {
+ throw new \Kafka\Exception('helper is not exists.');
+ }
+ $helper = new $className();
+ }
+
+ if ($helper instanceof \Kafka\Protocol\Fetch\Helper\HelperAbstract) {
+ self::$helpers[$key] = $helper;
+ } else {
+ throw new \Kafka\Exception('this helper not instance of `\Kafka\Protocol\Fetch\Helper\HelperAbstract`');
+ }
+ }
+
+ // }}}
+ // {{{ public staitc function unRegisterHelper()
+
+ /**
+ * unregister helper
+ *
+ * @param string $key
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function unRegisterHelper($key)
+ {
+ if (isset(self::$helpers[$key])) {
+ unset(self::$helpers[$key]);
+ }
+ }
+
+ // }}}
+ // {{{ public static function onStreamEof()
+
+ /**
+ * on stream eof call
+ *
+ * @param string $streamKey
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function onStreamEof($streamKey)
+ {
+ if (empty(self::$helpers)) {
+ return false;
+ }
+
+ foreach (self::$helpers as $key => $helper) {
+ if (method_exists($helper, 'onStreamEof')) {
+ $helper->onStreamEof($streamKey);
+ }
+ }
+ }
+
+ // }}}
+ // {{{ public static function onTopicEof()
+
+ /**
+ * on topic eof call
+ *
+ * @param string $topicName
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function onTopicEof($topicName)
+ {
+ if (empty(self::$helpers)) {
+ return false;
+ }
+
+ foreach (self::$helpers as $key => $helper) {
+ if (method_exists($helper, 'onTopicEof')) {
+ $helper->onStreamEof($topicName);
+ }
+ }
+ }
+
+ // }}}
+ // {{{ public static function onPartitionEof()
+
+ /**
+ * on partition eof call
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function onPartitionEof($partition)
+ {
+ if (empty(self::$helpers)) {
+ return false;
+ }
+
+ foreach (self::$helpers as $key => $helper) {
+ if (method_exists($helper, 'onPartitionEof')) {
+ $helper->onPartitionEof($partition);
+ }
+ }
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php
new file mode 100644
index 00000000..476f3da1
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php
@@ -0,0 +1,71 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+abstract class HelperAbstract
+{
+ // {{{ members
+ // }}}
+ // {{{ functions
+ // {{{ abstract public function onStreamEof()
+
+ /**
+ * on stream eof
+ *
+ * @param string $streamKey
+ * @access public
+ * @return void
+ */
+ abstract public function onStreamEof($streamKey);
+
+ // }}}
+ // {{{ abstract public function onTopicEof()
+
+ /**
+ * on topic eof
+ *
+ * @param string $topicName
+ * @access public
+ * @return void
+ */
+ abstract public function onTopicEof($topicName);
+
+ // }}}
+ // {{{ abstract public function onPartitionEof()
+
+ /**
+ * on partition eof
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @access public
+ * @return void
+ */
+ abstract public function onPartitionEof($partition);
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php
new file mode 100644
index 00000000..42d7da1d
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php
@@ -0,0 +1,175 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Message
+{
+ // {{{ members
+
+ /**
+ * init read bytes
+ *
+ * @var float
+ * @access private
+ */
+ private $initOffset = 0;
+
+ /**
+ * validByteCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validByteCount = 0;
+
+ /**
+ * crc32 code
+ *
+ * @var float
+ * @access private
+ */
+ private $crc = 0;
+
+ /**
+ * This is a version id used to allow backwards compatible evolution of the
+ * message binary format.
+ *
+ * @var float
+ * @access private
+ */
+ private $magic = 0;
+
+ /**
+ * The lowest 2 bits contain the compression codec used for the message. The
+ * other bits should be set to 0.
+ *
+ * @var float
+ * @access private
+ */
+ private $attribute = 0;
+
+ /**
+ * message key
+ *
+ * @var string
+ * @access private
+ */
+ private $key = '';
+
+ /**
+ * message value
+ *
+ * @var string
+ * @access private
+ */
+ private $value = '';
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param string(raw) $msg
+ * @access public
+ * @return void
+ */
+ public function __construct($msg)
+ {
+ $offset = 0;
+ $crc = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
+ $offset += 4;
+ $this->crc = array_shift($crc);
+ $magic = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1));
+ $this->magic = array_shift($magic);
+ $offset += 1;
+ $attr = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1));
+ $this->attribute = array_shift($attr);
+ $offset += 1;
+ $keyLen = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
+ $keyLen = array_shift($keyLen);
+ $offset += 4;
+ if ($keyLen > 0 && $keyLen != 0xFFFFFFFF) {
+ $this->key = substr($msg, $offset, $keyLen);
+ $offset += $keyLen;
+ }
+ $messageSize = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
+ $messageSize = array_shift($messageSize);
+ $offset += 4;
+ if ($messageSize) {
+ $this->value = substr($msg, $offset, $messageSize);
+ }
+ }
+
+ // }}}
+ // {{{ public function getMessage()
+
+ /**
+ * get message data
+ *
+ * @access public
+ * @return string (raw)
+ */
+ public function getMessage()
+ {
+ return $this->value;
+ }
+
+ // }}}
+ // {{{ public function getMessageKey()
+
+ /**
+ * get message key
+ *
+ * @access public
+ * @return string (raw)
+ */
+ public function getMessageKey()
+ {
+ return $this->key;
+ }
+
+ // }}}
+ // {{{ public function __toString()
+
+ /**
+ * __toString
+ *
+ * @access public
+ * @return void
+ */
+ public function __toString()
+ {
+ return $this->value;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
new file mode 100644
index 00000000..50413b67
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
@@ -0,0 +1,269 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class MessageSet implements \Iterator
+{
+ // {{{ members
+
+ /**
+ * kafka socket object
+ *
+ * @var mixed
+ * @access private
+ */
+ private $stream = null;
+
+ /**
+ * messageSet size
+ *
+ * @var float
+ * @access private
+ */
+ private $messageSetSize = 0;
+
+ /**
+ * validByteCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validByteCount = 0;
+
+ /**
+ * messageSet offset
+ *
+ * @var float
+ * @access private
+ */
+ private $offset = 0;
+
+ /**
+ * valid
+ *
+ * @var mixed
+ * @access private
+ */
+ private $valid = false;
+
+ /**
+ * partition object
+ *
+ * @var \Kafka\Protocol\Fetch\Partition
+ * @access private
+ */
+ private $partition = null;
+
+ /**
+ * request fetch context
+ *
+ * @var array
+ */
+ private $context = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Socket $stream
+ * @param int $initOffset
+ * @access public
+ * @return void
+ */
+ public function __construct(\Kafka\Protocol\Fetch\Partition $partition, $context = array())
+ {
+ $this->stream = $partition->getStream();
+ $this->partition = $partition;
+ $this->context = $context;
+ $this->messageSetSize = $this->getMessageSetSize();
+ \Kafka\Log::log("messageSetSize: {$this->messageSetSize}", LOG_INFO);
+ }
+
+ // }}}
+ // {{{ public function current()
+
+ /**
+ * current
+ *
+ * @access public
+ * @return void
+ */
+ public function current()
+ {
+ return $this->current;
+ }
+
+ // }}}
+ // {{{ public function key()
+
+ /**
+ * key
+ *
+ * @access public
+ * @return void
+ */
+ public function key()
+ {
+ return $this->validByteCount;
+ }
+
+ // }}}
+ // {{{ public function rewind()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function rewind()
+ {
+ $this->valid = $this->loadNextMessage();
+ }
+
+ // }}}
+ // {{{ public function valid()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function valid()
+ {
+ if (!$this->valid) {
+ $this->partition->setMessageOffset($this->offset);
+
+ // one partition iterator end
+ \Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof($this->partition);
+ }
+
+ return $this->valid;
+ }
+
+ // }}}
+ // {{{ public function next()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function next()
+ {
+ $this->valid = $this->loadNextMessage();
+ }
+
+ // }}}
+ // {{{ protected function getMessageSetSize()
+
+ /**
+ * get message set size
+ *
+ * @access protected
+ * @return integer
+ */
+ protected function getMessageSetSize()
+ {
+ // read message size
+ $data = $this->stream->read(4, true);
+ $data = Decoder::unpack(Decoder::BIT_B32, $data);
+ $size = array_shift($data);
+ if ($size <= 0) {
+ throw new \Kafka\Exception\OutOfRange($size . ' is not a valid message size');
+ }
+
+ return $size;
+ }
+
+ // }}}
+ // {{{ public function loadNextMessage()
+
+ /**
+ * load next message
+ *
+ * @access public
+ * @return void
+ */
+ public function loadNextMessage()
+ {
+ if ($this->validByteCount >= $this->messageSetSize) {
+ return false;
+ }
+
+ try {
+ if ($this->validByteCount + 12 > $this->messageSetSize) {
+ // read socket buffer dirty data
+ $this->stream->read($this->messageSetSize - $this->validByteCount);
+ return false;
+ }
+ $offset = $this->stream->read(8, true);
+ $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset);
+ $messageSize = $this->stream->read(4, true);
+ $messageSize = Decoder::unpack(Decoder::BIT_B32, $messageSize);
+ $messageSize = array_shift($messageSize);
+ $this->validByteCount += 12;
+ if (($this->validByteCount + $messageSize) > $this->messageSetSize) {
+ // read socket buffer dirty data
+ $this->stream->read($this->messageSetSize - $this->validByteCount);
+ return false;
+ }
+ $msg = $this->stream->read($messageSize, true);
+ $this->current = new Message($msg);
+ } catch (\Kafka\Exception $e) {
+ \Kafka\Log::log("already fetch: {$this->validByteCount}, {$e->getMessage()}", LOG_INFO);
+ return false;
+ }
+
+ $this->validByteCount += $messageSize;
+
+ return true;
+ }
+
+ // }}}
+ // {{{ public function messageOffset()
+
+ /**
+ * current message offset in producer
+ *
+ * @return void
+ */
+ public function messageOffset()
+ {
+ return $this->offset;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php
new file mode 100644
index 00000000..9f8578d5
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php
@@ -0,0 +1,375 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Partition implements \Iterator, \Countable
+{
+ // {{{ members
+
+ /**
+ * kafka socket object
+ *
+ * @var mixed
+ * @access private
+ */
+ private $stream = null;
+
+ /**
+ * validCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validCount = 0;
+
+ /**
+ * partitions count
+ *
+ * @var float
+ * @access private
+ */
+ private $partitionCount = false;
+
+ /**
+ * current topic
+ *
+ * @var mixed
+ * @access private
+ */
+ private $current = null;
+
+ /**
+ * current iterator key
+ * partition id
+ *
+ * @var string
+ * @access private
+ */
+ private $key = null;
+
+ /**
+ * partition errCode
+ *
+ * @var float
+ * @access private
+ */
+ private $errCode = 0;
+
+ /**
+ * partition offset
+ *
+ * @var float
+ * @access private
+ */
+ private $offset = 0;
+
+ /**
+ * partition current fetch offset
+ *
+ * @var float
+ * @access private
+ */
+ private $currentOffset = 0;
+
+ /**
+ * valid
+ *
+ * @var mixed
+ * @access private
+ */
+ private $valid = false;
+
+ /**
+ * cuerrent topic name
+ *
+ * @var string
+ * @access private
+ */
+ private $topicName = '';
+
+ /**
+ * request fetch context
+ *
+ * @var array
+ */
+ private $context = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Protocol\Fetch\Topic $topic
+ * @param int $initOffset
+ * @access public
+ * @return void
+ */
+ public function __construct(\Kafka\Protocol\Fetch\Topic $topic, $context = array())
+ {
+ $this->stream = $topic->getStream();
+ $this->topicName = $topic->key();
+ $this->context = $context;
+ $this->partitionCount = $this->getPartitionCount();
+ }
+
+ // }}}
+ // {{{ public function current()
+
+ /**
+ * current
+ *
+ * @access public
+ * @return void
+ */
+ public function current()
+ {
+ return $this->current;
+ }
+
+ // }}}
+ // {{{ public function key()
+
+ /**
+ * key
+ *
+ * @access public
+ * @return void
+ */
+ public function key()
+ {
+ return $this->key;
+ }
+
+ // }}}
+ // {{{ public function rewind()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function rewind()
+ {
+ $this->valid = $this->loadNextPartition();
+ }
+
+ // }}}
+ // {{{ public function valid()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function valid()
+ {
+ return $this->valid && $this->validCount <= $this->partitionCount;
+ }
+
+ // }}}
+ // {{{ public function next()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function next()
+ {
+ $this->valid = $this->loadNextPartition();
+ }
+
+ // }}}
+ // {{{ public function count()
+
+ /**
+ * implements Countable function
+ *
+ * @access public
+ * @return integer
+ */
+ public function count()
+ {
+ return $this->partitionCount;
+ }
+
+ // }}}
+ // {{{ public function getErrCode()
+
+ /**
+ * get partition errcode
+ *
+ * @access public
+ * @return void
+ */
+ public function getErrCode()
+ {
+ return $this->errCode;
+ }
+
+ // }}}
+ // {{{ public function getHighOffset()
+
+ /**
+ * get partition high offset
+ *
+ * @access public
+ * @return void
+ */
+ public function getHighOffset()
+ {
+ return $this->offset;
+ }
+
+ // }}}
+ // {{{ public function getTopicName()
+
+ /**
+ * get partition topic name
+ *
+ * @access public
+ * @return void
+ */
+ public function getTopicName()
+ {
+ return $this->topicName;
+ }
+
+ // }}}
+ // {{{ public function getStream()
+
+ /**
+ * get current stream
+ *
+ * @access public
+ * @return \Kafka\Socket
+ */
+ public function getStream()
+ {
+ return $this->stream;
+ }
+
+ // }}}
+ // {{{ protected function getPartitionCount()
+
+ /**
+ * get message size
+ * only use to object init
+ *
+ * @access protected
+ * @return integer
+ */
+ protected function getPartitionCount()
+ {
+ // read topic count
+ $data = $this->stream->read(4, true);
+ $data = Decoder::unpack(Decoder::BIT_B32, $data);
+ $count = array_shift($data);
+ if ($count <= 0) {
+ throw new \Kafka\Exception\OutOfRange($size . ' is not a valid partition count');
+ }
+
+ return $count;
+ }
+
+ // }}}
+ // {{{ public function loadNextPartition()
+
+ /**
+ * load next partition
+ *
+ * @access public
+ * @return void
+ */
+ public function loadNextPartition()
+ {
+ if ($this->validCount >= $this->partitionCount) {
+ return false;
+ }
+
+ try {
+ $partitionId = $this->stream->read(4, true);
+ $partitionId = Decoder::unpack(Decoder::BIT_B32, $partitionId);
+ $partitionId = array_shift($partitionId);
+ \Kafka\Log::log("kafka client:fetch partition:" . $partitionId, LOG_INFO);
+
+ $errCode = $this->stream->read(2, true);
+ $errCode = Decoder::unpack(Decoder::BIT_B16, $errCode);
+ $this->errCode = array_shift($errCode);
+ if ($this->errCode != 0) {
+ throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($this->errCode));
+ }
+ $offset = $this->stream->read(8, true);
+ $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset);
+
+ $this->key = $partitionId;
+ $this->current = new MessageSet($this, $this->context);
+ } catch (\Kafka\Exception $e) {
+ return false;
+ }
+
+ $this->validCount++;
+ return true;
+ }
+
+ // }}}
+ // {{{ public function setMessageOffset()
+
+ /**
+ * set messageSet fetch offset current
+ *
+ * @param intger $offset
+ * @return void
+ */
+ public function setMessageOffset($offset)
+ {
+ $this->currentOffset = $offset;
+ }
+
+ // }}}
+ // {{{ public function getMessageOffset()
+
+ /**
+ * get messageSet fetch offset current
+ *
+ * @return int
+ */
+ public function getMessageOffset()
+ {
+ return $this->currentOffset;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php
new file mode 100644
index 00000000..500e6b1f
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php
@@ -0,0 +1,345 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Topic implements \Iterator, \Countable
+{
+ // {{{ members
+
+ /**
+ * kafka socket object
+ *
+ * @var array
+ * @access private
+ */
+ private $streams = array();
+
+ /**
+ * each topic count
+ *
+ * @var array
+ * @access private
+ */
+ private $topicCounts = array();
+
+ /**
+ * current iterator stream
+ *
+ * @var mixed
+ * @access private
+ */
+ private $currentStreamKey = 0;
+
+ /**
+ * current lock key
+ *
+ * @var string
+ * @access private
+ */
+ private $currentStreamLockKey = '';
+
+ /**
+ * currentStreamCount
+ *
+ * @var float
+ * @access private
+ */
+ private $currentStreamCount = 0;
+
+ /**
+ * validCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validCount = 0;
+
+ /**
+ * topic count
+ *
+ * @var float
+ * @access private
+ */
+ private $topicCount = false;
+
+ /**
+ * current topic
+ *
+ * @var mixed
+ * @access private
+ */
+ private $current = null;
+
+ /**
+ * current iterator key
+ * topic name
+ *
+ * @var string
+ * @access private
+ */
+ private $key = null;
+
+ /**
+ * valid
+ *
+ * @var mixed
+ * @access private
+ */
+ private $valid = false;
+
+ /**
+ * request fetch context
+ *
+ * @var array
+ */
+ private $context = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Socket $stream
+ * @param int $initOffset
+ * @access public
+ * @return void
+ */
+ public function __construct($streams, $context = array())
+ {
+ if (!is_array($streams)) {
+ $streams = array($streams);
+ }
+ $this->streams = $streams;
+ $topicInfos = array();
+ foreach ($context as $values) {
+ if (!isset($values['data'])) {
+ continue;
+ }
+
+ foreach ($values['data'] as $value) {
+ if (!isset($value['topic_name']) || !isset($value['partitions'])) {
+ continue;
+ }
+
+ $topicName = $value['topic_name'];
+ foreach ($value['partitions'] as $part) {
+ $topicInfos[$topicName][$part['partition_id']] = array(
+ 'offset' => $part['offset'],
+ );
+ }
+ }
+ }
+ $this->context = $topicInfos;
+ $this->topicCount = $this->getTopicCount();
+ }
+
+ // }}}
+ // {{{ public function current()
+
+ /**
+ * current
+ *
+ * @access public
+ * @return void
+ */
+ public function current()
+ {
+ return $this->current;
+ }
+
+ // }}}
+ // {{{ public function key()
+
+ /**
+ * key
+ *
+ * @access public
+ * @return void
+ */
+ public function key()
+ {
+ return $this->key;
+ }
+
+ // }}}
+ // {{{ public function rewind()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function rewind()
+ {
+ $this->valid = $this->loadNextTopic();
+ }
+
+ // }}}
+ // {{{ public function valid()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function valid()
+ {
+ return $this->valid;
+ }
+
+ // }}}
+ // {{{ public function next()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function next()
+ {
+ $this->valid = $this->loadNextTopic();
+ }
+
+ // }}}
+ // {{{ public function count()
+
+ /**
+ * implements Countable function
+ *
+ * @access public
+ * @return integer
+ */
+ public function count()
+ {
+ return $this->topicCount;
+ }
+
+ // }}}
+ // {{{ protected function getTopicCount()
+
+ /**
+ * get message size
+ * only use to object init
+ *
+ * @access protected
+ * @return integer
+ */
+ protected function getTopicCount()
+ {
+ $count = 0;
+ foreach (array_values($this->streams) as $key => $stream) {
+ // read topic count
+ $stream->read(8, true);
+ $data = $stream->read(4, true);
+ $data = Decoder::unpack(Decoder::BIT_B32, $data);
+ $topicCount = array_shift($data);
+ $count += $topicCount;
+ $this->topicCounts[$key] = $topicCount;
+ if ($count <= 0) {
+ throw new \Kafka\Exception\OutOfRange($count . ' is not a valid topic count');
+ }
+ }
+
+ return $count;
+ }
+
+ // }}}
+ // {{{ public function loadNextTopic()
+
+ /**
+ * load next topic
+ *
+ * @access public
+ * @return void
+ */
+ public function loadNextTopic()
+ {
+ if ($this->validCount >= $this->topicCount) {
+ \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey);
+ return false;
+ }
+
+ if ($this->currentStreamCount >= $this->topicCounts[$this->currentStreamKey]) {
+ \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey);
+ $this->currentStreamKey++;
+ }
+
+ $lockKeys = array_keys($this->streams);
+ $streams = array_values($this->streams);
+ if (!isset($streams[$this->currentStreamKey])) {
+ return false;
+ }
+
+ $stream = $streams[$this->currentStreamKey];
+ $this->currentStreamLockKey = $lockKeys[$this->currentStreamKey];
+
+ try {
+ $topicLen = $stream->read(2, true);
+ $topicLen = Decoder::unpack(Decoder::BIT_B16, $topicLen);
+ $topicLen = array_shift($topicLen);
+ if ($topicLen <= 0) {
+ return false;
+ }
+
+ // topic name
+ $this->key = $stream->read($topicLen, true);
+ $this->current = new Partition($this, $this->context);
+ } catch (\Kafka\Exception $e) {
+ return false;
+ }
+
+ $this->validCount++;
+ $this->currentStreamCount++;
+
+ return true;
+ }
+
+ // }}}
+ // {{{ public function getStream()
+
+ /**
+ * get current stream
+ *
+ * @access public
+ * @return \Kafka\Socket
+ */
+ public function getStream()
+ {
+ $streams = array_values($this->streams);
+ return $streams[$this->currentStreamKey];
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php
new file mode 100644
index 00000000..a31067b5
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php
@@ -0,0 +1,230 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+abstract class Protocol
+{
+ // {{{ consts
+
+ /**
+ * Kafka server protocol version
+ */
+ const API_VERSION = 0;
+
+ /**
+ * use encode message, This is a version id used to allow backwards
+ * compatible evolution of the message binary format.
+ */
+ const MESSAGE_MAGIC = 0;
+
+ /**
+ * message no compression
+ */
+ const COMPRESSION_NONE = 0;
+
+ /**
+ * Message using gzip compression
+ */
+ const COMPRESSION_GZIP = 1;
+
+ /**
+ * Message using Snappy compression
+ */
+ const COMPRESSION_SNAPPY = 2;
+
+ /**
+ * pack int32 type
+ */
+ const PACK_INT32 = 0;
+
+ /**
+ * pack int16 type
+ */
+ const PACK_INT16 = 1;
+
+ /**
+ * protocol request code
+ */
+ const PRODUCE_REQUEST = 0;
+ const FETCH_REQUEST = 1;
+ const OFFSET_REQUEST = 2;
+ const METADATA_REQUEST = 3;
+ const OFFSET_COMMIT_REQUEST = 8;
+ const OFFSET_FETCH_REQUEST = 9;
+ const CONSUMER_METADATA_REQUEST = 10;
+
+ // unpack/pack bit
+ const BIT_B64 = 'N2';
+ const BIT_B32 = 'N';
+ const BIT_B16 = 'n';
+ const BIT_B8 = 'C';
+
+ // }}}
+ // {{{ members
+
+ /**
+ * stream
+ *
+ * @var mixed
+ * @access protected
+ */
+ protected $stream = null;
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Socket $stream
+ * @access public
+ * @return void
+ */
+ public function __construct(\Kafka\Socket $stream)
+ {
+ $this->stream = $stream;
+ }
+
+ // }}}
+ // {{{ public static function Khex2bin()
+
+ /**
+ * hex to bin
+ *
+ * @param string $string
+ * @static
+ * @access protected
+ * @return string (raw)
+ */
+ public static function Khex2bin($string)
+ {
+ if (function_exists('\hex2bin')) {
+ return \hex2bin($string);
+ } else {
+ $bin = '';
+ $len = strlen($string);
+ for ($i = 0; $i < $len; $i += 2) {
+ $bin .= pack('H*', substr($string, $i, 2));
+ }
+
+ return $bin;
+ }
+ }
+
+ // }}}
+ // {{{ public static function unpack()
+
+ /**
+ * Unpack a bit integer as big endian long
+ *
+ * @static
+ * @access public
+ * @return integer
+ */
+ public static function unpack($type, $bytes)
+ {
+ self::checkLen($type, $bytes);
+ if ($type == self::BIT_B64) {
+ $set = unpack($type, $bytes);
+ $original = ($set[1] & 0xFFFFFFFF) << 32 | ($set[2] & 0xFFFFFFFF);
+ return $original;
+ } else {
+ return unpack($type, $bytes);
+ }
+ }
+
+ // }}}
+ // {{{ public static function pack()
+
+ /**
+ * pack a bit integer as big endian long
+ *
+ * @static
+ * @access public
+ * @return integer
+ */
+ public static function pack($type, $data)
+ {
+ if ($type == self::BIT_B64) {
+ if ($data == -1) { // -1L
+ $data = self::Khex2bin('ffffffffffffffff');
+ } elseif ($data == -2) { // -2L
+ $data = self::Khex2bin('fffffffffffffffe');
+ } else {
+ $left = 0xffffffff00000000;
+ $right = 0x00000000ffffffff;
+
+ $l = ($data & $left) >> 32;
+ $r = $data & $right;
+ $data = pack($type, $l, $r);
+ }
+ } else {
+ $data = pack($type, $data);
+ }
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function checkLen()
+
+ /**
+ * check unpack bit is valid
+ *
+ * @param string $type
+ * @param string(raw) $bytes
+ * @static
+ * @access protected
+ * @return void
+ */
+ protected static function checkLen($type, $bytes)
+ {
+ $len = 0;
+ switch($type) {
+ case self::BIT_B64:
+ $len = 8;
+ break;
+ case self::BIT_B32:
+ $len = 4;
+ break;
+ case self::BIT_B16:
+ $len = 2;
+ break;
+ case self::BIT_B8:
+ $len = 1;
+ break;
+ }
+
+ if (strlen($bytes) != $len) {
+ throw new \Kafka\Exception\Protocol('unpack failed. string(raw) length is ' . strlen($bytes) . ' , TO ' . $type);
+ }
+ }
+
+ // }}}
+ // }}}
+}