diff options
-rw-r--r-- | lib/queuehandler.php | 79 |
1 files changed, 39 insertions, 40 deletions
diff --git a/lib/queuehandler.php b/lib/queuehandler.php index 752bbc6d2..f76f16e07 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -36,7 +36,7 @@ class QueueHandler extends Daemon $this->set_id($id); } } - + function class_name() { return ucfirst($this->transport()) . 'Handler'; @@ -46,7 +46,7 @@ class QueueHandler extends Daemon { return strtolower($this->class_name().'.'.$this->get_id()); } - + function get_id() { return $this->_id; @@ -56,16 +56,16 @@ class QueueHandler extends Daemon { $this->_id = $id; } - + function transport() { return null; } - + function start() { } - + function finish() { } @@ -74,7 +74,7 @@ class QueueHandler extends Daemon { return true; } - + function db_dispatch() { do { $qi = Queue_item::top($this->transport()); @@ -107,43 +107,43 @@ class QueueHandler extends Daemon } else { $this->clear_old_claims(); $this->idle(5); - } + } } while (true); } function stomp_dispatch() { - require("Stomp.php"); - $con = new Stomp(common_config('queue','stomp_server')); - if (!$con->connect()) { - $this->log(LOG_ERR, 'Failed to connect to queue server'); - return false; - } - $queue_basename = common_config('queue','queue_basename'); - // subscribe to the relevant queue (format: basename-transport) - $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); + require("Stomp.php"); + $con = new Stomp(common_config('queue','stomp_server')); + if (!$con->connect()) { + $this->log(LOG_ERR, 'Failed to connect to queue server'); + return false; + } + $queue_basename = common_config('queue','queue_basename'); + // subscribe to the relevant queue (format: basename-transport) + $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); do { $frame = $con->readFrame(); if ($frame) { $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... + // XXX: Now the queue handler receives only the ID of the + // notice, and it has to get it from the DB + // A massive improvement would be avoid DB query by transmitting + // all the notice details via queue server... $notice = Notice::staticGet($frame->body); if ($notice) { $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); $result = $this->handle_notice($notice); if ($result) { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $con->ack($frame); - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - } - else { - // no ack + // if the msg has been handled positively, ack it + // and the queue server will remove it from the queue + $con->ack($frame); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + } + else { + // no ack $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); } $notice->free(); @@ -152,12 +152,11 @@ class QueueHandler extends Daemon } else { $this->log(LOG_WARNING, 'queue item for notice that does not exist'); } - } + } } while (true); - - $con->disconnect(); + + $con->disconnect(); } - function run() { @@ -165,12 +164,12 @@ class QueueHandler extends Daemon return false; } $this->log(LOG_INFO, 'checking for queued notices'); - if (common_config('queue','subsystem') == 'stomp') { - $this->stomp_dispatch(); - } - else { - $this->db_dispatch(); - } + if (common_config('queue','subsystem') == 'stomp') { + $this->stomp_dispatch(); + } + else { + $this->db_dispatch(); + } if (!$this->finish()) { return false; } @@ -183,7 +182,7 @@ class QueueHandler extends Daemon sleep($timeout); } } - + function clear_old_claims() { $qi = new Queue_item(); @@ -193,10 +192,10 @@ class QueueHandler extends Daemon $qi->free(); unset($qi); } - + function log($level, $msg) { common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); } } - + |