diff options
-rw-r--r-- | lib/stompqueuemanager.php | 36 |
1 files changed, 33 insertions, 3 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 20c6e7a34..1b4a26f2e 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -30,7 +30,7 @@ require_once 'Stomp.php'; -class QueueManager +class StompQueueManager { var $server = null; var $username = null; @@ -61,10 +61,12 @@ class QueueManager function enqueue($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $this->_connect(); + // XXX: serialize and send entire notice + $result = $this->con->send($this->_queueName($queue), $notice->id, // BODY of the message array ('created' => $notice->created)); @@ -93,9 +95,11 @@ class QueueManager // notice, and it has to get it from the DB // A massive improvement would be avoid DB query by transmitting // all the notice details via queue server... + $notice = Notice::staticGet($frame->body); if ($notice) { + $this->_saveFrame($notice, $queue, $frame); } else { $this->log(LOG_WARNING, 'queue item for notice that does not exist'); } @@ -104,7 +108,7 @@ class QueueManager function done($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $this->_connect(); @@ -116,7 +120,33 @@ class QueueManager // if the msg has been handled positively, ack it // and the queue server will remove it from the queue $this->con->ack($frame); + $this->_clearFrame($notice, $queue); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); } } + + function _frameKey($notice, $queue) + { + return ((string)$notice->id) . '-' . $queue; + } + + function _saveFrame($notice, $queue, $frame) + { + $k = $this->_frameKey($notice, $queue); + $this->_frames[$k] = $frame; + return true; + } + + function _getFrame($notice, $queue) + { + $k = $this->_frameKey($notice, $queue); + return $this->_frames[$k]; + } + + function _clearFrame($notice, $queue) + { + $k = $this->_frameKey($notice, $queue); + unset($this->_frames[$k]); + } } |