diff options
Diffstat (limited to 'vendor/nmred/kafka-php/src/Kafka/Protocol')
12 files changed, 2982 insertions, 0 deletions
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php new file mode 100644 index 00000000..f1e4b496 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php @@ -0,0 +1,430 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Decoder extends Protocol +{ + // {{{ functions + // {{{ public function produceResponse() + + /** + * decode produce response + * + * @param string $data + * @access public + * @return array + */ + public function produceResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('produce response invalid.'); + } + $data = $this->stream->read($dataLen, true); + + // parse data struct + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $topicCount = array_shift($topicCount); + $offset += 4; + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8)); + $offset += 8; + $result[$topicName][$partitionId[1]] = array( + 'errCode' => $errCode[1], + 'offset' => $partitionOffset + ); + } + } + + return $result; + } + + // }}} + // {{{ public function fetchResponse() + + /** + * decode fetch response + * + * @param string $data + * @access public + * @return Iterator + */ + public function fetchResponse() + { + return new \Kafka\Protocol\Fetch\Topic($this->stream); + } + + // }}} + // {{{ public function metadataResponse() + + /** + * decode metadata response + * + * @param string $data + * @access public + * @return array + */ + public function metadataResponse() + { + $result = array(); + $broker = array(); + $topic = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('metaData response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $brokerCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $brokerCount = isset($brokerCount[1]) ? $brokerCount[1] : 0; + for ($i = 0; $i < $brokerCount; $i++) { + $nodeId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $nodeId = $nodeId[1]; + $offset += 4; + $hostNameLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 host name length + $hostNameLen = isset($hostNameLen[1]) ? $hostNameLen[1] : 0; + $offset += 2; + $hostName = substr($data, $offset, $hostNameLen); + $offset += $hostNameLen; + $port = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $broker[$nodeId] = array( + 'host' => $hostName, + 'port' => $port[1], + ); + } + + $topicMetaCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicMetaCount = isset($topicMetaCount[1]) ? $topicMetaCount[1] : 0; + for ($i = 0; $i < $topicMetaCount; $i++) { + $topicErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $topicName = substr($data, $offset, $topicLen[1]); + $offset += $topicLen[1]; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $topic[$topicName]['errCode'] = $topicErrCode[1]; + $partitions = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionId = isset($partitionId[1]) ? $partitionId[1] : 0; + $offset += 4; + $leaderId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $repliasCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $repliasCount = isset($repliasCount[1]) ? $repliasCount[1] : 0; + $replias = array(); + for ($z = 0; $z < $repliasCount; $z++) { + $repliaId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $replias[] = $repliaId[1]; + } + $isrCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $isrCount = isset($isrCount[1]) ? $isrCount[1] : 0; + $isrs = array(); + for ($z = 0; $z < $isrCount; $z++) { + $isrId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $isrs[] = $isrId[1]; + } + + $partitions[$partitionId] = array( + 'errCode' => $partitionErrCode[1], + 'leader' => $leaderId[1], + 'replicas' => $replias, + 'isr' => $isrs, + ); + } + $topic[$topicName]['partitions'] = $partitions; + } + + $result = array( + 'brokers' => $broker, + 'topics' => $topic, + ); + return $result; + } + + // }}} + // {{{ public function offsetResponse() + + /** + * decode offset response + * + * @param string $data + * @access public + * @return array + */ + public function offsetResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('offset response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicCount = array_shift($topicCount); + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $offsetCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $offsetCount = array_shift($offsetCount); + $offsetArr = array(); + for ($z = 0; $z < $offsetCount; $z++) { + $offsetArr[] = self::unpack(self::BIT_B64, substr($data, $offset, 8)); + $offset += 8; + } + $result[$topicName][$partitionId[1]] = array( + 'errCode' => $errCode[1], + 'offset' => $offsetArr + ); + } + } + return $result; + } + + // }}} + // {{{ public function commitOffsetResponse() + + /** + * decode commit offset response + * + * @param string $data + * @access public + * @return array + */ + public function commitOffsetResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('commit offset response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicCount = array_shift($topicCount); + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $result[$topicName][$partitionId[1]] = array( + 'errCode' => $errCode[1], + ); + } + } + return $result; + } + + // }}} + // {{{ public function fetchOffsetResponse() + + /** + * decode fetch offset response + * + * @param string $data + * @access public + * @return array + */ + public function fetchOffsetResponse() + { + $result = array(); + $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); + $dataLen = array_shift($dataLen); + if (!$dataLen) { + throw new \Kafka\Exception\Protocol('fetch offset response invalid.'); + } + $data = $this->stream->read($dataLen, true); + $offset = 4; + $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $topicCount = array_shift($topicCount); + for ($i = 0; $i < $topicCount; $i++) { + $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length + $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0; + $offset += 2; + $topicName = substr($data, $offset, $topicLen); + $offset += $topicLen; + $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0; + $offset += 4; + $result[$topicName] = array(); + for ($j = 0; $j < $partitionCount; $j++) { + $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); + $offset += 4; + $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8)); + $offset += 8; + $metaLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $metaLen = array_shift($metaLen); + $offset += 2; + $metaData = ''; + if ($metaLen) { + $metaData = substr($data, $offset, $metaLen); + $offset += $metaLen; + } + $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $offset += 2; + $result[$topicName][$partitionId[1]] = array( + 'offset' => $partitionOffset, + 'metadata' => $metaData, + 'errCode' => $errCode[1], + ); + } + } + return $result; + } + + // }}} + // {{{ public static function getError() + + /** + * get error + * + * @param integer $errCode + * @static + * @access public + * @return string + */ + public static function getError($errCode) + { + $error = ''; + switch($errCode) { + case 0: + $error = 'No error--it worked!'; + break; + case -1: + $error = 'An unexpected server error'; + break; + case 1: + $error = 'The requested offset is outside the range of offsets maintained by the server for the given topic/partition.'; + break; + case 2: + $error = 'This indicates that a message contents does not match its CRC'; + break; + case 3: + $error = 'This request is for a topic or partition that does not exist on this broker.'; + break; + case 4: + $error = 'The message has a negative size'; + break; + case 5: + $error = 'This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes'; + break; + case 6: + $error = 'This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.'; + break; + case 7: + $error = 'This error is thrown if the request exceeds the user-specified time limit in the request.'; + break; + case 8: + $error = 'This is not a client facing error and is used only internally by intra-cluster broker communication.'; + break; + case 10: + $error = 'The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum.'; + break; + case 11: + $error = 'Internal error code for broker-to-broker communication.'; + break; + case 12: + $error = 'If you specify a string larger than configured maximum for offset metadata'; + break; + case 14: + $error = 'The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).'; + break; + case 15: + $error = 'The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.'; + break; + case 16: + $error = 'The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.'; + break; + default: + $error = 'Unknown error'; + } + + return $error; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php new file mode 100644 index 00000000..7d36e10f --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php @@ -0,0 +1,652 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Encoder extends Protocol +{ + // {{{ functions + // {{{ public function produceRequest() + + /** + * produce request + * + * @param array $payloads + * @static + * @access public + * @return void + */ + public function produceRequest($payloads, $compression = self::COMPRESSION_NONE) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given procude data invalid. `data` is undefined.'); + } + + if (!isset($payloads['required_ack'])) { + // default server will not send any response + // (this is the only case where the server will not reply to a request) + $payloads['required_ack'] = 0; + } + + if (!isset($payloads['timeout'])) { + $payloads['timeout'] = 100; // default timeout 100ms + } + + $header = self::requestHeader('kafka-php', 0, self::PRODUCE_REQUEST); + $data = self::pack(self::BIT_B16, $payloads['required_ack']); + $data .= self::pack(self::BIT_B32, $payloads['timeout']); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeProcudeTopic'), $compression); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function metadataRequest() + + /** + * build metadata request protocol + * + * @param array $topics + * @access public + * @return string + */ + public function metadataRequest($topics) + { + if (!is_array($topics)) { + $topics = array($topics); + } + + foreach ($topics as $topic) { + if (!is_string($topic)) { + throw new \Kafka\Exception\Protocol('request metadata topic array have invalid value. '); + } + } + + $header = self::requestHeader('kafka-php', 0, self::METADATA_REQUEST); + $data = self::encodeArray($topics, array(__CLASS__, 'encodeString'), self::PACK_INT16); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function fetchRequest() + + /** + * build fetch request + * + * @param array $payloads + * @access public + * @return string + */ + public function fetchRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given fetch kafka data invalid. `data` is undefined.'); + } + + if (!isset($payloads['replica_id'])) { + $payloads['replica_id'] = -1; + } + + if (!isset($payloads['max_wait_time'])) { + $payloads['max_wait_time'] = 100; // default timeout 100ms + } + + if (!isset($payloads['min_bytes'])) { + $payloads['min_bytes'] = 64 * 1024; // 64k + } + + $header = self::requestHeader('kafka-php', 0, self::FETCH_REQUEST); + $data = self::pack(self::BIT_B32, $payloads['replica_id']); + $data .= self::pack(self::BIT_B32, $payloads['max_wait_time']); + $data .= self::pack(self::BIT_B32, $payloads['min_bytes']); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchTopic')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function offsetRequest() + + /** + * build offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function offsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['replica_id'])) { + $payloads['replica_id'] = -1; + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_REQUEST); + $data = self::pack(self::BIT_B32, $payloads['replica_id']); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeOffsetTopic')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function commitOffsetRequest() + + /** + * build consumer commit offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function commitOffsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['group_id'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `group_id` is undefined.'); + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_COMMIT_REQUEST); + $data = self::encodeString($payloads['group_id'], self::PACK_INT16); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeCommitOffset')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function fetchOffsetRequest() + + /** + * build consumer fetch offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function fetchOffsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['group_id'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `group_id` is undefined.'); + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_FETCH_REQUEST); + $data = self::encodeString($payloads['group_id'], self::PACK_INT16); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchOffset')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public static function encodeString() + + /** + * encode pack string type + * + * @param string $string + * @param int $bytes self::PACK_INT32: int32 big endian order. self::PACK_INT16: int16 big endian order. + * @static + * @access public + * @return string + */ + public static function encodeString($string, $bytes, $compression = self::COMPRESSION_NONE) + { + $packLen = ($bytes == self::PACK_INT32) ? self::BIT_B32 : self::BIT_B16; + switch ($compression) { + case self::COMPRESSION_NONE: + break; + case self::COMPRESSION_GZIP: + $string = gzencode($string); + break; + case self::COMPRESSION_SNAPPY: + throw new \Kafka\Exception\NotSupported('SNAPPY compression not yet implemented'); + default: + throw new \Kafka\Exception\NotSupported('Unknown compression flag: ' . $compression); + } + return self::pack($packLen, strlen($string)) . $string; + } + + // }}} + // {{{ public static function encodeArray() + + /** + * encode key array + * + * @param array $array + * @param Callable $func + * @static + * @access public + * @return string + */ + public static function encodeArray(array $array, $func, $options = null) + { + if (!is_callable($func, false)) { + throw new \Kafka\Exception\Protocol('Encode array failed, given function is not callable.'); + } + + $arrayCount = count($array); + + $body = ''; + foreach ($array as $value) { + if (!is_null($options)) { + $body .= call_user_func($func, $value, $options); + } else { + $body .= call_user_func($func, $value); + } + } + + return self::pack(self::BIT_B32, $arrayCount) . $body; + } + + // }}} + // {{{ public static function encodeMessageSet() + + /** + * encode message set + * N.B., MessageSets are not preceded by an int32 like other array elements + * in the protocol. + * + * @param array $messages + * @static + * @access public + * @return string + */ + public static function encodeMessageSet($messages, $compression = self::COMPRESSION_NONE) + { + if (!is_array($messages)) { + $messages = array($messages); + } + + $data = ''; + foreach ($messages as $message) { + $tmpMessage = self::_encodeMessage($message, $compression); + + // int64 -- message offset Message + $data .= self::pack(self::BIT_B64, 0) . self::encodeString($tmpMessage, self::PACK_INT32); + } + return $data; + } + + // }}} + // {{{ public static function requestHeader() + + /** + * get request header + * + * @param string $clientId + * @param integer $correlationId + * @param integer $apiKey + * @static + * @access public + * @return void + */ + public static function requestHeader($clientId, $correlationId, $apiKey) + { + // int16 -- apiKey int16 -- apiVersion int32 correlationId + $binData = self::pack(self::BIT_B16, $apiKey); + $binData .= self::pack(self::BIT_B16, self::API_VERSION); + $binData .= self::pack(self::BIT_B32, $correlationId); + + // concat client id + $binData .= self::encodeString($clientId, self::PACK_INT16); + + return $binData; + } + + // }}} + // {{{ protected static function _encodeMessage() + + /** + * encode signal message + * + * @param string $message + * @static + * @access protected + * @return string + */ + protected static function _encodeMessage($message, $compression = self::COMPRESSION_NONE) + { + // int8 -- magic int8 -- attribute + $data = self::pack(self::BIT_B8, self::MESSAGE_MAGIC); + $data .= self::pack(self::BIT_B8, $compression); + + // message key + $data .= self::encodeString('', self::PACK_INT32); + + // message value + $data .= self::encodeString($message, self::PACK_INT32, $compression); + + $crc = crc32($data); + + // int32 -- crc code string data + $message = self::pack(self::BIT_B32, $crc) . $data; + + return $message; + } + + // }}} + // {{{ protected static function _encodeProcudePartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeProcudePartion($values, $compression) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['messages']) || empty($values['messages'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `messages` is undefined.'); + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::encodeString(self::encodeMessageSet($values['messages'], $compression), self::PACK_INT32); + + return $data; + } + + // }}} + // {{{ protected static function _encodeProcudeTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeProcudeTopic($values, $compression) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeProcudePartion'), $compression); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeFetchPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['offset'])) { + $values['offset'] = 0; + } + + if (!isset($values['max_bytes'])) { + $values['max_bytes'] = 100 * 1024 * 1024; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['offset']); + $data .= self::pack(self::BIT_B32, $values['max_bytes']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeFetchTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchTopic($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['time'])) { + $values['time'] = -1; // -1 + } + + if (!isset($values['max_offset'])) { + $values['max_offset'] = 100000; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['time']); + $data .= self::pack(self::BIT_B32, $values['max_offset']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeOffsetTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeOffsetTopic($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeCommitOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeCommitOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['offset'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `offset` is undefined.'); + } + + if (!isset($values['time'])) { + $values['time'] = -1; + } + + if (!isset($values['metadata'])) { + $values['metadata'] = 'm'; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['offset']); + $data .= self::pack(self::BIT_B64, $values['time']); + $data .= self::encodeString($values['metadata'], self::PACK_INT16); + + return $data; + } + + // }}} + // {{{ protected static function _encodeCommitOffset() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeCommitOffset($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeCommitOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeFetchOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partition_id` is undefined.'); + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeFetchOffset() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchOffset($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php new file mode 100644 index 00000000..8424cf78 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php @@ -0,0 +1,119 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class CommitOffset extends HelperAbstract +{ + // {{{ members + + /** + * consumer group + * + * @var string + * @access protected + */ + protected $group = ''; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct($client) + { + $this->client = $client; + } + + // }}} + // {{{ public function setGroup() + + /** + * set consumer group + * + * @access public + * @return void + */ + public function setGroup($group) + { + $this->group = $group; + } + + // }}} + // {{{ public function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @access public + * @return void + */ + public function onStreamEof($streamKey) + { + } + + // }}} + // {{{ public function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @access public + * @return void + */ + public function onTopicEof($topicName) + { + } + + // }}} + // {{{ public function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + public function onPartitionEof($partition) + { + $partitionId = $partition->key(); + $topicName = $partition->getTopicName(); + $offset = $partition->getMessageOffset(); + $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId); + $offsetObject->setOffset($offset); + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php new file mode 100644 index 00000000..acf0223e --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php @@ -0,0 +1,39 @@ +<?php +namespace Kafka\Protocol\Fetch\Helper; +/** + * Description of Consumer + * + * @author daniel + */ +class Consumer extends HelperAbstract +{ + protected $consumer; + + protected $offsetStrategy; + + + public function __construct(\Kafka\Consumer $consumer) + { + $this->consumer = $consumer; + } + + + public function onPartitionEof($partition) + { + $partitionId = $partition->key(); + $topicName = $partition->getTopicName(); + $offset = $partition->getMessageOffset(); + $this->consumer->setFromOffset(true); + $this->consumer->setPartition($topicName, $partitionId, ($offset +1)); + } + + public function onStreamEof($streamKey) + { + + } + + public function onTopicEof($topicName) + { + + } +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php new file mode 100644 index 00000000..bba38dd6 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php @@ -0,0 +1,117 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class FreeStream extends HelperAbstract +{ + // {{{ members + + /** + * streams + * + * @var array + * @access protected + */ + protected $streams = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @access public + * @return void + */ + public function __construct($client) + { + $this->client = $client; + } + + // }}} + // {{{ public function setStreams() + + /** + * set streams + * + * @access public + * @return void + */ + public function setStreams($streams) + { + $this->streams = $streams; + } + + // }}} + // {{{ public function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @access public + * @return void + */ + public function onStreamEof($streamKey) + { + if (isset($this->streams[$streamKey])) { + $this->client->freeStream($streamKey); + } + } + + // }}} + // {{{ public function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @access public + * @return void + */ + public function onTopicEof($topicName) + { + } + + // }}} + // {{{ public function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + public function onPartitionEof($partition) + { + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php new file mode 100644 index 00000000..4ec23926 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php @@ -0,0 +1,160 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Helper +{ + // {{{ members + + /** + * helper object + */ + private static $helpers = array(); + + // }}} + // {{{ functions + // {{{ public staitc function registerHelper() + + /** + * register helper + * + * @param string $key + * @param \Kafka\Protocol\Fetch\Helper\HelperAbstract $helper + * @static + * @access public + * @return void + */ + public static function registerHelper($key, $helper = null) + { + if (is_null($helper)) { + $className = '\\Kafka\\Protocol\\Fetch\\Helper\\' . $key; + if (!class_exists($className)) { + throw new \Kafka\Exception('helper is not exists.'); + } + $helper = new $className(); + } + + if ($helper instanceof \Kafka\Protocol\Fetch\Helper\HelperAbstract) { + self::$helpers[$key] = $helper; + } else { + throw new \Kafka\Exception('this helper not instance of `\Kafka\Protocol\Fetch\Helper\HelperAbstract`'); + } + } + + // }}} + // {{{ public staitc function unRegisterHelper() + + /** + * unregister helper + * + * @param string $key + * @static + * @access public + * @return void + */ + public static function unRegisterHelper($key) + { + if (isset(self::$helpers[$key])) { + unset(self::$helpers[$key]); + } + } + + // }}} + // {{{ public static function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @static + * @access public + * @return void + */ + public static function onStreamEof($streamKey) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onStreamEof')) { + $helper->onStreamEof($streamKey); + } + } + } + + // }}} + // {{{ public static function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @static + * @access public + * @return void + */ + public static function onTopicEof($topicName) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onTopicEof')) { + $helper->onStreamEof($topicName); + } + } + } + + // }}} + // {{{ public static function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @static + * @access public + * @return void + */ + public static function onPartitionEof($partition) + { + if (empty(self::$helpers)) { + return false; + } + + foreach (self::$helpers as $key => $helper) { + if (method_exists($helper, 'onPartitionEof')) { + $helper->onPartitionEof($partition); + } + } + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php new file mode 100644 index 00000000..476f3da1 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php @@ -0,0 +1,71 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch\Helper; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +abstract class HelperAbstract +{ + // {{{ members + // }}} + // {{{ functions + // {{{ abstract public function onStreamEof() + + /** + * on stream eof + * + * @param string $streamKey + * @access public + * @return void + */ + abstract public function onStreamEof($streamKey); + + // }}} + // {{{ abstract public function onTopicEof() + + /** + * on topic eof + * + * @param string $topicName + * @access public + * @return void + */ + abstract public function onTopicEof($topicName); + + // }}} + // {{{ abstract public function onPartitionEof() + + /** + * on partition eof + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + abstract public function onPartitionEof($partition); + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php new file mode 100644 index 00000000..42d7da1d --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php @@ -0,0 +1,175 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Message +{ + // {{{ members + + /** + * init read bytes + * + * @var float + * @access private + */ + private $initOffset = 0; + + /** + * validByteCount + * + * @var float + * @access private + */ + private $validByteCount = 0; + + /** + * crc32 code + * + * @var float + * @access private + */ + private $crc = 0; + + /** + * This is a version id used to allow backwards compatible evolution of the + * message binary format. + * + * @var float + * @access private + */ + private $magic = 0; + + /** + * The lowest 2 bits contain the compression codec used for the message. The + * other bits should be set to 0. + * + * @var float + * @access private + */ + private $attribute = 0; + + /** + * message key + * + * @var string + * @access private + */ + private $key = ''; + + /** + * message value + * + * @var string + * @access private + */ + private $value = ''; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param string(raw) $msg + * @access public + * @return void + */ + public function __construct($msg) + { + $offset = 0; + $crc = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $offset += 4; + $this->crc = array_shift($crc); + $magic = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1)); + $this->magic = array_shift($magic); + $offset += 1; + $attr = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1)); + $this->attribute = array_shift($attr); + $offset += 1; + $keyLen = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $keyLen = array_shift($keyLen); + $offset += 4; + if ($keyLen > 0 && $keyLen != 0xFFFFFFFF) { + $this->key = substr($msg, $offset, $keyLen); + $offset += $keyLen; + } + $messageSize = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4)); + $messageSize = array_shift($messageSize); + $offset += 4; + if ($messageSize) { + $this->value = substr($msg, $offset, $messageSize); + } + } + + // }}} + // {{{ public function getMessage() + + /** + * get message data + * + * @access public + * @return string (raw) + */ + public function getMessage() + { + return $this->value; + } + + // }}} + // {{{ public function getMessageKey() + + /** + * get message key + * + * @access public + * @return string (raw) + */ + public function getMessageKey() + { + return $this->key; + } + + // }}} + // {{{ public function __toString() + + /** + * __toString + * + * @access public + * @return void + */ + public function __toString() + { + return $this->value; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php new file mode 100644 index 00000000..50413b67 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php @@ -0,0 +1,269 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class MessageSet implements \Iterator +{ + // {{{ members + + /** + * kafka socket object + * + * @var mixed + * @access private + */ + private $stream = null; + + /** + * messageSet size + * + * @var float + * @access private + */ + private $messageSetSize = 0; + + /** + * validByteCount + * + * @var float + * @access private + */ + private $validByteCount = 0; + + /** + * messageSet offset + * + * @var float + * @access private + */ + private $offset = 0; + + /** + * valid + * + * @var mixed + * @access private + */ + private $valid = false; + + /** + * partition object + * + * @var \Kafka\Protocol\Fetch\Partition + * @access private + */ + private $partition = null; + + /** + * request fetch context + * + * @var array + */ + private $context = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Socket $stream + * @param int $initOffset + * @access public + * @return void + */ + public function __construct(\Kafka\Protocol\Fetch\Partition $partition, $context = array()) + { + $this->stream = $partition->getStream(); + $this->partition = $partition; + $this->context = $context; + $this->messageSetSize = $this->getMessageSetSize(); + \Kafka\Log::log("messageSetSize: {$this->messageSetSize}", LOG_INFO); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->validByteCount; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextMessage(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + if (!$this->valid) { + $this->partition->setMessageOffset($this->offset); + + // one partition iterator end + \Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof($this->partition); + } + + return $this->valid; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextMessage(); + } + + // }}} + // {{{ protected function getMessageSetSize() + + /** + * get message set size + * + * @access protected + * @return integer + */ + protected function getMessageSetSize() + { + // read message size + $data = $this->stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $size = array_shift($data); + if ($size <= 0) { + throw new \Kafka\Exception\OutOfRange($size . ' is not a valid message size'); + } + + return $size; + } + + // }}} + // {{{ public function loadNextMessage() + + /** + * load next message + * + * @access public + * @return void + */ + public function loadNextMessage() + { + if ($this->validByteCount >= $this->messageSetSize) { + return false; + } + + try { + if ($this->validByteCount + 12 > $this->messageSetSize) { + // read socket buffer dirty data + $this->stream->read($this->messageSetSize - $this->validByteCount); + return false; + } + $offset = $this->stream->read(8, true); + $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset); + $messageSize = $this->stream->read(4, true); + $messageSize = Decoder::unpack(Decoder::BIT_B32, $messageSize); + $messageSize = array_shift($messageSize); + $this->validByteCount += 12; + if (($this->validByteCount + $messageSize) > $this->messageSetSize) { + // read socket buffer dirty data + $this->stream->read($this->messageSetSize - $this->validByteCount); + return false; + } + $msg = $this->stream->read($messageSize, true); + $this->current = new Message($msg); + } catch (\Kafka\Exception $e) { + \Kafka\Log::log("already fetch: {$this->validByteCount}, {$e->getMessage()}", LOG_INFO); + return false; + } + + $this->validByteCount += $messageSize; + + return true; + } + + // }}} + // {{{ public function messageOffset() + + /** + * current message offset in producer + * + * @return void + */ + public function messageOffset() + { + return $this->offset; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php new file mode 100644 index 00000000..9f8578d5 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php @@ -0,0 +1,375 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Partition implements \Iterator, \Countable +{ + // {{{ members + + /** + * kafka socket object + * + * @var mixed + * @access private + */ + private $stream = null; + + /** + * validCount + * + * @var float + * @access private + */ + private $validCount = 0; + + /** + * partitions count + * + * @var float + * @access private + */ + private $partitionCount = false; + + /** + * current topic + * + * @var mixed + * @access private + */ + private $current = null; + + /** + * current iterator key + * partition id + * + * @var string + * @access private + */ + private $key = null; + + /** + * partition errCode + * + * @var float + * @access private + */ + private $errCode = 0; + + /** + * partition offset + * + * @var float + * @access private + */ + private $offset = 0; + + /** + * partition current fetch offset + * + * @var float + * @access private + */ + private $currentOffset = 0; + + /** + * valid + * + * @var mixed + * @access private + */ + private $valid = false; + + /** + * cuerrent topic name + * + * @var string + * @access private + */ + private $topicName = ''; + + /** + * request fetch context + * + * @var array + */ + private $context = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Protocol\Fetch\Topic $topic + * @param int $initOffset + * @access public + * @return void + */ + public function __construct(\Kafka\Protocol\Fetch\Topic $topic, $context = array()) + { + $this->stream = $topic->getStream(); + $this->topicName = $topic->key(); + $this->context = $context; + $this->partitionCount = $this->getPartitionCount(); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->key; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextPartition(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + return $this->valid && $this->validCount <= $this->partitionCount; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextPartition(); + } + + // }}} + // {{{ public function count() + + /** + * implements Countable function + * + * @access public + * @return integer + */ + public function count() + { + return $this->partitionCount; + } + + // }}} + // {{{ public function getErrCode() + + /** + * get partition errcode + * + * @access public + * @return void + */ + public function getErrCode() + { + return $this->errCode; + } + + // }}} + // {{{ public function getHighOffset() + + /** + * get partition high offset + * + * @access public + * @return void + */ + public function getHighOffset() + { + return $this->offset; + } + + // }}} + // {{{ public function getTopicName() + + /** + * get partition topic name + * + * @access public + * @return void + */ + public function getTopicName() + { + return $this->topicName; + } + + // }}} + // {{{ public function getStream() + + /** + * get current stream + * + * @access public + * @return \Kafka\Socket + */ + public function getStream() + { + return $this->stream; + } + + // }}} + // {{{ protected function getPartitionCount() + + /** + * get message size + * only use to object init + * + * @access protected + * @return integer + */ + protected function getPartitionCount() + { + // read topic count + $data = $this->stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $count = array_shift($data); + if ($count <= 0) { + throw new \Kafka\Exception\OutOfRange($size . ' is not a valid partition count'); + } + + return $count; + } + + // }}} + // {{{ public function loadNextPartition() + + /** + * load next partition + * + * @access public + * @return void + */ + public function loadNextPartition() + { + if ($this->validCount >= $this->partitionCount) { + return false; + } + + try { + $partitionId = $this->stream->read(4, true); + $partitionId = Decoder::unpack(Decoder::BIT_B32, $partitionId); + $partitionId = array_shift($partitionId); + \Kafka\Log::log("kafka client:fetch partition:" . $partitionId, LOG_INFO); + + $errCode = $this->stream->read(2, true); + $errCode = Decoder::unpack(Decoder::BIT_B16, $errCode); + $this->errCode = array_shift($errCode); + if ($this->errCode != 0) { + throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($this->errCode)); + } + $offset = $this->stream->read(8, true); + $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset); + + $this->key = $partitionId; + $this->current = new MessageSet($this, $this->context); + } catch (\Kafka\Exception $e) { + return false; + } + + $this->validCount++; + return true; + } + + // }}} + // {{{ public function setMessageOffset() + + /** + * set messageSet fetch offset current + * + * @param intger $offset + * @return void + */ + public function setMessageOffset($offset) + { + $this->currentOffset = $offset; + } + + // }}} + // {{{ public function getMessageOffset() + + /** + * get messageSet fetch offset current + * + * @return int + */ + public function getMessageOffset() + { + return $this->currentOffset; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php new file mode 100644 index 00000000..500e6b1f --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php @@ -0,0 +1,345 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol\Fetch; + +use \Kafka\Protocol\Decoder; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +class Topic implements \Iterator, \Countable +{ + // {{{ members + + /** + * kafka socket object + * + * @var array + * @access private + */ + private $streams = array(); + + /** + * each topic count + * + * @var array + * @access private + */ + private $topicCounts = array(); + + /** + * current iterator stream + * + * @var mixed + * @access private + */ + private $currentStreamKey = 0; + + /** + * current lock key + * + * @var string + * @access private + */ + private $currentStreamLockKey = ''; + + /** + * currentStreamCount + * + * @var float + * @access private + */ + private $currentStreamCount = 0; + + /** + * validCount + * + * @var float + * @access private + */ + private $validCount = 0; + + /** + * topic count + * + * @var float + * @access private + */ + private $topicCount = false; + + /** + * current topic + * + * @var mixed + * @access private + */ + private $current = null; + + /** + * current iterator key + * topic name + * + * @var string + * @access private + */ + private $key = null; + + /** + * valid + * + * @var mixed + * @access private + */ + private $valid = false; + + /** + * request fetch context + * + * @var array + */ + private $context = array(); + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Socket $stream + * @param int $initOffset + * @access public + * @return void + */ + public function __construct($streams, $context = array()) + { + if (!is_array($streams)) { + $streams = array($streams); + } + $this->streams = $streams; + $topicInfos = array(); + foreach ($context as $values) { + if (!isset($values['data'])) { + continue; + } + + foreach ($values['data'] as $value) { + if (!isset($value['topic_name']) || !isset($value['partitions'])) { + continue; + } + + $topicName = $value['topic_name']; + foreach ($value['partitions'] as $part) { + $topicInfos[$topicName][$part['partition_id']] = array( + 'offset' => $part['offset'], + ); + } + } + } + $this->context = $topicInfos; + $this->topicCount = $this->getTopicCount(); + } + + // }}} + // {{{ public function current() + + /** + * current + * + * @access public + * @return void + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * key + * + * @access public + * @return void + */ + public function key() + { + return $this->key; + } + + // }}} + // {{{ public function rewind() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function rewind() + { + $this->valid = $this->loadNextTopic(); + } + + // }}} + // {{{ public function valid() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function valid() + { + return $this->valid; + } + + // }}} + // {{{ public function next() + + /** + * implements Iterator function + * + * @access public + * @return integer + */ + public function next() + { + $this->valid = $this->loadNextTopic(); + } + + // }}} + // {{{ public function count() + + /** + * implements Countable function + * + * @access public + * @return integer + */ + public function count() + { + return $this->topicCount; + } + + // }}} + // {{{ protected function getTopicCount() + + /** + * get message size + * only use to object init + * + * @access protected + * @return integer + */ + protected function getTopicCount() + { + $count = 0; + foreach (array_values($this->streams) as $key => $stream) { + // read topic count + $stream->read(8, true); + $data = $stream->read(4, true); + $data = Decoder::unpack(Decoder::BIT_B32, $data); + $topicCount = array_shift($data); + $count += $topicCount; + $this->topicCounts[$key] = $topicCount; + if ($count <= 0) { + throw new \Kafka\Exception\OutOfRange($count . ' is not a valid topic count'); + } + } + + return $count; + } + + // }}} + // {{{ public function loadNextTopic() + + /** + * load next topic + * + * @access public + * @return void + */ + public function loadNextTopic() + { + if ($this->validCount >= $this->topicCount) { + \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey); + return false; + } + + if ($this->currentStreamCount >= $this->topicCounts[$this->currentStreamKey]) { + \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey); + $this->currentStreamKey++; + } + + $lockKeys = array_keys($this->streams); + $streams = array_values($this->streams); + if (!isset($streams[$this->currentStreamKey])) { + return false; + } + + $stream = $streams[$this->currentStreamKey]; + $this->currentStreamLockKey = $lockKeys[$this->currentStreamKey]; + + try { + $topicLen = $stream->read(2, true); + $topicLen = Decoder::unpack(Decoder::BIT_B16, $topicLen); + $topicLen = array_shift($topicLen); + if ($topicLen <= 0) { + return false; + } + + // topic name + $this->key = $stream->read($topicLen, true); + $this->current = new Partition($this, $this->context); + } catch (\Kafka\Exception $e) { + return false; + } + + $this->validCount++; + $this->currentStreamCount++; + + return true; + } + + // }}} + // {{{ public function getStream() + + /** + * get current stream + * + * @access public + * @return \Kafka\Socket + */ + public function getStream() + { + $streams = array_values($this->streams); + return $streams[$this->currentStreamKey]; + } + + // }}} + // }}} +} diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php new file mode 100644 index 00000000..a31067b5 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php @@ -0,0 +1,230 @@ +<?php +/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */ +// +--------------------------------------------------------------------------- +// | SWAN [ $_SWANBR_SLOGAN_$ ] +// +--------------------------------------------------------------------------- +// | Copyright $_SWANBR_COPYRIGHT_$ +// +--------------------------------------------------------------------------- +// | Version $_SWANBR_VERSION_$ +// +--------------------------------------------------------------------------- +// | Licensed ( $_SWANBR_LICENSED_URL_$ ) +// +--------------------------------------------------------------------------- +// | $_SWANBR_WEB_DOMAIN_$ +// +--------------------------------------------------------------------------- + +namespace Kafka\Protocol; + +/** ++------------------------------------------------------------------------------ +* Kafka protocol since Kafka v0.8 ++------------------------------------------------------------------------------ +* +* @package +* @version $_SWANBR_VERSION_$ +* @copyright Copyleft +* @author $_SWANBR_AUTHOR_$ ++------------------------------------------------------------------------------ +*/ + +abstract class Protocol +{ + // {{{ consts + + /** + * Kafka server protocol version + */ + const API_VERSION = 0; + + /** + * use encode message, This is a version id used to allow backwards + * compatible evolution of the message binary format. + */ + const MESSAGE_MAGIC = 0; + + /** + * message no compression + */ + const COMPRESSION_NONE = 0; + + /** + * Message using gzip compression + */ + const COMPRESSION_GZIP = 1; + + /** + * Message using Snappy compression + */ + const COMPRESSION_SNAPPY = 2; + + /** + * pack int32 type + */ + const PACK_INT32 = 0; + + /** + * pack int16 type + */ + const PACK_INT16 = 1; + + /** + * protocol request code + */ + const PRODUCE_REQUEST = 0; + const FETCH_REQUEST = 1; + const OFFSET_REQUEST = 2; + const METADATA_REQUEST = 3; + const OFFSET_COMMIT_REQUEST = 8; + const OFFSET_FETCH_REQUEST = 9; + const CONSUMER_METADATA_REQUEST = 10; + + // unpack/pack bit + const BIT_B64 = 'N2'; + const BIT_B32 = 'N'; + const BIT_B16 = 'n'; + const BIT_B8 = 'C'; + + // }}} + // {{{ members + + /** + * stream + * + * @var mixed + * @access protected + */ + protected $stream = null; + + // }}} + // {{{ functions + // {{{ public function __construct() + + /** + * __construct + * + * @param \Kafka\Socket $stream + * @access public + * @return void + */ + public function __construct(\Kafka\Socket $stream) + { + $this->stream = $stream; + } + + // }}} + // {{{ public static function Khex2bin() + + /** + * hex to bin + * + * @param string $string + * @static + * @access protected + * @return string (raw) + */ + public static function Khex2bin($string) + { + if (function_exists('\hex2bin')) { + return \hex2bin($string); + } else { + $bin = ''; + $len = strlen($string); + for ($i = 0; $i < $len; $i += 2) { + $bin .= pack('H*', substr($string, $i, 2)); + } + + return $bin; + } + } + + // }}} + // {{{ public static function unpack() + + /** + * Unpack a bit integer as big endian long + * + * @static + * @access public + * @return integer + */ + public static function unpack($type, $bytes) + { + self::checkLen($type, $bytes); + if ($type == self::BIT_B64) { + $set = unpack($type, $bytes); + $original = ($set[1] & 0xFFFFFFFF) << 32 | ($set[2] & 0xFFFFFFFF); + return $original; + } else { + return unpack($type, $bytes); + } + } + + // }}} + // {{{ public static function pack() + + /** + * pack a bit integer as big endian long + * + * @static + * @access public + * @return integer + */ + public static function pack($type, $data) + { + if ($type == self::BIT_B64) { + if ($data == -1) { // -1L + $data = self::Khex2bin('ffffffffffffffff'); + } elseif ($data == -2) { // -2L + $data = self::Khex2bin('fffffffffffffffe'); + } else { + $left = 0xffffffff00000000; + $right = 0x00000000ffffffff; + + $l = ($data & $left) >> 32; + $r = $data & $right; + $data = pack($type, $l, $r); + } + } else { + $data = pack($type, $data); + } + + return $data; + } + + // }}} + // {{{ protected static function checkLen() + + /** + * check unpack bit is valid + * + * @param string $type + * @param string(raw) $bytes + * @static + * @access protected + * @return void + */ + protected static function checkLen($type, $bytes) + { + $len = 0; + switch($type) { + case self::BIT_B64: + $len = 8; + break; + case self::BIT_B32: + $len = 4; + break; + case self::BIT_B16: + $len = 2; + break; + case self::BIT_B8: + $len = 1; + break; + } + + if (strlen($bytes) != $len) { + throw new \Kafka\Exception\Protocol('unpack failed. string(raw) length is ' . strlen($bytes) . ' , TO ' . $type); + } + } + + // }}} + // }}} +} |