summaryrefslogtreecommitdiff
path: root/lib/stompqueuemanager.php
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r--lib/stompqueuemanager.php38
1 files changed, 13 insertions, 25 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 00590fdb6..6496b5cf1 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -125,28 +125,25 @@ class StompQueueManager extends QueueManager
}
/**
- * Saves a notice object reference into the queue item table.
+ * Saves an object into the queue item table.
* @return boolean true on success
*/
public function enqueue($object, $queue)
{
- $notice = $object;
+ $msg = serialize($object);
$this->_connect();
- // XXX: serialize and send entire notice
-
$result = $this->con->send($this->queueName($queue),
- $notice->id, // BODY of the message
- array ('created' => $notice->created));
+ $msg, // BODY of the message
+ array ('created' => $timestamp));
if (!$result) {
common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
return false;
}
- common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
- . $notice->id . ' for ' . $queue);
+ common_log(LOG_DEBUG, "complete remote queueing $log for $queue");
$this->stats('enqueued', $queue);
}
@@ -174,7 +171,7 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
- $ok = $ok && $this->_handleNotice($frame);
+ $ok = $ok && $this->_handleItem($frame);
}
return $ok;
}
@@ -265,10 +262,10 @@ class StompQueueManager extends QueueManager
}
/**
- * Handle and acknowledge a notice event that's come in through a queue.
+ * Handle and acknowledge an event that's come in through a queue.
*
* If the queue handler reports failure, the message is requeued for later.
- * Missing notices or handler classes will drop the message.
+ * Missing objects or handler classes will drop the message.
*
* Side effects: in multi-site mode, may reset site configuration to
* match the site that queued the event.
@@ -276,24 +273,15 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
- protected function _handleNotice($frame)
+ protected function _handleItem($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
if ($site != common_config('site', 'server')) {
$this->stats('switch');
StatusNet::init($site);
}
-
- $id = intval($frame->body);
- $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
-
- $notice = Notice::staticGet('id', $id);
- if (empty($notice)) {
- $this->_log(LOG_WARNING, "Skipping missing $info");
- $this->con->ack($frame);
- $this->stats('badnotice', $queue);
- return false;
- }
+ $info = "object posted at {$frame->headers['created']} in queue $queue";
+ $item = unserialize($frame->body);
$handler = $this->getHandler($queue);
if (!$handler) {
@@ -303,7 +291,7 @@ class StompQueueManager extends QueueManager
return false;
}
- $ok = $handler->handle_notice($notice);
+ $ok = $handler->handle($item);
if (!$ok) {
$this->_log(LOG_WARNING, "Failed handling $info");
@@ -311,7 +299,7 @@ class StompQueueManager extends QueueManager
// this kind of queue management ourselves;
// if we don't ack, it should resend...
$this->con->ack($frame);
- $this->enqueue($notice, $queue);
+ $this->enqueue($item, $queue);
$this->stats('requeued', $queue);
return false;
}