diff options
Diffstat (limited to 'vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php')
-rw-r--r-- | vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php | 269 |
1 files changed, 269 insertions, 0 deletions
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php new file mode 100644 index 00000000..50413b67 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php @@ -0,0 +1,269 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class MessageSet implements \Iterator +{ + // {{{ members + + /** + * kafka socket object + * + * @var mixed + * @access private + */ + private $stream = null; + + /** + * messageSet size + * + * @var float + * @access private + */ + private $messageSetSize = 0; + + /** + * validByteCount + * + * @var float + * @access private + */ + private $validByteCount = 0; + + /** + * messageSet offset + * + * @var float + * @access private + */ + private $offset = 0; + + /** + * valid + * + * @var mixed + * @access private + */ + private $valid = false; + + /** + * partition object + * + * @var \Kafka\Protocol\Fetch\Partition + * @access private + */ + private $partition = null; + + /** + * request fetch context + * + * @var array + */ + private $context = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Socket $stream + * @param int $initOffset + * @access public + * @return void + */ + public function __construct(\Kafka\Protocol\Fetch\Partition $partition, $context = array()) + { + $this->stream = $partition->getStream(); + $this->partition = $partition; + $this->context = $context; + $this->messageSetSize = $this->getMessageSetSize(); + \Kafka\Log::log("messageSetSize: {$this->messageSetSize}", LOG_INFO); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->validByteCount; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextMessage(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + if (!$this->valid) { + $this->partition->setMessageOffset($this->offset); + + // one partition iterator end + \Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof($this->partition); + } + + return $this->valid; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextMessage(); + } + + // }}} + // {{{ protected function getMessageSetSize() + + /** + * get message set size + * + * @access protected + * @return integer + */ + protected function getMessageSetSize() + { + // read message size + $data = $this->stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $size = array_shift($data); + if ($size <= 0) { + throw new \Kafka\Exception\OutOfRange($size . ' is not a valid message size'); + } + + return $size; + } + + // }}} + // {{{ public function loadNextMessage() + + /** + * load next message + * + * @access public + * @return void + */ + public function loadNextMessage() + { + if ($this->validByteCount >= $this->messageSetSize) { + return false; + } + + try { + if ($this->validByteCount + 12 > $this->messageSetSize) { + // read socket buffer dirty data + $this->stream->read($this->messageSetSize - $this->validByteCount); + return false; + } + $offset = $this->stream->read(8, true); + $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset); + $messageSize = $this->stream->read(4, true); + $messageSize = Decoder::unpack(Decoder::BIT_B32, $messageSize); + $messageSize = array_shift($messageSize); + $this->validByteCount += 12; + if (($this->validByteCount + $messageSize) > $this->messageSetSize) { + // read socket buffer dirty data + $this->stream->read($this->messageSetSize - $this->validByteCount); + return false; + } + $msg = $this->stream->read($messageSize, true); + $this->current = new Message($msg); + } catch (\Kafka\Exception $e) { + \Kafka\Log::log("already fetch: {$this->validByteCount}, {$e->getMessage()}", LOG_INFO); + return false; + } + + $this->validByteCount += $messageSize; + + return true; + } + + // }}} + // {{{ public function messageOffset() + + /** + * current message offset in producer + * + * @return void + */ + public function messageOffset() + { + return $this->offset; + } + + // }}} + // }}} +} |