diff options
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r-- | lib/stompqueuemanager.php | 38 |
1 files changed, 25 insertions, 13 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 6496b5cf1..00590fdb6 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -125,25 +125,28 @@ class StompQueueManager extends QueueManager } /** - * Saves an object into the queue item table. + * Saves a notice object reference into the queue item table. * @return boolean true on success */ public function enqueue($object, $queue) { - $msg = serialize($object); + $notice = $object; $this->_connect(); + // XXX: serialize and send entire notice + $result = $this->con->send($this->queueName($queue), - $msg, // BODY of the message - array ('created' => $timestamp)); + $notice->id, // BODY of the message + array ('created' => $notice->created)); if (!$result) { common_log(LOG_ERR, 'Error sending to '.$queue.' queue'); return false; } - common_log(LOG_DEBUG, "complete remote queueing $log for $queue"); + common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' + . $notice->id . ' for ' . $queue); $this->stats('enqueued', $queue); } @@ -171,7 +174,7 @@ class StompQueueManager extends QueueManager $ok = true; $frames = $this->con->readFrames(); foreach ($frames as $frame) { - $ok = $ok && $this->_handleItem($frame); + $ok = $ok && $this->_handleNotice($frame); } return $ok; } @@ -262,10 +265,10 @@ class StompQueueManager extends QueueManager } /** - * Handle and acknowledge an event that's come in through a queue. + * Handle and acknowledge a notice event that's come in through a queue. * * If the queue handler reports failure, the message is requeued for later. - * Missing objects or handler classes will drop the message. + * Missing notices or handler classes will drop the message. * * Side effects: in multi-site mode, may reset site configuration to * match the site that queued the event. @@ -273,15 +276,24 @@ class StompQueueManager extends QueueManager * @param StompFrame $frame * @return bool */ - protected function _handleItem($frame) + protected function _handleNotice($frame) { list($site, $queue) = $this->parseDestination($frame->headers['destination']); if ($site != common_config('site', 'server')) { $this->stats('switch'); StatusNet::init($site); } - $info = "object posted at {$frame->headers['created']} in queue $queue"; - $item = unserialize($frame->body); + + $id = intval($frame->body); + $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; + + $notice = Notice::staticGet('id', $id); + if (empty($notice)) { + $this->_log(LOG_WARNING, "Skipping missing $info"); + $this->con->ack($frame); + $this->stats('badnotice', $queue); + return false; + } $handler = $this->getHandler($queue); if (!$handler) { @@ -291,7 +303,7 @@ class StompQueueManager extends QueueManager return false; } - $ok = $handler->handle($item); + $ok = $handler->handle_notice($notice); if (!$ok) { $this->_log(LOG_WARNING, "Failed handling $info"); @@ -299,7 +311,7 @@ class StompQueueManager extends QueueManager // this kind of queue management ourselves; // if we don't ack, it should resend... $this->con->ack($frame); - $this->enqueue($item, $queue); + $this->enqueue($notice, $queue); $this->stats('requeued', $queue); return false; } |