diff options
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r-- | lib/stompqueuemanager.php | 73 |
1 files changed, 52 insertions, 21 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 4d89806f8..7da7c0011 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -30,6 +30,14 @@ require_once 'Stomp.php'; +class LiberalStomp extends Stomp +{ + function getSocket() + { + return $this->_socket; + } +} + class StompQueueManager { var $server = null; @@ -50,7 +58,7 @@ class StompQueueManager { if (empty($this->con)) { $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); - $this->con = new Stomp($this->server); + $this->con = new LiberalStomp($this->server); if ($this->con->connect($this->username, $this->password)) { $this->_log(LOG_INFO, "Connected."); @@ -94,34 +102,57 @@ class StompQueueManager while (true) { - $frame = $this->con->readFrame(); + // Wait for something on one of our sockets - if (!empty($frame)) { - $notice = Notice::staticGet('id', $frame->body); + $stompsock = $this->con->getSocket(); - if (empty($notice)) { - $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); - $this->con->ack($frame); - } else { - if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + $handsocks = $handler->getSockets(); + + $this->_log(LOG_DEBUG, "Got ".count($handsocks)." sockets from handler."); + $this->_log(LOG_DEBUG, print_r($handsocks, true)); + + $socks = array_merge(array($stompsock), $handsocks); + + $read = $socks; + $write = array(); + $except = array(); + + $this->_log(LOG_DEBUG, "Starting select"); + $ready = stream_select($read, $write, $except, $handler->timeout(), 0); + $this->_log(LOG_DEBUG, "Finished select with value '$ready'"); + + if (!$ready || $read[0] !== $stompsock) { + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $frame = $this->con->readFrame(); + + if (!empty($frame)) { + $notice = Notice::staticGet('id', $frame->body); + + if (empty($notice)) { + $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); $this->con->ack($frame); } else { - $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); - // FIXME we probably shouldn't have to do - // this kind of queue management ourselves - $this->con->ack($frame); - $this->enqueue($notice, $queue); + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + $this->con->ack($frame); + } else { + $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + // FIXME we probably shouldn't have to do + // this kind of queue management ourselves + $this->con->ack($frame); + $this->enqueue($notice, $queue); + } + unset($notice); } - unset($notice); - } - unset($frame); + unset($frame); - $handler->idle(QUEUE_HANDLER_HIT_IDLE); + $handler->idle(QUEUE_HANDLER_HIT_IDLE); - } else { - $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } } } |