diff options
author | Evan Prodromou <evan@controlyourself.ca> | 2009-04-02 06:23:50 -0400 |
---|---|---|
committer | Evan Prodromou <evan@controlyourself.ca> | 2009-04-02 06:23:50 -0400 |
commit | 6be15dfcacfa821566c16e71cc7bc478b2ff9433 (patch) | |
tree | 461685095e0c84e5ef6db8c60ca4aee95d49e46a /lib/queuehandler.php | |
parent | 798984381f9ee40681d1feb4a7bef72460fe39f2 (diff) | |
parent | 5e9ff6ae973423c06ac42c679f4ad67a46a77bd0 (diff) |
Merge branch '0.8.x' of git://gitorious.org/laconica/fmarani-clone into fmarani/0.8.x
Diffstat (limited to 'lib/queuehandler.php')
-rw-r--r-- | lib/queuehandler.php | 74 |
1 files changed, 65 insertions, 9 deletions
diff --git a/lib/queuehandler.php b/lib/queuehandler.php index 9ce9e32b3..752bbc6d2 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -75,15 +75,9 @@ class QueueHandler extends Daemon return true; } - function run() - { - if (!$this->start()) { - return false; - } - $this->log(LOG_INFO, 'checking for queued notices'); - $transport = $this->transport(); + function db_dispatch() { do { - $qi = Queue_item::top($transport); + $qi = Queue_item::top($this->transport()); if ($qi) { $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created)); $notice = Notice::staticGet($qi->notice_id); @@ -115,6 +109,68 @@ class QueueHandler extends Daemon $this->idle(5); } } while (true); + } + + function stomp_dispatch() { + require("Stomp.php"); + $con = new Stomp(common_config('queue','stomp_server')); + if (!$con->connect()) { + $this->log(LOG_ERR, 'Failed to connect to queue server'); + return false; + } + $queue_basename = common_config('queue','queue_basename'); + // subscribe to the relevant queue (format: basename-transport) + $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); + + do { + $frame = $con->readFrame(); + if ($frame) { + $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + + // XXX: Now the queue handler receives only the ID of the + // notice, and it has to get it from the DB + // A massive improvement would be avoid DB query by transmitting + // all the notice details via queue server... + $notice = Notice::staticGet($frame->body); + + if ($notice) { + $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); + $result = $this->handle_notice($notice); + if ($result) { + // if the msg has been handled positively, ack it + // and the queue server will remove it from the queue + $con->ack($frame); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + } + else { + // no ack + $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); + } + $notice->free(); + unset($notice); + $notice = null; + } else { + $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + } + } + } while (true); + + $con->disconnect(); + } + + + function run() + { + if (!$this->start()) { + return false; + } + $this->log(LOG_INFO, 'checking for queued notices'); + if (common_config('queue','subsystem') == 'stomp') { + $this->stomp_dispatch(); + } + else { + $this->db_dispatch(); + } if (!$this->finish()) { return false; } @@ -143,4 +199,4 @@ class QueueHandler extends Daemon common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); } } -
\ No newline at end of file + |