summaryrefslogtreecommitdiff
path: root/lib/stompqueuemanager.php
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r--lib/stompqueuemanager.php80
1 files changed, 14 insertions, 66 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 1ad687036..b8731d543 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -84,85 +84,33 @@ class StompQueueManager
. $notice->id . ' for ' . $transport);
}
- function nextItem($queue, $timeout=null)
+ function service($queue, $handler)
{
$result = null;
$this->_connect();
- $frame = $this->con->readFrame();
+ $this->con->setReadTimeout($handler->timeout());
- if ($frame) {
- $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
+ $this->con->subscribe($this->_queueName($queue));
- // XXX: Now the queue handler receives only the ID of the
- // notice, and it has to get it from the DB
- // A massive improvement would be avoid DB query by transmitting
- // all the notice details via queue server...
+ while (true) {
- $notice = Notice::staticGet($frame->body);
+ $frame = $this->con->readFrame();
- if ($notice) {
- $this->_saveFrame($notice, $queue, $frame);
- } else {
- $this->log(LOG_WARNING, 'queue item for notice that does not exist');
- }
- }
- }
-
- function done($object, $queue)
- {
- $notice = $object;
+ if ($frame) {
+ $notice = Notice::staticGet($frame->body);
- $this->_connect();
-
- $frame = $this->_getFrame($notice, $queue);
-
- if (empty($frame)) {
- $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue);
- } else {
- // if the msg has been handled positively, ack it
- // and the queue server will remove it from the queue
- $this->con->ack($frame);
- $this->_clearFrame($notice, $queue);
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
+ $this->con->ack($frame);
+ }
+ }
- $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
+ $handler->idle(0);
}
- }
-
- function fail($object, $queue)
- {
- $notice = $object;
-
- // STOMP server will requeue it after a while anyways,
- // so no need to notify. Just get it out of our little
- // array
-
- $this->_clearFrame($notice, $queue);
- }
-
- function _frameKey($notice, $queue)
- {
- return ((string)$notice->id) . '-' . $queue;
- }
- function _saveFrame($notice, $queue, $frame)
- {
- $k = $this->_frameKey($notice, $queue);
- $this->_frames[$k] = $frame;
- return true;
- }
-
- function _getFrame($notice, $queue)
- {
- $k = $this->_frameKey($notice, $queue);
- return $this->_frames[$k];
- }
-
- function _clearFrame($notice, $queue)
- {
- $k = $this->_frameKey($notice, $queue);
- unset($this->_frames[$k]);
+ $this->con->unsubscribe($this->_queueName($queue));
}
function _queueName($queue)