summaryrefslogtreecommitdiff
path: root/lib/stompqueuemanager.php
diff options
context:
space:
mode:
authorEvan Prodromou <evan@controlyourself.ca>2009-07-09 12:09:20 -0400
committerEvan Prodromou <evan@controlyourself.ca>2009-07-09 12:09:20 -0400
commit03200235b1a1bba56c24e1bb659023ba24265eb3 (patch)
tree44f921927809cdf03b2f3c4b9ee2f89b9b226486 /lib/stompqueuemanager.php
parent1daad01f362d72e6ed89f415546220016c32404a (diff)
use select() to bring down xmpp latency
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r--lib/stompqueuemanager.php73
1 files changed, 52 insertions, 21 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 4d89806f8..7da7c0011 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -30,6 +30,14 @@
require_once 'Stomp.php';
+class LiberalStomp extends Stomp
+{
+ function getSocket()
+ {
+ return $this->_socket;
+ }
+}
+
class StompQueueManager
{
var $server = null;
@@ -50,7 +58,7 @@ class StompQueueManager
{
if (empty($this->con)) {
$this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
- $this->con = new Stomp($this->server);
+ $this->con = new LiberalStomp($this->server);
if ($this->con->connect($this->username, $this->password)) {
$this->_log(LOG_INFO, "Connected.");
@@ -94,34 +102,57 @@ class StompQueueManager
while (true) {
- $frame = $this->con->readFrame();
+ // Wait for something on one of our sockets
- if (!empty($frame)) {
- $notice = Notice::staticGet('id', $frame->body);
+ $stompsock = $this->con->getSocket();
- if (empty($notice)) {
- $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue);
- $this->con->ack($frame);
- } else {
- if ($handler->handle_notice($notice)) {
- $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ $handsocks = $handler->getSockets();
+
+ $this->_log(LOG_DEBUG, "Got ".count($handsocks)." sockets from handler.");
+ $this->_log(LOG_DEBUG, print_r($handsocks, true));
+
+ $socks = array_merge(array($stompsock), $handsocks);
+
+ $read = $socks;
+ $write = array();
+ $except = array();
+
+ $this->_log(LOG_DEBUG, "Starting select");
+ $ready = stream_select($read, $write, $except, $handler->timeout(), 0);
+ $this->_log(LOG_DEBUG, "Finished select with value '$ready'");
+
+ if (!$ready || $read[0] !== $stompsock) {
+ $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ } else {
+ $frame = $this->con->readFrame();
+
+ if (!empty($frame)) {
+ $notice = Notice::staticGet('id', $frame->body);
+
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue);
$this->con->ack($frame);
} else {
- $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
- // FIXME we probably shouldn't have to do
- // this kind of queue management ourselves
- $this->con->ack($frame);
- $this->enqueue($notice, $queue);
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ $this->con->ack($frame);
+ } else {
+ $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ // FIXME we probably shouldn't have to do
+ // this kind of queue management ourselves
+ $this->con->ack($frame);
+ $this->enqueue($notice, $queue);
+ }
+ unset($notice);
}
- unset($notice);
- }
- unset($frame);
+ unset($frame);
- $handler->idle(QUEUE_HANDLER_HIT_IDLE);
+ $handler->idle(QUEUE_HANDLER_HIT_IDLE);
- } else {
- $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ } else {
+ $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ }
}
}