summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvan Prodromou <evan@controlyourself.ca>2009-07-09 12:09:20 -0400
committerEvan Prodromou <evan@controlyourself.ca>2009-07-09 12:09:20 -0400
commit03200235b1a1bba56c24e1bb659023ba24265eb3 (patch)
tree44f921927809cdf03b2f3c4b9ee2f89b9b226486
parent1daad01f362d72e6ed89f415546220016c32404a (diff)
use select() to bring down xmpp latency
-rw-r--r--lib/jabber.php10
-rw-r--r--lib/queuehandler.php5
-rw-r--r--lib/stompqueuemanager.php73
-rw-r--r--lib/xmppqueuehandler.php5
4 files changed, 71 insertions, 22 deletions
diff --git a/lib/jabber.php b/lib/jabber.php
index 7d584ad01..e15076160 100644
--- a/lib/jabber.php
+++ b/lib/jabber.php
@@ -77,6 +77,14 @@ function jabber_daemon_address()
return common_config('xmpp', 'user') . '@' . common_config('xmpp', 'server');
}
+class Sharing_XMPP extends XMPPHP_XMPP
+{
+ function getSocket()
+ {
+ return $this->socket;
+ }
+}
+
/**
* connect the configured Jabber account to the configured server
*
@@ -89,7 +97,7 @@ function jabber_connect($resource=null)
{
static $conn = null;
if (!$conn) {
- $conn = new XMPPHP_XMPP(common_config('xmpp', 'host') ?
+ $conn = new Sharing_XMPP(common_config('xmpp', 'host') ?
common_config('xmpp', 'host') :
common_config('xmpp', 'server'),
common_config('xmpp', 'port'),
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
index c0f38f4e3..c2ff10f32 100644
--- a/lib/queuehandler.php
+++ b/lib/queuehandler.php
@@ -115,5 +115,10 @@ class QueueHandler extends Daemon
{
common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
}
+
+ function getSockets()
+ {
+ return array();
+ }
}
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 4d89806f8..7da7c0011 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -30,6 +30,14 @@
require_once 'Stomp.php';
+class LiberalStomp extends Stomp
+{
+ function getSocket()
+ {
+ return $this->_socket;
+ }
+}
+
class StompQueueManager
{
var $server = null;
@@ -50,7 +58,7 @@ class StompQueueManager
{
if (empty($this->con)) {
$this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
- $this->con = new Stomp($this->server);
+ $this->con = new LiberalStomp($this->server);
if ($this->con->connect($this->username, $this->password)) {
$this->_log(LOG_INFO, "Connected.");
@@ -94,34 +102,57 @@ class StompQueueManager
while (true) {
- $frame = $this->con->readFrame();
+ // Wait for something on one of our sockets
- if (!empty($frame)) {
- $notice = Notice::staticGet('id', $frame->body);
+ $stompsock = $this->con->getSocket();
- if (empty($notice)) {
- $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue);
- $this->con->ack($frame);
- } else {
- if ($handler->handle_notice($notice)) {
- $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ $handsocks = $handler->getSockets();
+
+ $this->_log(LOG_DEBUG, "Got ".count($handsocks)." sockets from handler.");
+ $this->_log(LOG_DEBUG, print_r($handsocks, true));
+
+ $socks = array_merge(array($stompsock), $handsocks);
+
+ $read = $socks;
+ $write = array();
+ $except = array();
+
+ $this->_log(LOG_DEBUG, "Starting select");
+ $ready = stream_select($read, $write, $except, $handler->timeout(), 0);
+ $this->_log(LOG_DEBUG, "Finished select with value '$ready'");
+
+ if (!$ready || $read[0] !== $stompsock) {
+ $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ } else {
+ $frame = $this->con->readFrame();
+
+ if (!empty($frame)) {
+ $notice = Notice::staticGet('id', $frame->body);
+
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue);
$this->con->ack($frame);
} else {
- $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
- // FIXME we probably shouldn't have to do
- // this kind of queue management ourselves
- $this->con->ack($frame);
- $this->enqueue($notice, $queue);
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ $this->con->ack($frame);
+ } else {
+ $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ // FIXME we probably shouldn't have to do
+ // this kind of queue management ourselves
+ $this->con->ack($frame);
+ $this->enqueue($notice, $queue);
+ }
+ unset($notice);
}
- unset($notice);
- }
- unset($frame);
+ unset($frame);
- $handler->idle(QUEUE_HANDLER_HIT_IDLE);
+ $handler->idle(QUEUE_HANDLER_HIT_IDLE);
- } else {
- $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ } else {
+ $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ }
}
}
diff --git a/lib/xmppqueuehandler.php b/lib/xmppqueuehandler.php
index cbcfe5e1c..77d476c30 100644
--- a/lib/xmppqueuehandler.php
+++ b/lib/xmppqueuehandler.php
@@ -126,4 +126,9 @@ class XmppQueueHandler extends QueueHandler
return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon';
}
}
+
+ function getSockets()
+ {
+ return array($this->conn->getSocket());
+ }
}