summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/dbqueuemanager.php35
-rw-r--r--lib/queuehandler.php30
-rw-r--r--lib/queuemanager.php19
-rw-r--r--lib/stompqueuemanager.php80
4 files changed, 51 insertions, 113 deletions
diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php
index c9e5ef243..6e7172de0 100644
--- a/lib/dbqueuemanager.php
+++ b/lib/dbqueuemanager.php
@@ -51,7 +51,36 @@ class DBQueueManager extends QueueManager
return true;
}
- function nextItem($queue, $timeout=null)
+ function service($queue, $handler)
+ {
+ while (true) {
+ $this->_log(LOG_DEBUG, 'Checking for notices...');
+ $notice = $this->_nextItem($queue, null);
+ if (empty($notice)) {
+ $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
+ // Nothing in the queue. Do you
+ // have other tasks, like servicing your
+ // XMPP connection, to do?
+ $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ } else {
+ $this->_log(LOG_INFO, 'Got notice '. $notice->id);
+ // Yay! Got one!
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
+ $this->_done($notice, $queue);
+ } else {
+ $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
+ $this->_fail($notice, $queue);
+ }
+ // Chance to e.g. service your XMPP connection
+ $this->_log(LOG_DEBUG, 'Idling after success.');
+ $handler->idle(QUEUE_HANDLER_HIT_IDLE);
+ }
+ // XXX: when do we give up?
+ }
+ }
+
+ function _nextItem($queue, $timeout=null)
{
$start = time();
$result = null;
@@ -74,7 +103,7 @@ class DBQueueManager extends QueueManager
return $result;
}
- function done($object, $queue)
+ function _done($object, $queue)
{
// XXX: right now, we only handle notices
@@ -101,7 +130,7 @@ class DBQueueManager extends QueueManager
$notice = null;
}
- function fail($object, $queue)
+ function _fail($object, $queue)
{
// XXX: right now, we only handle notices
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
index ddb47a28e..c0f38f4e3 100644
--- a/lib/queuehandler.php
+++ b/lib/queuehandler.php
@@ -25,7 +25,7 @@ require_once(INSTALLDIR.'/classes/Notice.php');
define('CLAIM_TIMEOUT', 1200);
define('QUEUE_HANDLER_MISS_IDLE', 10);
-define('QUEUE_HANDLER_HIT_IDLE', 10);
+define('QUEUE_HANDLER_HIT_IDLE', 0);
class QueueHandler extends Daemon
{
@@ -42,7 +42,7 @@ class QueueHandler extends Daemon
function timeout()
{
- return null;
+ return 60;
}
function class_name()
@@ -96,31 +96,7 @@ class QueueHandler extends Daemon
$qm = QueueManager::get();
- while (true) {
- $this->log(LOG_DEBUG, 'Checking for notices...');
- $notice = $qm->nextItem($queue, $timeout);
- if (empty($notice)) {
- $this->log(LOG_DEBUG, 'No notices waiting; idling.');
- // Nothing in the queue. Do you
- // have other tasks, like servicing your
- // XMPP connection, to do?
- $this->idle(QUEUE_HANDLER_MISS_IDLE);
- } else {
- $this->log(LOG_INFO, 'Got notice '. $notice->id);
- // Yay! Got one!
- if ($this->handle_notice($notice)) {
- $this->log(LOG_INFO, 'Successfully handled notice '. $notice->id);
- $qm->done($notice, $queue);
- } else {
- $this->log(LOG_INFO, 'Failed to handle notice '. $notice->id);
- $qm->fail($notice, $queue);
- }
- // Chance to e.g. service your XMPP connection
- $this->log(LOG_DEBUG, 'Idling after success.');
- $this->idle(QUEUE_HANDLER_HIT_IDLE);
- }
- // XXX: when do we give up?
- }
+ $qm->service($queue, $this);
if (!$this->finish()) {
return false;
diff --git a/lib/queuemanager.php b/lib/queuemanager.php
index 1bf4d4dec..f36e99d16 100644
--- a/lib/queuemanager.php
+++ b/lib/queuemanager.php
@@ -67,23 +67,8 @@ class QueueManager
throw ServerException("Unimplemented function 'enqueue' called");
}
- function peek($queue)
+ function service($queue, $handler)
{
- throw ServerException("Unimplemented function 'peek' called");
- }
-
- function nextItem($queue, $timeout=null)
- {
- throw ServerException("Unimplemented function 'nextItem' called");
- }
-
- function done($object, $queue)
- {
- throw ServerException("Unimplemented function 'done' called");
- }
-
- function fail($object, $queue)
- {
- throw ServerException("Unimplemented function 'fail' called");
+ throw ServerException("Unimplemented function 'service' called");
}
}
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 1ad687036..b8731d543 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -84,85 +84,33 @@ class StompQueueManager
. $notice->id . ' for ' . $transport);
}
- function nextItem($queue, $timeout=null)
+ function service($queue, $handler)
{
$result = null;
$this->_connect();
- $frame = $this->con->readFrame();
+ $this->con->setReadTimeout($handler->timeout());
- if ($frame) {
- $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
+ $this->con->subscribe($this->_queueName($queue));
- // XXX: Now the queue handler receives only the ID of the
- // notice, and it has to get it from the DB
- // A massive improvement would be avoid DB query by transmitting
- // all the notice details via queue server...
+ while (true) {
- $notice = Notice::staticGet($frame->body);
+ $frame = $this->con->readFrame();
- if ($notice) {
- $this->_saveFrame($notice, $queue, $frame);
- } else {
- $this->log(LOG_WARNING, 'queue item for notice that does not exist');
- }
- }
- }
-
- function done($object, $queue)
- {
- $notice = $object;
+ if ($frame) {
+ $notice = Notice::staticGet($frame->body);
- $this->_connect();
-
- $frame = $this->_getFrame($notice, $queue);
-
- if (empty($frame)) {
- $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue);
- } else {
- // if the msg has been handled positively, ack it
- // and the queue server will remove it from the queue
- $this->con->ack($frame);
- $this->_clearFrame($notice, $queue);
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
+ $this->con->ack($frame);
+ }
+ }
- $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
+ $handler->idle(0);
}
- }
-
- function fail($object, $queue)
- {
- $notice = $object;
-
- // STOMP server will requeue it after a while anyways,
- // so no need to notify. Just get it out of our little
- // array
-
- $this->_clearFrame($notice, $queue);
- }
-
- function _frameKey($notice, $queue)
- {
- return ((string)$notice->id) . '-' . $queue;
- }
- function _saveFrame($notice, $queue, $frame)
- {
- $k = $this->_frameKey($notice, $queue);
- $this->_frames[$k] = $frame;
- return true;
- }
-
- function _getFrame($notice, $queue)
- {
- $k = $this->_frameKey($notice, $queue);
- return $this->_frames[$k];
- }
-
- function _clearFrame($notice, $queue)
- {
- $k = $this->_frameKey($notice, $queue);
- unset($this->_frames[$k]);
+ $this->con->unsubscribe($this->_queueName($queue));
}
function _queueName($queue)