host = $host; $this->port = $port; $this->setRecvTimeoutSec($recvTimeoutSec); $this->setRecvTimeoutUsec($recvTimeoutUsec); $this->setSendTimeoutSec($sendTimeoutSec); $this->setSendTimeoutUsec($sendTimeoutUsec); } /** * @param float $sendTimeoutSec */ public function setSendTimeoutSec($sendTimeoutSec) { $this->sendTimeoutSec = $sendTimeoutSec; } /** * @param float $sendTimeoutUsec */ public function setSendTimeoutUsec($sendTimeoutUsec) { $this->sendTimeoutUsec = $sendTimeoutUsec; } /** * @param float $recvTimeoutSec */ public function setRecvTimeoutSec($recvTimeoutSec) { $this->recvTimeoutSec = $recvTimeoutSec; } /** * @param float $recvTimeoutUsec */ public function setRecvTimeoutUsec($recvTimeoutUsec) { $this->recvTimeoutUsec = $recvTimeoutUsec; } // }}} // {{{ public static function createFromStream() /** * Optional method to set the internal stream handle * * @static * @access public * @return void */ public static function createFromStream($stream) { $socket = new self('localhost', 0); $socket->setStream($stream); return $socket; } // }}} // {{{ public function setStream() /** * Optional method to set the internal stream handle * * @param mixed $stream * @access public * @return void */ public function setStream($stream) { $this->stream = $stream; } // }}} // {{{ public function connect() /** * Connects the socket * * @access public * @return void */ public function connect() { if (is_resource($this->stream)) { return false; } if (empty($this->host)) { throw new \Kafka\Exception('Cannot open null host.'); } if ($this->port <= 0) { throw new \Kafka\Exception('Cannot open without port.'); } $this->stream = @fsockopen( $this->host, $this->port, $errno, $errstr, $this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000) ); if ($this->stream == false) { $error = 'Could not connect to ' . $this->host . ':' . $this->port . ' ('.$errstr.' ['.$errno.'])'; throw new \Kafka\Exception\SocketConnect($error); } stream_set_blocking($this->stream, 0); } // }}} // {{{ public function close() /** * close the socket * * @access public * @return void */ public function close() { if (is_resource($this->stream)) { fclose($this->stream); } } // }}} // {{{ public function read() /** * Read from the socket at most $len bytes. * * This method will not wait for all the requested data, it will return as * soon as any data is received. * * @param integer $len Maximum number of bytes to read. * @param boolean $verifyExactLength Throw an exception if the number of read bytes is less than $len * * @return string Binary data * @throws Kafka_Exception_Socket */ public function read($len, $verifyExactLength = false) { if ($len > self::READ_MAX_LEN) { throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream, length too longer.'); } $null = null; $read = array($this->stream); $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec); if ($readable > 0) { $remainingBytes = $len; $data = $chunk = ''; while ($remainingBytes > 0) { $chunk = fread($this->stream, $remainingBytes); if ($chunk === false) { $this->close(); throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)'); } if (strlen($chunk) === 0) { // Zero bytes because of EOF? if (feof($this->stream)) { $this->close(); throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)'); } // Otherwise wait for bytes $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec); if ($readable !== 1) { throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go'); } continue; // attempt another read } $data .= $chunk; $remainingBytes -= strlen($chunk); } if ($len === $remainingBytes || ($verifyExactLength && $len !== strlen($data))) { // couldn't read anything at all OR reached EOF sooner than expected $this->close(); throw new \Kafka\Exception\SocketEOF('Read ' . strlen($data) . ' bytes instead of the requested ' . $len . ' bytes'); } return $data; } if (false !== $readable) { $res = stream_get_meta_data($this->stream); if (!empty($res['timed_out'])) { $this->close(); throw new \Kafka\Exception\SocketTimeout('Timed out reading '.$len.' bytes from stream'); } } $this->close(); throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (not readable)'); } // }}} // {{{ public function write() /** * Write to the socket. * * @param string $buf The data to write * * @return integer * @throws Kafka_Exception_Socket */ public function write($buf) { $null = null; $write = array($this->stream); // fwrite to a socket may be partial, so loop until we // are done with the entire buffer $written = 0; $buflen = strlen($buf); while ( $written < $buflen ) { // wait for stream to become available for writing $writable = stream_select($null, $write, $null, $this->sendTimeoutSec, $this->sendTimeoutUsec); if ($writable > 0) { // write remaining buffer bytes to stream $wrote = fwrite($this->stream, substr($buf, $written)); if ($wrote === -1 || $wrote === false) { throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream, completed writing only ' . $written . ' bytes'); } $written += $wrote; continue; } if (false !== $writable) { $res = stream_get_meta_data($this->stream); if (!empty($res['timed_out'])) { throw new \Kafka\Exception\SocketTimeout('Timed out writing ' . strlen($buf) . ' bytes to stream after writing ' . $written . ' bytes'); } } throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream'); } return $written; } // }}} // {{{ public function rewind() /** * Rewind the stream * * @return void */ public function rewind() { if (is_resource($this->stream)) { rewind($this->stream); } } // }}} // }}} }