diff options
Diffstat (limited to 'lib/dbqueuemanager.php')
-rw-r--r-- | lib/dbqueuemanager.php | 95 |
1 files changed, 31 insertions, 64 deletions
diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index 889365b64..139f50234 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -31,19 +31,17 @@ class DBQueueManager extends QueueManager { /** - * Saves a notice object reference into the queue item table. + * Saves an object into the queue item table. * @return boolean true on success * @throws ServerException on failure */ public function enqueue($object, $queue) { - $notice = $object; - $qi = new Queue_item(); - $qi->notice_id = $notice->id; + $qi->frame = serialize($object); $qi->transport = $queue; - $qi->created = $notice->created; + $qi->created = common_sql_now(); $result = $qi->insert(); if (!$result) { @@ -73,34 +71,35 @@ class DBQueueManager extends QueueManager */ public function poll() { - $this->_log(LOG_DEBUG, 'Checking for notices...'); - $item = $this->_nextItem(); - if ($item === false) { - $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + $this->_log(LOG_DEBUG, 'Checking for queued objects...'); + $qi = $this->_nextItem(); + if ($qi === false) { + $this->_log(LOG_DEBUG, 'No queue items waiting; idling.'); return false; } - if ($item === true) { - // We dequeued an entry for a deleted or invalid notice. + if ($qi === true) { + // We dequeued an entry for a deleted or invalid object. // Consider it a hit for poll rate purposes. return true; } - list($queue, $notice) = $item; - $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue); + $queue = $qi->transport; + $object = unserialize($qi->frame); + $this->_log(LOG_INFO, 'Got item id=' . $qi->id . ' for transport ' . $queue); // Yay! Got one! $handler = $this->getHandler($queue); if ($handler) { - if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice"); - $this->_done($notice, $queue); + if ($handler->handle($object)) { + $this->_log(LOG_INFO, "[$queue] Successfully handled object"); + $this->_done($qi); } else { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice"); - $this->_fail($notice, $queue); + $this->_log(LOG_INFO, "[$queue] Failed to handle object"); + $this->_fail($qi); } } else { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding."); - $this->_done($notice, $queue); + $this->_log(LOG_INFO, "[$queue] No handler for queue $queue; discarding."); + $this->_done($qi); } return true; } @@ -108,8 +107,7 @@ class DBQueueManager extends QueueManager /** * Pop the oldest unclaimed item off the queue set and claim it. * - * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice) - * giving the queue transport name. + * @return mixed false if no items; true if bogus hit; otherwise Queue_item */ protected function _nextItem() { @@ -121,70 +119,42 @@ class DBQueueManager extends QueueManager return false; } - $queue = $qi->transport; - $notice = Notice::staticGet('id', $qi->notice_id); - if (empty($notice)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice"); - $qi->delete(); - return true; - } - - $result = $notice; - return array($queue, $notice); + return $qi; } /** * Delete our claimed item from the queue after successful processing. * - * @param Notice $object - * @param string $queue + * @param QueueItem $qi */ - protected function _done($object, $queue) + protected function _done($qi) { - // XXX: right now, we only handle notices - - $notice = $object; - - $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, - 'transport' => $queue)); - if (empty($qi)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); + $this->_log(LOG_INFO, "_done passed an empty queue item"); } else { if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item"); + $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item"); } $qi->delete(); $qi->free(); } - $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item"); - $this->stats('handled', $queue); - - $notice->free(); + $this->_log(LOG_INFO, "done with item"); } /** * Free our claimed queue item for later reprocessing in case of * temporary failure. * - * @param Notice $object - * @param string $queue + * @param QueueItem $qi */ - protected function _fail($object, $queue) + protected function _fail($qi) { - // XXX: right now, we only handle notices - - $notice = $object; - - $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, - 'transport' => $queue)); - if (empty($qi)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); + $this->_log(LOG_INFO, "_fail passed an empty queue item"); } else { if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item"); + $this->_log(LOG_WARNING, "Ignoring failure for unclaimed queue item"); } else { $orig = clone($qi); $qi->claimed = null; @@ -193,10 +163,7 @@ class DBQueueManager extends QueueManager } } - $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item"); - $this->stats('error', $queue); - - $notice->free(); + $this->_log(LOG_INFO, "done with queue item"); } protected function _log($level, $msg) |