summaryrefslogtreecommitdiff
path: root/lib/dbqueuemanager.php
diff options
context:
space:
mode:
authorEvan Prodromou <evan@status.net>2010-01-12 23:25:25 -0800
committerEvan Prodromou <evan@status.net>2010-01-12 23:25:25 -0800
commite34ece8b0630394b606b57b19de58c830fad9c4d (patch)
treed6354a6e02517dd75c32f40b1d08a4b9cffd9b11 /lib/dbqueuemanager.php
parent392bc728c795dd50e19a56cb097ab9a64f129573 (diff)
parentec145b73fc91dd54695dd374c8a71a11e233b8c0 (diff)
Merge branch '0.9.x' of git@gitorious.org:statusnet/mainline into 0.9.x
Diffstat (limited to 'lib/dbqueuemanager.php')
-rw-r--r--lib/dbqueuemanager.php161
1 files changed, 97 insertions, 64 deletions
diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php
index 750300928..a5c6fd28b 100644
--- a/lib/dbqueuemanager.php
+++ b/lib/dbqueuemanager.php
@@ -22,16 +22,20 @@
* @category QueueManager
* @package StatusNet
* @author Evan Prodromou <evan@status.net>
- * @copyright 2009 StatusNet, Inc.
+ * @author Brion Vibber <brion@status.net>
+ * @copyright 2009-2010 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
* @link http://status.net/
*/
class DBQueueManager extends QueueManager
{
- var $qis = array();
-
- function enqueue($object, $queue)
+ /**
+ * Saves a notice object reference into the queue item table.
+ * @return boolean true on success
+ * @throws ServerException on failure
+ */
+ public function enqueue($object, $queue)
{
$notice = $object;
@@ -47,70 +51,95 @@ class DBQueueManager extends QueueManager
throw new ServerException('DB error inserting queue item');
}
+ $this->stats('enqueued', $queue);
+
return true;
}
- function service($queue, $handler)
+ /**
+ * Poll every minute for new events during idle periods.
+ * We'll look in more often when there's data available.
+ *
+ * @return int seconds
+ */
+ public function pollInterval()
+ {
+ return 60;
+ }
+
+ /**
+ * Run a polling cycle during idle processing in the input loop.
+ * @return boolean true if we had a hit
+ */
+ public function poll()
{
- while (true) {
- $this->_log(LOG_DEBUG, 'Checking for notices...');
- $timeout = $handler->timeout();
- $notice = $this->_nextItem($queue, $timeout);
- if (empty($notice)) {
- $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
- // Nothing in the queue. Do you
- // have other tasks, like servicing your
- // XMPP connection, to do?
- $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ $this->_log(LOG_DEBUG, 'Checking for notices...');
+ $item = $this->_nextItem();
+ if ($item === false) {
+ $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
+ return false;
+ }
+ if ($item === true) {
+ // We dequeued an entry for a deleted or invalid notice.
+ // Consider it a hit for poll rate purposes.
+ return true;
+ }
+
+ list($queue, $notice) = $item;
+ $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue);
+
+ // Yay! Got one!
+ $handler = $this->getHandler($queue);
+ if ($handler) {
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice");
+ $this->_done($notice, $queue);
} else {
- $this->_log(LOG_INFO, 'Got notice '. $notice->id);
- // Yay! Got one!
- if ($handler->handle_notice($notice)) {
- $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
- $this->_done($notice, $queue);
- } else {
- $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
- $this->_fail($notice, $queue);
- }
- // Chance to e.g. service your XMPP connection
- $this->_log(LOG_DEBUG, 'Idling after success.');
- $handler->idle(QUEUE_HANDLER_HIT_IDLE);
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice");
+ $this->_fail($notice, $queue);
}
- // XXX: when do we give up?
+ } else {
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue");
+ $this->_fail($notice, $queue);
}
+ return true;
}
- function _nextItem($queue, $timeout=null)
+ /**
+ * Pop the oldest unclaimed item off the queue set and claim it.
+ *
+ * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice)
+ * giving the queue transport name.
+ */
+ protected function _nextItem()
{
$start = time();
$result = null;
- $sleeptime = 1;
+ $qi = Queue_item::top();
+ if (empty($qi)) {
+ return false;
+ }
- do {
- $qi = Queue_item::top($queue);
- if (empty($qi)) {
- $this->_log(LOG_DEBUG, "No new queue items, sleeping $sleeptime seconds.");
- sleep($sleeptime);
- $sleeptime *= 2;
- } else {
- $notice = Notice::staticGet('id', $qi->notice_id);
- if (!empty($notice)) {
- $result = $notice;
- } else {
- $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id);
- $qi->delete();
- $qi->free();
- $qi = null;
- }
- $sleeptime = 1;
- }
- } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout));
+ $queue = $qi->transport;
+ $notice = Notice::staticGet('id', $qi->notice_id);
+ if (empty($notice)) {
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice");
+ $qi->delete();
+ return true;
+ }
- return $result;
+ $result = $notice;
+ return array($queue, $notice);
}
- function _done($object, $queue)
+ /**
+ * Delete our claimed item from the queue after successful processing.
+ *
+ * @param Notice $object
+ * @param string $queue
+ */
+ protected function _done($object, $queue)
{
// XXX: right now, we only handle notices
@@ -120,24 +149,29 @@ class DBQueueManager extends QueueManager
'transport' => $queue));
if (empty($qi)) {
- $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
} else {
if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '.
- 'for '.$notice->id.', queue '.$queue);
+ $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item");
}
$qi->delete();
$qi->free();
- $qi = null;
}
- $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item");
+ $this->stats('handled', $queue);
$notice->free();
- $notice = null;
}
- function _fail($object, $queue)
+ /**
+ * Free our claimed queue item for later reprocessing in case of
+ * temporary failure.
+ *
+ * @param Notice $object
+ * @param string $queue
+ */
+ protected function _fail($object, $queue)
{
// XXX: right now, we only handle notices
@@ -147,11 +181,10 @@ class DBQueueManager extends QueueManager
'transport' => $queue));
if (empty($qi)) {
- $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
} else {
if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '.
- 'for '.$notice->id.', queue '.$queue);
+ $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item");
} else {
$orig = clone($qi);
$qi->claimed = null;
@@ -160,13 +193,13 @@ class DBQueueManager extends QueueManager
}
}
- $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
+ $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item");
+ $this->stats('error', $queue);
$notice->free();
- $notice = null;
}
- function _log($level, $msg)
+ protected function _log($level, $msg)
{
common_log($level, 'DBQueueManager: '.$msg);
}