summaryrefslogtreecommitdiff
path: root/lib/stompqueuemanager.php
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r--lib/stompqueuemanager.php38
1 files changed, 25 insertions, 13 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 6496b5cf1..00590fdb6 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -125,25 +125,28 @@ class StompQueueManager extends QueueManager
}
/**
- * Saves an object into the queue item table.
+ * Saves a notice object reference into the queue item table.
* @return boolean true on success
*/
public function enqueue($object, $queue)
{
- $msg = serialize($object);
+ $notice = $object;
$this->_connect();
+ // XXX: serialize and send entire notice
+
$result = $this->con->send($this->queueName($queue),
- $msg, // BODY of the message
- array ('created' => $timestamp));
+ $notice->id, // BODY of the message
+ array ('created' => $notice->created));
if (!$result) {
common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
return false;
}
- common_log(LOG_DEBUG, "complete remote queueing $log for $queue");
+ common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
+ . $notice->id . ' for ' . $queue);
$this->stats('enqueued', $queue);
}
@@ -171,7 +174,7 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
- $ok = $ok && $this->_handleItem($frame);
+ $ok = $ok && $this->_handleNotice($frame);
}
return $ok;
}
@@ -262,10 +265,10 @@ class StompQueueManager extends QueueManager
}
/**
- * Handle and acknowledge an event that's come in through a queue.
+ * Handle and acknowledge a notice event that's come in through a queue.
*
* If the queue handler reports failure, the message is requeued for later.
- * Missing objects or handler classes will drop the message.
+ * Missing notices or handler classes will drop the message.
*
* Side effects: in multi-site mode, may reset site configuration to
* match the site that queued the event.
@@ -273,15 +276,24 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
- protected function _handleItem($frame)
+ protected function _handleNotice($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
if ($site != common_config('site', 'server')) {
$this->stats('switch');
StatusNet::init($site);
}
- $info = "object posted at {$frame->headers['created']} in queue $queue";
- $item = unserialize($frame->body);
+
+ $id = intval($frame->body);
+ $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
+
+ $notice = Notice::staticGet('id', $id);
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, "Skipping missing $info");
+ $this->con->ack($frame);
+ $this->stats('badnotice', $queue);
+ return false;
+ }
$handler = $this->getHandler($queue);
if (!$handler) {
@@ -291,7 +303,7 @@ class StompQueueManager extends QueueManager
return false;
}
- $ok = $handler->handle($item);
+ $ok = $handler->handle_notice($notice);
if (!$ok) {
$this->_log(LOG_WARNING, "Failed handling $info");
@@ -299,7 +311,7 @@ class StompQueueManager extends QueueManager
// this kind of queue management ourselves;
// if we don't ack, it should resend...
$this->con->ack($frame);
- $this->enqueue($item, $queue);
+ $this->enqueue($notice, $queue);
$this->stats('requeued', $queue);
return false;
}