summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/stompqueuemanager.php36
1 files changed, 33 insertions, 3 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 20c6e7a34..1b4a26f2e 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -30,7 +30,7 @@
require_once 'Stomp.php';
-class QueueManager
+class StompQueueManager
{
var $server = null;
var $username = null;
@@ -61,10 +61,12 @@ class QueueManager
function enqueue($object, $queue)
{
- $notice = (Notice)$object;
+ $notice = $object;
$this->_connect();
+ // XXX: serialize and send entire notice
+
$result = $this->con->send($this->_queueName($queue),
$notice->id, // BODY of the message
array ('created' => $notice->created));
@@ -93,9 +95,11 @@ class QueueManager
// notice, and it has to get it from the DB
// A massive improvement would be avoid DB query by transmitting
// all the notice details via queue server...
+
$notice = Notice::staticGet($frame->body);
if ($notice) {
+ $this->_saveFrame($notice, $queue, $frame);
} else {
$this->log(LOG_WARNING, 'queue item for notice that does not exist');
}
@@ -104,7 +108,7 @@ class QueueManager
function done($object, $queue)
{
- $notice = (Notice)$object;
+ $notice = $object;
$this->_connect();
@@ -116,7 +120,33 @@ class QueueManager
// if the msg has been handled positively, ack it
// and the queue server will remove it from the queue
$this->con->ack($frame);
+ $this->_clearFrame($notice, $queue);
+
$this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
}
}
+
+ function _frameKey($notice, $queue)
+ {
+ return ((string)$notice->id) . '-' . $queue;
+ }
+
+ function _saveFrame($notice, $queue, $frame)
+ {
+ $k = $this->_frameKey($notice, $queue);
+ $this->_frames[$k] = $frame;
+ return true;
+ }
+
+ function _getFrame($notice, $queue)
+ {
+ $k = $this->_frameKey($notice, $queue);
+ return $this->_frames[$k];
+ }
+
+ function _clearFrame($notice, $queue)
+ {
+ $k = $this->_frameKey($notice, $queue);
+ unset($this->_frames[$k]);
+ }
}