From 2ac684f53df3f76aacb4d1be79d7db64567f1964 Mon Sep 17 00:00:00 2001 From: Federico Marani Date: Fri, 20 Feb 2009 18:57:20 +0000 Subject: * implemented STOMP support for external message queueing systems * (ticket-724) * included PHP STOMP libraries (http://code.google.com/p/stompcli/) --- lib/queuehandler.php | 74 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 65 insertions(+), 9 deletions(-) (limited to 'lib/queuehandler.php') diff --git a/lib/queuehandler.php b/lib/queuehandler.php index 9ce9e32b3..752bbc6d2 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -75,15 +75,9 @@ class QueueHandler extends Daemon return true; } - function run() - { - if (!$this->start()) { - return false; - } - $this->log(LOG_INFO, 'checking for queued notices'); - $transport = $this->transport(); + function db_dispatch() { do { - $qi = Queue_item::top($transport); + $qi = Queue_item::top($this->transport()); if ($qi) { $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created)); $notice = Notice::staticGet($qi->notice_id); @@ -115,6 +109,68 @@ class QueueHandler extends Daemon $this->idle(5); } } while (true); + } + + function stomp_dispatch() { + require("Stomp.php"); + $con = new Stomp(common_config('queue','stomp_server')); + if (!$con->connect()) { + $this->log(LOG_ERR, 'Failed to connect to queue server'); + return false; + } + $queue_basename = common_config('queue','queue_basename'); + // subscribe to the relevant queue (format: basename-transport) + $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); + + do { + $frame = $con->readFrame(); + if ($frame) { + $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + + // XXX: Now the queue handler receives only the ID of the + // notice, and it has to get it from the DB + // A massive improvement would be avoid DB query by transmitting + // all the notice details via queue server... + $notice = Notice::staticGet($frame->body); + + if ($notice) { + $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); + $result = $this->handle_notice($notice); + if ($result) { + // if the msg has been handled positively, ack it + // and the queue server will remove it from the queue + $con->ack($frame); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + } + else { + // no ack + $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); + } + $notice->free(); + unset($notice); + $notice = null; + } else { + $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + } + } + } while (true); + + $con->disconnect(); + } + + + function run() + { + if (!$this->start()) { + return false; + } + $this->log(LOG_INFO, 'checking for queued notices'); + if (common_config('queue','subsystem') == 'stomp') { + $this->stomp_dispatch(); + } + else { + $this->db_dispatch(); + } if (!$this->finish()) { return false; } @@ -143,4 +199,4 @@ class QueueHandler extends Daemon common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); } } - \ No newline at end of file + -- cgit v1.2.3-54-g00ecf