From e5b758dbbef6774943abf453a43114a2c3371b4a Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sun, 28 Jun 2009 14:38:31 -0400 Subject: start of queuemanager code --- lib/dbqueuemanager.php | 106 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 lib/dbqueuemanager.php (limited to 'lib/dbqueuemanager.php') diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php new file mode 100644 index 000000000..c0d4dcd29 --- /dev/null +++ b/lib/dbqueuemanager.php @@ -0,0 +1,106 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class DBQueueManager extends QueueManager +{ + var $qis = array(); + + function enqueue($object, $queue) + { + $notice = (Notice)$object; + + $qi = new Queue_item(); + + $qi->notice_id = $notice->id; + $qi->transport = $queue; + $qi->created = $notice->created; + $result = $qi->insert(); + + if (!$result) { + common_log_db_error($qi, 'INSERT', __FILE__); + throw new ServerException('DB error inserting queue item'); + } + + return true; + } + + function nextItem($queue, $timeout=null) + { + $start = time(); + $result = null; + + do { + $qi = Queue_item::top($queue); + if (!empty($qi)) { + $notice = Notice::staticGet('id', $qi->notice_id); + if (!empty($notice)) { + $result = $notice; + } else { + $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id); + $qi->delete(); + $qi->free(); + $qi = null; + } + } + } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); + + return $result; + } + + function done($object, $queue) + { + $notice = (Notice)$object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } + $qi->delete(); + $qi->free(); + $qi = null; + } + + $this->log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function _log($level, $msg) + { + common_log($level, 'DBQueueManager: '.$msg); + } +} -- cgit v1.2.3-54-g00ecf From 58b427869a001a91d66cff497f1563b8277f1a67 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:09:42 -0400 Subject: compile errors in DBQueueManager --- lib/dbqueuemanager.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'lib/dbqueuemanager.php') diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c0d4dcd29..46be54b30 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -34,7 +34,7 @@ class DBQueueManager extends QueueManager function enqueue($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $qi = new Queue_item(); @@ -76,7 +76,9 @@ class DBQueueManager extends QueueManager function done($object, $queue) { - $notice = (Notice)$object; + // XXX: right now, we only handle notices + + $notice = $object; $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, 'transport' => $queue)); -- cgit v1.2.3-54-g00ecf From a35138b2684ec5275a1ffd7badfe7826cf2173b1 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:10:25 -0400 Subject: add fail() method to dbqueuemanager and fix logging --- lib/dbqueuemanager.php | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) (limited to 'lib/dbqueuemanager.php') diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index 46be54b30..c9e5ef243 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -84,10 +84,10 @@ class DBQueueManager extends QueueManager 'transport' => $queue)); if (empty($qi)) { - $this->log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); } else { if (empty($qi->claimed)) { - $this->log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. + $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. 'for '.$notice->id.', queue '.$queue); } $qi->delete(); @@ -95,7 +95,36 @@ class DBQueueManager extends QueueManager $qi = null; } - $this->log(LOG_INFO, 'done with notice ID = ' . $notice->id); + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function fail($object, $queue) + { + // XXX: right now, we only handle notices + + $notice = $object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } else { + $orig = clone($qi); + $qi->claimed = null; + $qi->update($orig); + $qi = null; + } + } + + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); $notice->free(); $notice = null; -- cgit v1.2.3-54-g00ecf From 49c5c6f92bc1d06e6464eade81eead891d86f10d Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 00:31:28 -0400 Subject: move handling code into queuemanager --- lib/dbqueuemanager.php | 35 +++++++++++++++++++-- lib/queuehandler.php | 30 ++---------------- lib/queuemanager.php | 19 ++--------- lib/stompqueuemanager.php | 80 +++++++++-------------------------------------- 4 files changed, 51 insertions(+), 113 deletions(-) (limited to 'lib/dbqueuemanager.php') diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c9e5ef243..6e7172de0 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -51,7 +51,36 @@ class DBQueueManager extends QueueManager return true; } - function nextItem($queue, $timeout=null) + function service($queue, $handler) + { + while (true) { + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $notice = $this->_nextItem($queue, null); + if (empty($notice)) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->_log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_done($notice, $queue); + } else { + $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $this->_fail($notice, $queue); + } + // Chance to e.g. service your XMPP connection + $this->_log(LOG_DEBUG, 'Idling after success.'); + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } + // XXX: when do we give up? + } + } + + function _nextItem($queue, $timeout=null) { $start = time(); $result = null; @@ -74,7 +103,7 @@ class DBQueueManager extends QueueManager return $result; } - function done($object, $queue) + function _done($object, $queue) { // XXX: right now, we only handle notices @@ -101,7 +130,7 @@ class DBQueueManager extends QueueManager $notice = null; } - function fail($object, $queue) + function _fail($object, $queue) { // XXX: right now, we only handle notices diff --git a/lib/queuehandler.php b/lib/queuehandler.php index ddb47a28e..c0f38f4e3 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -25,7 +25,7 @@ require_once(INSTALLDIR.'/classes/Notice.php'); define('CLAIM_TIMEOUT', 1200); define('QUEUE_HANDLER_MISS_IDLE', 10); -define('QUEUE_HANDLER_HIT_IDLE', 10); +define('QUEUE_HANDLER_HIT_IDLE', 0); class QueueHandler extends Daemon { @@ -42,7 +42,7 @@ class QueueHandler extends Daemon function timeout() { - return null; + return 60; } function class_name() @@ -96,31 +96,7 @@ class QueueHandler extends Daemon $qm = QueueManager::get(); - while (true) { - $this->log(LOG_DEBUG, 'Checking for notices...'); - $notice = $qm->nextItem($queue, $timeout); - if (empty($notice)) { - $this->log(LOG_DEBUG, 'No notices waiting; idling.'); - // Nothing in the queue. Do you - // have other tasks, like servicing your - // XMPP connection, to do? - $this->idle(QUEUE_HANDLER_MISS_IDLE); - } else { - $this->log(LOG_INFO, 'Got notice '. $notice->id); - // Yay! Got one! - if ($this->handle_notice($notice)) { - $this->log(LOG_INFO, 'Successfully handled notice '. $notice->id); - $qm->done($notice, $queue); - } else { - $this->log(LOG_INFO, 'Failed to handle notice '. $notice->id); - $qm->fail($notice, $queue); - } - // Chance to e.g. service your XMPP connection - $this->log(LOG_DEBUG, 'Idling after success.'); - $this->idle(QUEUE_HANDLER_HIT_IDLE); - } - // XXX: when do we give up? - } + $qm->service($queue, $this); if (!$this->finish()) { return false; diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 1bf4d4dec..f36e99d16 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -67,23 +67,8 @@ class QueueManager throw ServerException("Unimplemented function 'enqueue' called"); } - function peek($queue) + function service($queue, $handler) { - throw ServerException("Unimplemented function 'peek' called"); - } - - function nextItem($queue, $timeout=null) - { - throw ServerException("Unimplemented function 'nextItem' called"); - } - - function done($object, $queue) - { - throw ServerException("Unimplemented function 'done' called"); - } - - function fail($object, $queue) - { - throw ServerException("Unimplemented function 'fail' called"); + throw ServerException("Unimplemented function 'service' called"); } } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 1ad687036..b8731d543 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -84,85 +84,33 @@ class StompQueueManager . $notice->id . ' for ' . $transport); } - function nextItem($queue, $timeout=null) + function service($queue, $handler) { $result = null; $this->_connect(); - $frame = $this->con->readFrame(); + $this->con->setReadTimeout($handler->timeout()); - if ($frame) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + $this->con->subscribe($this->_queueName($queue)); - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... + while (true) { - $notice = Notice::staticGet($frame->body); + $frame = $this->con->readFrame(); - if ($notice) { - $this->_saveFrame($notice, $queue, $frame); - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - } - } - - function done($object, $queue) - { - $notice = $object; + if ($frame) { + $notice = Notice::staticGet($frame->body); - $this->_connect(); - - $frame = $this->_getFrame($notice, $queue); - - if (empty($frame)) { - $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue); - } else { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $this->con->ack($frame); - $this->_clearFrame($notice, $queue); + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->con->ack($frame); + } + } - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + $handler->idle(0); } - } - - function fail($object, $queue) - { - $notice = $object; - - // STOMP server will requeue it after a while anyways, - // so no need to notify. Just get it out of our little - // array - - $this->_clearFrame($notice, $queue); - } - - function _frameKey($notice, $queue) - { - return ((string)$notice->id) . '-' . $queue; - } - function _saveFrame($notice, $queue, $frame) - { - $k = $this->_frameKey($notice, $queue); - $this->_frames[$k] = $frame; - return true; - } - - function _getFrame($notice, $queue) - { - $k = $this->_frameKey($notice, $queue); - return $this->_frames[$k]; - } - - function _clearFrame($notice, $queue) - { - $k = $this->_frameKey($notice, $queue); - unset($this->_frames[$k]); + $this->con->unsubscribe($this->_queueName($queue)); } function _queueName($queue) -- cgit v1.2.3-54-g00ecf