stream = $topic->getStream(); $this->topicName = $topic->key(); $this->context = $context; $this->partitionCount = $this->getPartitionCount(); } // }}} // {{{ public function current() /** * current * * @access public * @return void */ public function current() { return $this->current; } // }}} // {{{ public function key() /** * key * * @access public * @return void */ public function key() { return $this->key; } // }}} // {{{ public function rewind() /** * implements Iterator function * * @access public * @return integer */ public function rewind() { $this->valid = $this->loadNextPartition(); } // }}} // {{{ public function valid() /** * implements Iterator function * * @access public * @return integer */ public function valid() { return $this->valid && $this->validCount <= $this->partitionCount; } // }}} // {{{ public function next() /** * implements Iterator function * * @access public * @return integer */ public function next() { $this->valid = $this->loadNextPartition(); } // }}} // {{{ public function count() /** * implements Countable function * * @access public * @return integer */ public function count() { return $this->partitionCount; } // }}} // {{{ public function getErrCode() /** * get partition errcode * * @access public * @return void */ public function getErrCode() { return $this->errCode; } // }}} // {{{ public function getHighOffset() /** * get partition high offset * * @access public * @return void */ public function getHighOffset() { return $this->offset; } // }}} // {{{ public function getTopicName() /** * get partition topic name * * @access public * @return void */ public function getTopicName() { return $this->topicName; } // }}} // {{{ public function getStream() /** * get current stream * * @access public * @return \Kafka\Socket */ public function getStream() { return $this->stream; } // }}} // {{{ protected function getPartitionCount() /** * get message size * only use to object init * * @access protected * @return integer */ protected function getPartitionCount() { // read topic count $data = $this->stream->read(4, true); $data = Decoder::unpack(Decoder::BIT_B32, $data); $count = array_shift($data); if ($count <= 0) { throw new \Kafka\Exception\OutOfRange($size . ' is not a valid partition count'); } return $count; } // }}} // {{{ public function loadNextPartition() /** * load next partition * * @access public * @return void */ public function loadNextPartition() { if ($this->validCount >= $this->partitionCount) { return false; } try { $partitionId = $this->stream->read(4, true); $partitionId = Decoder::unpack(Decoder::BIT_B32, $partitionId); $partitionId = array_shift($partitionId); \Kafka\Log::log("kafka client:fetch partition:" . $partitionId, LOG_INFO); $errCode = $this->stream->read(2, true); $errCode = Decoder::unpack(Decoder::BIT_B16, $errCode); $this->errCode = array_shift($errCode); if ($this->errCode != 0) { throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($this->errCode)); } $offset = $this->stream->read(8, true); $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset); $this->key = $partitionId; $this->current = new MessageSet($this, $this->context); } catch (\Kafka\Exception $e) { return false; } $this->validCount++; return true; } // }}} // {{{ public function setMessageOffset() /** * set messageSet fetch offset current * * @param intger $offset * @return void */ public function setMessageOffset($offset) { $this->currentOffset = $offset; } // }}} // {{{ public function getMessageOffset() /** * get messageSet fetch offset current * * @return int */ public function getMessageOffset() { return $this->currentOffset; } // }}} // }}} }