summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvan Prodromou <evan@prodromou.name>2008-08-30 20:32:10 -0400
committerEvan Prodromou <evan@prodromou.name>2008-08-30 20:32:10 -0400
commitd5f83d92521dbf838b1fc1fad2716efc3122c6b7 (patch)
tree5318e5c4ad04f8415b2d6d57cddcd03c85b6560d
parentea5b12916342910d9df18af69f82a4f3aedb6d4b (diff)
switch around how XMLStream does processing
darcs-hash:20080831003210-84dde-92ccffd5b2e1d50963b18babd93c70fb1d20cdba.gz
-rw-r--r--extlib/XMPPHP/XMLStream.php75
-rw-r--r--lib/queuehandler.php15
-rwxr-xr-xscripts/publicqueuehandler.php12
-rwxr-xr-xscripts/xmppconfirmhandler.php22
-rwxr-xr-xscripts/xmppqueuehandler.php11
5 files changed, 63 insertions, 72 deletions
diff --git a/extlib/XMPPHP/XMLStream.php b/extlib/XMPPHP/XMLStream.php
index 119016768..3f85ed0f8 100644
--- a/extlib/XMPPHP/XMLStream.php
+++ b/extlib/XMPPHP/XMLStream.php
@@ -310,24 +310,50 @@ class XMPPHP_XMLStream {
return $this->disconnected;
}
- private function __process() {
- $read = array($this->socket);
- $write = null;
- $except = null;
- $updated = @stream_select($read, $write, $except, 1);
- if ($updated > 0) {
- $buff = @fread($this->socket, 1024);
- if(!$buff) {
- if($this->reconnect) {
- $this->doReconnect();
- } else {
- fclose($this->socket);
- return false;
+ /**
+ * Core reading tool
+ * 0 -> only read if data is immediately ready
+ * NULL -> wait forever and ever
+ * integer -> process for this amount of time
+ */
+
+ private function __process($maximum=0) {
+
+ $remaining = $maximum;
+
+ do {
+ $starttime = microtime();
+ $read = array($this->socket);
+ $write = array();
+ $except = array();
+ if (is_null($maximum)) {
+ $secs = NULL;
+ $usecs = NULL;
+ } else if ($maximum == 0) {
+ $secs = 0;
+ $usecs = 0;
+ } else {
+ $secs = $remaining / 1000000;
+ $usecs = $remaining % 1000000;
+ }
+ $updated = @stream_select($read, $write, $except, $secs, $usecs);
+ if ($updated > 0) {
+ # XXX: Is this big enough?
+ $buff = @fread($this->socket, 4096);
+ if(!$buff) {
+ if($this->reconnect) {
+ $this->doReconnect();
+ } else {
+ fclose($this->socket);
+ return false;
+ }
}
+ $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE);
+ xml_parse($this->parser, $buff, false);
}
- $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE);
- xml_parse($this->parser, $buff, false);
- }
+ $remaining -= (microtime() - $starttime);
+ } while (is_null($maximum) || $remaining > 0);
+ return true;
}
/**
@@ -336,10 +362,7 @@ class XMPPHP_XMLStream {
* @return string
*/
public function process() {
- $updated = '';
- while(!$this->disconnect) {
- $this->__process();
- }
+ $this->__process(NULL);
}
/**
@@ -348,11 +371,11 @@ class XMPPHP_XMLStream {
* @param integer $timeout
* @return string
*/
- public function processTime($timeout = -1) {
- $start = time();
- $updated = '';
- while(!$this->disconnected and ($timeout == -1 or time() - $start < $timeout)) {
- $this->__process();
+ public function processTime($timeout=NULL) {
+ if (is_null($timeout)) {
+ return $this->__process(NULL);
+ } else {
+ return $this->__process($timeout * 1000000);
}
}
@@ -372,7 +395,7 @@ class XMPPHP_XMLStream {
reset($this->until);
$updated = '';
while(!$this->disconnected and $this->until[$event_key] and (time() - $start < $timeout or $timeout == -1)) {
- $this->__process();
+ $this->__process(0);
}
if(array_key_exists($event_key, $this->until_payload)) {
$payload = $this->until_payload[$event_key];
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
index ba7a93ab2..3115ea38d 100644
--- a/lib/queuehandler.php
+++ b/lib/queuehandler.php
@@ -81,21 +81,18 @@ class QueueHandler {
$this->log(LOG_WARNING, 'queue item for notice that does not exist');
}
$qi->delete();
- $this->idle();
+ $this->idle(0);
} else {
$this->clear_old_claims();
- $start = microtime();
- $this->idle();
- $used = microtime() - $start;
- if ($used < 1000000) {
- usleep(1000000 - $used);
- }
+ $this->idle(5);
}
} while (true);
}
- function idle() {
- return true;
+ function idle($timeout=0) {
+ if ($timeout>0) {
+ sleep($timeout);
+ }
}
function clear_old_claims() {
diff --git a/scripts/publicqueuehandler.php b/scripts/publicqueuehandler.php
index 9a7b6df5f..555298f6a 100755
--- a/scripts/publicqueuehandler.php
+++ b/scripts/publicqueuehandler.php
@@ -54,16 +54,8 @@ class PublicQueueHandler extends QueueHandler {
return jabber_public_notice($notice);
}
- function idle() {
- $this->log(LOG_DEBUG, 'Checking the incoming message queue.');
- # Process the queue for a second
- if ($this->conn->readyToProcess()) {
- $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
- $this->conn->processTime(1);
- $this->log(LOG_DEBUG, 'Done processing incoming message queue.');
- } else {
- $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
- }
+ function idle($timeout=0) {
+ $this->conn->processTime($timeout);
}
function forward_message(&$pl) {
diff --git a/scripts/xmppconfirmhandler.php b/scripts/xmppconfirmhandler.php
index 08a397fc4..7971198b1 100755
--- a/scripts/xmppconfirmhandler.php
+++ b/scripts/xmppconfirmhandler.php
@@ -86,16 +86,10 @@ class XmppConfirmHandler {
continue;
}
}
- $this->idle();
+ $this->idle(0);
} else {
# $this->clear_old_confirm_claims();
- $start = microtime();
- $this->idle();
- $used = microtime() - $start;
- if ($used < 10000000) {
- usleep(10000000 - $used);
- }
- sleep(10);
+ $this->idle(10);
}
} while (true);
}
@@ -137,16 +131,8 @@ class XmppConfirmHandler {
common_log($level, 'XmppConfirmHandler ('. $this->_id .'): '.$msg);
}
- function idle() {
- $this->log(LOG_DEBUG, 'Checking the incoming message queue.');
- # Process the queue for a second
- if ($this->conn->readyToProcess()) {
- $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
- $this->conn->processTime(1);
- $this->log(LOG_DEBUG, 'Done processing incoming message queue.');
- } else {
- $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
- }
+ function idle($timeout=0) {
+ $this->conn->processTime($timeout);
}
function forward_message(&$pl) {
diff --git a/scripts/xmppqueuehandler.php b/scripts/xmppqueuehandler.php
index c6f5c3f12..7f1e6c28f 100755
--- a/scripts/xmppqueuehandler.php
+++ b/scripts/xmppqueuehandler.php
@@ -56,16 +56,9 @@ class XmppQueueHandler extends QueueHandler {
return jabber_broadcast_notice($notice);
}
- function idle() {
- $this->log(LOG_DEBUG, 'Checking the incoming message queue.');
+ function idle($timeout=0) {
# Process the queue for a second
- if ($this->conn->readyToProcess()) {
- $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
- $this->conn->processTime(1);
- $this->log(LOG_DEBUG, 'Done processing incoming message queue.');
- } else {
- $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
- }
+ $this->conn->processTime($timeout);
}
function finish() {