summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrion Vibber <brion@pobox.com>2010-02-08 16:43:37 -0800
committerBrion Vibber <brion@pobox.com>2010-02-10 12:27:41 -0800
commit045797331c82b86e03c61f00f4db68a085688520 (patch)
tree96d31110682e81c732d8529f62b86b3bf5a0da25
parentc4557d4d0700c09742b9d2e002c2d2b0161558f3 (diff)
fix up hub queueing to work w/ stomp queues
-rw-r--r--lib/queuemanager.php36
-rw-r--r--lib/stompqueuemanager.php26
-rw-r--r--plugins/OStatus/lib/hubdistribqueuehandler.php1
-rw-r--r--plugins/OStatus/lib/huboutqueuehandler.php2
4 files changed, 34 insertions, 31 deletions
diff --git a/lib/queuemanager.php b/lib/queuemanager.php
index afe710e88..149617eb5 100644
--- a/lib/queuemanager.php
+++ b/lib/queuemanager.php
@@ -155,26 +155,26 @@ abstract class QueueManager extends IoManager
}
/**
- * Encode an object for queued storage.
- * Next gen may use serialization.
+ * Encode an object or variable for queued storage.
+ * Notice objects are currently stored as an id reference;
+ * other items are serialized.
*
- * @param mixed $object
+ * @param mixed $item
* @return string
*/
- protected function encode($object)
+ protected function encode($item)
{
- if ($object instanceof Notice) {
- return $object->id;
- } else if (is_string($object)) {
- return $object;
+ if ($item instanceof Notice) {
+ // Backwards compat
+ return $item->id;
} else {
- throw new ServerException("Can't queue this type", 500);
+ return serialize($item);
}
}
/**
* Decode an object from queued storage.
- * Accepts back-compat notice reference entries and strings for now.
+ * Accepts notice reference entries and serialized items.
*
* @param string
* @return mixed
@@ -182,9 +182,23 @@ abstract class QueueManager extends IoManager
protected function decode($frame)
{
if (is_numeric($frame)) {
+ // Back-compat for notices...
return Notice::staticGet(intval($frame));
- } else {
+ } elseif (substr($frame, 0, 1) == '<') {
+ // Back-compat for XML source
return $frame;
+ } else {
+ // Deserialize!
+ #$old = error_reporting();
+ #error_reporting($old & ~E_NOTICE);
+ $out = unserialize($frame);
+ #error_reporting($old);
+
+ if ($out === false && $frame !== 'b:0;') {
+ common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame");
+ return false;
+ }
+ return $out;
}
}
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index cc4c817d8..cd62c25bd 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -549,26 +549,14 @@ class StompQueueManager extends QueueManager
}
$host = $this->cons[$idx]->getServer();
- if (is_numeric($frame->body)) {
- $id = intval($frame->body);
- $info = "notice $id posted at {$frame->headers['created']} in queue $queue from $host";
-
- $notice = Notice::staticGet('id', $id);
- if (empty($notice)) {
- $this->_log(LOG_WARNING, "Skipping missing $info");
- $this->ack($idx, $frame);
- $this->commit($idx);
- $this->begin($idx);
- $this->stats('badnotice', $queue);
- return false;
- }
-
- $item = $notice;
- } else {
- // @fixme should we serialize, or json, or what here?
- $info = "string posted at {$frame->headers['created']} in queue $queue from $host";
- $item = $frame->body;
+ $item = $this->decode($frame->body);
+ if (empty($item)) {
+ $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
+ return true;
}
+ $info = $this->logrep($item) . " posted at " .
+ $frame->headers['created'] . " in queue $queue from $host";
+ $this->_log(LOG_DEBUG, "Dequeued $info");
$handler = $this->getHandler($queue);
if (!$handler) {
diff --git a/plugins/OStatus/lib/hubdistribqueuehandler.php b/plugins/OStatus/lib/hubdistribqueuehandler.php
index 189ccbedf..de3a81385 100644
--- a/plugins/OStatus/lib/hubdistribqueuehandler.php
+++ b/plugins/OStatus/lib/hubdistribqueuehandler.php
@@ -56,6 +56,7 @@ class HubDistribQueueHandler extends QueueHandler
} else {
common_log(LOG_INFO, "No PuSH subscribers for $feed");
}
+ return true;
}
function pushGroup($notice, $group_id)
diff --git a/plugins/OStatus/lib/huboutqueuehandler.php b/plugins/OStatus/lib/huboutqueuehandler.php
index cb44ad2c4..0791c7e5d 100644
--- a/plugins/OStatus/lib/huboutqueuehandler.php
+++ b/plugins/OStatus/lib/huboutqueuehandler.php
@@ -43,7 +43,7 @@ class HubOutQueueHandler extends QueueHandler
common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " .
$e->getMessage());
// @fixme Reschedule a later delivery?
- // Currently we have no way to do this other than 'send NOW'
+ return true;
}
return true;