diff options
Diffstat (limited to 'extlib/XMPPHP/XMLStream.php')
-rw-r--r-- | extlib/XMPPHP/XMLStream.php | 121 |
1 files changed, 70 insertions, 51 deletions
diff --git a/extlib/XMPPHP/XMLStream.php b/extlib/XMPPHP/XMLStream.php index 5e1d34235..e59e95902 100644 --- a/extlib/XMPPHP/XMLStream.php +++ b/extlib/XMPPHP/XMLStream.php @@ -253,29 +253,36 @@ class XMPPHP_XMLStream { * @param boolean $sendinit */ public function connect($timeout = 30, $persistent = false, $sendinit = true) { - $this->disconnected = false; - $this->sent_disconnect = false; - if($persistent) { - $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; + $starttime = time(); + + do { + $this->disconnected = false; + $this->sent_disconnect = false; + if($persistent) { + $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; + } else { + $conflag = STREAM_CLIENT_CONNECT; + } + $conntype = 'tcp'; + if($this->use_ssl) $conntype = 'ssl'; + $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); + try { + $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); + } catch (Exception $e) { + throw new XMPPHP_Exception($e->getMessage()); + } + if(!$this->socket) { + $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); + $this->disconnected = true; + } + } while (!$this->socket && (time() - $starttime) < $timeout); + + if ($this->socket) { + stream_set_blocking($this->socket, 1); + if($sendinit) $this->send($this->stream_start); } else { - $conflag = STREAM_CLIENT_CONNECT; - } - $conntype = 'tcp'; - if($this->use_ssl) $conntype = 'ssl'; - $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); - try { - $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); - } catch (Exception $e) { - throw new XMPPHP_Exception($e->getMessage()); - } - if(!$this->socket) { - $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); - $this->disconnected = true; - - throw new XMPPHP_Exception('Could not connect.'); + throw new XMPPHP_Exception("Could not connect before timeout."); } - stream_set_blocking($this->socket, 1); - if($sendinit) $this->send($this->stream_start); } /** @@ -286,6 +293,7 @@ class XMPPHP_XMLStream { $this->log->log("Reconnecting...", XMPPHP_Log::LEVEL_WARNING); $this->connect(30, false, false); $this->reset(); + $this->event('reconnect'); } } @@ -319,12 +327,10 @@ class XMPPHP_XMLStream { private function __process($maximum=0) { - $this->log->log("__process($maximum)", XMPPHP_Log::LEVEL_VERBOSE); - $remaining = $maximum; do { - $starttime = microtime(); + $starttime = (microtime(true) * 1000000); $read = array($this->socket); $write = array(); $except = array(); @@ -338,27 +344,25 @@ class XMPPHP_XMLStream { $usecs = $remaining % 1000000; $secs = floor(($remaining - $usecs) / 1000000); } - $this->log->log("stream_select(read, write, except, $secs, $usecs)", XMPPHP_Log::LEVEL_VERBOSE); $updated = @stream_select($read, $write, $except, $secs, $usecs); if ($updated === false) { $this->log->log("Error on stream_select()", XMPPHP_Log::LEVEL_VERBOSE); if ($this->reconnect) { $this->doReconnect(); } else { - $this->log->log("Giving up", XMPPHP_Log::LEVEL_VERBOSE); fclose($this->socket); + $this->socket = NULL; return false; } } else if ($updated > 0) { # XXX: Is this big enough? - $this->log->log("Reading from socket", XMPPHP_Log::LEVEL_VERBOSE); $buff = @fread($this->socket, 4096); if(!$buff) { if($this->reconnect) { $this->doReconnect(); } else { - $this->log->log("Error on fread(), reconnect", XMPPHP_Log::LEVEL_VERBOSE); fclose($this->socket); + $this->socket = NULL; return false; } } @@ -367,7 +371,9 @@ class XMPPHP_XMLStream { } else { # $updated == 0 means no changes during timeout. } - $remaining -= (microtime() - $starttime); + $endtime = (microtime(true)*1000000); + $time_past = $endtime - $starttime; + $remaining = $remaining - $time_past; } while (is_null($maximum) || $remaining > 0); return true; } @@ -595,34 +601,47 @@ class XMPPHP_XMLStream { * * @param string $msg */ - public function send($msg, $rec=false) { - if($this->time() - $this->last_send < .1) { - usleep(100000); + public function send($msg, $timeout=NULL) { + + if (is_null($timeout)) { + $secs = NULL; + $usecs = NULL; + } else if ($timeout == 0) { + $secs = 0; + $usecs = 0; + } else { + $maximum = $timeout * 1000000; + $usecs = $maximum % 1000000; + $secs = floor(($maximum - $usecs) / 1000000); } - $wait = true; - while($wait) { - $read = null; - $write = array($this->socket); - $except = null; - $select = @stream_select($read, $write, $except, 0, 0); - if($select === False) { - $this->doReconnect(); - return false; - } elseif ($select > 0) { - $wait = false; - } else { - usleep(100000); - //$this->processTime(.25); - } + + $read = array(); + $write = array($this->socket); + $except = array(); + + $select = @stream_select($read, $write, $except, $secs, $usecs); + + if($select === False) { + $this->log->log("ERROR sending message; reconnecting."); + $this->doReconnect(); + # TODO: retry send here + return false; + } elseif ($select > 0) { + $this->log->log("Socket is ready; send it."); + } else { + $this->log->log("Socket is not ready; break."); + return false; } - $sentbytes = @fwrite($this->socket, $msg, 1024); - $this->last_send = $this->time(); + + $sentbytes = @fwrite($this->socket, $msg); $this->log->log("SENT: " . mb_substr($msg, 0, $sentbytes, '8bit'), XMPPHP_Log::LEVEL_VERBOSE); if($sentbytes === FALSE) { + $this->log->log("ERROR sending message; reconnecting."); $this->doReconnect(); - } elseif ($sentbytes != mb_strlen($msg, '8bit')) { - $this->send(mb_substr($msg, $sentbytes, mb_strlen($msg, '8bit'), '8bit'), true); + return false; } + $this->log->log("Successfully sent $sentbytes bytes."); + return $sentbytes; } public function time() { |