diff options
author | Brion Vibber <brion@pobox.com> | 2010-02-08 16:43:37 -0800 |
---|---|---|
committer | Brion Vibber <brion@pobox.com> | 2010-02-10 12:27:41 -0800 |
commit | 045797331c82b86e03c61f00f4db68a085688520 (patch) | |
tree | 96d31110682e81c732d8529f62b86b3bf5a0da25 | |
parent | c4557d4d0700c09742b9d2e002c2d2b0161558f3 (diff) |
fix up hub queueing to work w/ stomp queues
-rw-r--r-- | lib/queuemanager.php | 36 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 26 | ||||
-rw-r--r-- | plugins/OStatus/lib/hubdistribqueuehandler.php | 1 | ||||
-rw-r--r-- | plugins/OStatus/lib/huboutqueuehandler.php | 2 |
4 files changed, 34 insertions, 31 deletions
diff --git a/lib/queuemanager.php b/lib/queuemanager.php index afe710e88..149617eb5 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -155,26 +155,26 @@ abstract class QueueManager extends IoManager } /** - * Encode an object for queued storage. - * Next gen may use serialization. + * Encode an object or variable for queued storage. + * Notice objects are currently stored as an id reference; + * other items are serialized. * - * @param mixed $object + * @param mixed $item * @return string */ - protected function encode($object) + protected function encode($item) { - if ($object instanceof Notice) { - return $object->id; - } else if (is_string($object)) { - return $object; + if ($item instanceof Notice) { + // Backwards compat + return $item->id; } else { - throw new ServerException("Can't queue this type", 500); + return serialize($item); } } /** * Decode an object from queued storage. - * Accepts back-compat notice reference entries and strings for now. + * Accepts notice reference entries and serialized items. * * @param string * @return mixed @@ -182,9 +182,23 @@ abstract class QueueManager extends IoManager protected function decode($frame) { if (is_numeric($frame)) { + // Back-compat for notices... return Notice::staticGet(intval($frame)); - } else { + } elseif (substr($frame, 0, 1) == '<') { + // Back-compat for XML source return $frame; + } else { + // Deserialize! + #$old = error_reporting(); + #error_reporting($old & ~E_NOTICE); + $out = unserialize($frame); + #error_reporting($old); + + if ($out === false && $frame !== 'b:0;') { + common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame"); + return false; + } + return $out; } } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index cc4c817d8..cd62c25bd 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -549,26 +549,14 @@ class StompQueueManager extends QueueManager } $host = $this->cons[$idx]->getServer(); - if (is_numeric($frame->body)) { - $id = intval($frame->body); - $info = "notice $id posted at {$frame->headers['created']} in queue $queue from $host"; - - $notice = Notice::staticGet('id', $id); - if (empty($notice)) { - $this->_log(LOG_WARNING, "Skipping missing $info"); - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); - $this->stats('badnotice', $queue); - return false; - } - - $item = $notice; - } else { - // @fixme should we serialize, or json, or what here? - $info = "string posted at {$frame->headers['created']} in queue $queue from $host"; - $item = $frame->body; + $item = $this->decode($frame->body); + if (empty($item)) { + $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host"); + return true; } + $info = $this->logrep($item) . " posted at " . + $frame->headers['created'] . " in queue $queue from $host"; + $this->_log(LOG_DEBUG, "Dequeued $info"); $handler = $this->getHandler($queue); if (!$handler) { diff --git a/plugins/OStatus/lib/hubdistribqueuehandler.php b/plugins/OStatus/lib/hubdistribqueuehandler.php index 189ccbedf..de3a81385 100644 --- a/plugins/OStatus/lib/hubdistribqueuehandler.php +++ b/plugins/OStatus/lib/hubdistribqueuehandler.php @@ -56,6 +56,7 @@ class HubDistribQueueHandler extends QueueHandler } else { common_log(LOG_INFO, "No PuSH subscribers for $feed"); } + return true; } function pushGroup($notice, $group_id) diff --git a/plugins/OStatus/lib/huboutqueuehandler.php b/plugins/OStatus/lib/huboutqueuehandler.php index cb44ad2c4..0791c7e5d 100644 --- a/plugins/OStatus/lib/huboutqueuehandler.php +++ b/plugins/OStatus/lib/huboutqueuehandler.php @@ -43,7 +43,7 @@ class HubOutQueueHandler extends QueueHandler common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " . $e->getMessage()); // @fixme Reschedule a later delivery? - // Currently we have no way to do this other than 'send NOW' + return true; } return true; |