From a1789ddde42033f1b05cc4929491214ee6e79383 Mon Sep 17 00:00:00 2001 From: Pierre Schmitz Date: Thu, 17 Dec 2015 09:15:42 +0100 Subject: Update to MediaWiki 1.26.0 --- .../nmred/kafka-php/src/Kafka/Protocol/Encoder.php | 652 +++++++++++++++++++++ 1 file changed, 652 insertions(+) create mode 100644 vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php (limited to 'vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php') diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php new file mode 100644 index 00000000..7d36e10f --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php @@ -0,0 +1,652 @@ +stream->write($data); + } + + // }}} + // {{{ public function metadataRequest() + + /** + * build metadata request protocol + * + * @param array $topics + * @access public + * @return string + */ + public function metadataRequest($topics) + { + if (!is_array($topics)) { + $topics = array($topics); + } + + foreach ($topics as $topic) { + if (!is_string($topic)) { + throw new \Kafka\Exception\Protocol('request metadata topic array have invalid value. '); + } + } + + $header = self::requestHeader('kafka-php', 0, self::METADATA_REQUEST); + $data = self::encodeArray($topics, array(__CLASS__, 'encodeString'), self::PACK_INT16); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function fetchRequest() + + /** + * build fetch request + * + * @param array $payloads + * @access public + * @return string + */ + public function fetchRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given fetch kafka data invalid. `data` is undefined.'); + } + + if (!isset($payloads['replica_id'])) { + $payloads['replica_id'] = -1; + } + + if (!isset($payloads['max_wait_time'])) { + $payloads['max_wait_time'] = 100; // default timeout 100ms + } + + if (!isset($payloads['min_bytes'])) { + $payloads['min_bytes'] = 64 * 1024; // 64k + } + + $header = self::requestHeader('kafka-php', 0, self::FETCH_REQUEST); + $data = self::pack(self::BIT_B32, $payloads['replica_id']); + $data .= self::pack(self::BIT_B32, $payloads['max_wait_time']); + $data .= self::pack(self::BIT_B32, $payloads['min_bytes']); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchTopic')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function offsetRequest() + + /** + * build offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function offsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['replica_id'])) { + $payloads['replica_id'] = -1; + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_REQUEST); + $data = self::pack(self::BIT_B32, $payloads['replica_id']); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeOffsetTopic')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function commitOffsetRequest() + + /** + * build consumer commit offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function commitOffsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['group_id'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `group_id` is undefined.'); + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_COMMIT_REQUEST); + $data = self::encodeString($payloads['group_id'], self::PACK_INT16); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeCommitOffset')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public function fetchOffsetRequest() + + /** + * build consumer fetch offset request + * + * @param array $payloads + * @access public + * @return string + */ + public function fetchOffsetRequest($payloads) + { + if (!isset($payloads['data'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `data` is undefined.'); + } + + if (!isset($payloads['group_id'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `group_id` is undefined.'); + } + + $header = self::requestHeader('kafka-php', 0, self::OFFSET_FETCH_REQUEST); + $data = self::encodeString($payloads['group_id'], self::PACK_INT16); + $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchOffset')); + $data = self::encodeString($header . $data, self::PACK_INT32); + + return $this->stream->write($data); + } + + // }}} + // {{{ public static function encodeString() + + /** + * encode pack string type + * + * @param string $string + * @param int $bytes self::PACK_INT32: int32 big endian order. self::PACK_INT16: int16 big endian order. + * @static + * @access public + * @return string + */ + public static function encodeString($string, $bytes, $compression = self::COMPRESSION_NONE) + { + $packLen = ($bytes == self::PACK_INT32) ? self::BIT_B32 : self::BIT_B16; + switch ($compression) { + case self::COMPRESSION_NONE: + break; + case self::COMPRESSION_GZIP: + $string = gzencode($string); + break; + case self::COMPRESSION_SNAPPY: + throw new \Kafka\Exception\NotSupported('SNAPPY compression not yet implemented'); + default: + throw new \Kafka\Exception\NotSupported('Unknown compression flag: ' . $compression); + } + return self::pack($packLen, strlen($string)) . $string; + } + + // }}} + // {{{ public static function encodeArray() + + /** + * encode key array + * + * @param array $array + * @param Callable $func + * @static + * @access public + * @return string + */ + public static function encodeArray(array $array, $func, $options = null) + { + if (!is_callable($func, false)) { + throw new \Kafka\Exception\Protocol('Encode array failed, given function is not callable.'); + } + + $arrayCount = count($array); + + $body = ''; + foreach ($array as $value) { + if (!is_null($options)) { + $body .= call_user_func($func, $value, $options); + } else { + $body .= call_user_func($func, $value); + } + } + + return self::pack(self::BIT_B32, $arrayCount) . $body; + } + + // }}} + // {{{ public static function encodeMessageSet() + + /** + * encode message set + * N.B., MessageSets are not preceded by an int32 like other array elements + * in the protocol. + * + * @param array $messages + * @static + * @access public + * @return string + */ + public static function encodeMessageSet($messages, $compression = self::COMPRESSION_NONE) + { + if (!is_array($messages)) { + $messages = array($messages); + } + + $data = ''; + foreach ($messages as $message) { + $tmpMessage = self::_encodeMessage($message, $compression); + + // int64 -- message offset Message + $data .= self::pack(self::BIT_B64, 0) . self::encodeString($tmpMessage, self::PACK_INT32); + } + return $data; + } + + // }}} + // {{{ public static function requestHeader() + + /** + * get request header + * + * @param string $clientId + * @param integer $correlationId + * @param integer $apiKey + * @static + * @access public + * @return void + */ + public static function requestHeader($clientId, $correlationId, $apiKey) + { + // int16 -- apiKey int16 -- apiVersion int32 correlationId + $binData = self::pack(self::BIT_B16, $apiKey); + $binData .= self::pack(self::BIT_B16, self::API_VERSION); + $binData .= self::pack(self::BIT_B32, $correlationId); + + // concat client id + $binData .= self::encodeString($clientId, self::PACK_INT16); + + return $binData; + } + + // }}} + // {{{ protected static function _encodeMessage() + + /** + * encode signal message + * + * @param string $message + * @static + * @access protected + * @return string + */ + protected static function _encodeMessage($message, $compression = self::COMPRESSION_NONE) + { + // int8 -- magic int8 -- attribute + $data = self::pack(self::BIT_B8, self::MESSAGE_MAGIC); + $data .= self::pack(self::BIT_B8, $compression); + + // message key + $data .= self::encodeString('', self::PACK_INT32); + + // message value + $data .= self::encodeString($message, self::PACK_INT32, $compression); + + $crc = crc32($data); + + // int32 -- crc code string data + $message = self::pack(self::BIT_B32, $crc) . $data; + + return $message; + } + + // }}} + // {{{ protected static function _encodeProcudePartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeProcudePartion($values, $compression) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['messages']) || empty($values['messages'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `messages` is undefined.'); + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::encodeString(self::encodeMessageSet($values['messages'], $compression), self::PACK_INT32); + + return $data; + } + + // }}} + // {{{ protected static function _encodeProcudeTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeProcudeTopic($values, $compression) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given produce data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeProcudePartion'), $compression); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeFetchPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['offset'])) { + $values['offset'] = 0; + } + + if (!isset($values['max_bytes'])) { + $values['max_bytes'] = 100 * 1024 * 1024; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['offset']); + $data .= self::pack(self::BIT_B32, $values['max_bytes']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeFetchTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchTopic($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given fetch data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['time'])) { + $values['time'] = -1; // -1 + } + + if (!isset($values['max_offset'])) { + $values['max_offset'] = 100000; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['time']); + $data .= self::pack(self::BIT_B32, $values['max_offset']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeOffsetTopic() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeOffsetTopic($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeCommitOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeCommitOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partition_id` is undefined.'); + } + + if (!isset($values['offset'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `offset` is undefined.'); + } + + if (!isset($values['time'])) { + $values['time'] = -1; + } + + if (!isset($values['metadata'])) { + $values['metadata'] = 'm'; + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + $data .= self::pack(self::BIT_B64, $values['offset']); + $data .= self::pack(self::BIT_B64, $values['time']); + $data .= self::encodeString($values['metadata'], self::PACK_INT16); + + return $data; + } + + // }}} + // {{{ protected static function _encodeCommitOffset() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeCommitOffset($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeCommitOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // {{{ protected static function _encodeFetchOffsetPartion() + + /** + * encode signal part + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchOffsetPartion($values) + { + if (!isset($values['partition_id'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partition_id` is undefined.'); + } + + $data = self::pack(self::BIT_B32, $values['partition_id']); + + return $data; + } + + // }}} + // {{{ protected static function _encodeFetchOffset() + + /** + * encode signal topic + * + * @param partions + * @static + * @access protected + * @return string + */ + protected static function _encodeFetchOffset($values) + { + if (!isset($values['topic_name'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `topic_name` is undefined.'); + } + + if (!isset($values['partitions']) || empty($values['partitions'])) { + throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partitions` is undefined.'); + } + + $topic = self::encodeString($values['topic_name'], self::PACK_INT16); + $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchOffsetPartion')); + + return $topic . $partitions; + } + + // }}} + // }}} +} -- cgit v1.2.3-54-g00ecf