summaryrefslogtreecommitdiff
path: root/lib/queuehandler.php
diff options
context:
space:
mode:
authorSarven Capadisli <csarven@controlyourself.ca>2009-04-08 22:55:34 +0000
committerSarven Capadisli <csarven@controlyourself.ca>2009-04-08 22:55:34 +0000
commit03a4a4bebf2197097e672e094535ee067fe25441 (patch)
tree2b73b7cf06cd7a65a327df726cc6d1106bfecaff /lib/queuehandler.php
parente5657ca4bbd8a43bf2dd44c2554e807a45224bd7 (diff)
parent4e3e4fcfff53ea3a5ef5c90672f3848596d3acbf (diff)
Merge branch '0.8.x' of git@gitorious.org:laconica/dev into 0.8.x
Diffstat (limited to 'lib/queuehandler.php')
-rw-r--r--lib/queuehandler.php91
1 files changed, 73 insertions, 18 deletions
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
index 9ce9e32b3..f76f16e07 100644
--- a/lib/queuehandler.php
+++ b/lib/queuehandler.php
@@ -36,7 +36,7 @@ class QueueHandler extends Daemon
$this->set_id($id);
}
}
-
+
function class_name()
{
return ucfirst($this->transport()) . 'Handler';
@@ -46,7 +46,7 @@ class QueueHandler extends Daemon
{
return strtolower($this->class_name().'.'.$this->get_id());
}
-
+
function get_id()
{
return $this->_id;
@@ -56,16 +56,16 @@ class QueueHandler extends Daemon
{
$this->_id = $id;
}
-
+
function transport()
{
return null;
}
-
+
function start()
{
}
-
+
function finish()
{
}
@@ -74,16 +74,10 @@ class QueueHandler extends Daemon
{
return true;
}
-
- function run()
- {
- if (!$this->start()) {
- return false;
- }
- $this->log(LOG_INFO, 'checking for queued notices');
- $transport = $this->transport();
+
+ function db_dispatch() {
do {
- $qi = Queue_item::top($transport);
+ $qi = Queue_item::top($this->transport());
if ($qi) {
$this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
$notice = Notice::staticGet($qi->notice_id);
@@ -113,8 +107,69 @@ class QueueHandler extends Daemon
} else {
$this->clear_old_claims();
$this->idle(5);
- }
+ }
} while (true);
+ }
+
+ function stomp_dispatch() {
+ require("Stomp.php");
+ $con = new Stomp(common_config('queue','stomp_server'));
+ if (!$con->connect()) {
+ $this->log(LOG_ERR, 'Failed to connect to queue server');
+ return false;
+ }
+ $queue_basename = common_config('queue','queue_basename');
+ // subscribe to the relevant queue (format: basename-transport)
+ $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport());
+
+ do {
+ $frame = $con->readFrame();
+ if ($frame) {
+ $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
+
+ // XXX: Now the queue handler receives only the ID of the
+ // notice, and it has to get it from the DB
+ // A massive improvement would be avoid DB query by transmitting
+ // all the notice details via queue server...
+ $notice = Notice::staticGet($frame->body);
+
+ if ($notice) {
+ $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
+ $result = $this->handle_notice($notice);
+ if ($result) {
+ // if the msg has been handled positively, ack it
+ // and the queue server will remove it from the queue
+ $con->ack($frame);
+ $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
+ }
+ else {
+ // no ack
+ $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
+ }
+ $notice->free();
+ unset($notice);
+ $notice = null;
+ } else {
+ $this->log(LOG_WARNING, 'queue item for notice that does not exist');
+ }
+ }
+ } while (true);
+
+ $con->disconnect();
+ }
+
+ function run()
+ {
+ if (!$this->start()) {
+ return false;
+ }
+ $this->log(LOG_INFO, 'checking for queued notices');
+ if (common_config('queue','subsystem') == 'stomp') {
+ $this->stomp_dispatch();
+ }
+ else {
+ $this->db_dispatch();
+ }
if (!$this->finish()) {
return false;
}
@@ -127,7 +182,7 @@ class QueueHandler extends Daemon
sleep($timeout);
}
}
-
+
function clear_old_claims()
{
$qi = new Queue_item();
@@ -137,10 +192,10 @@ class QueueHandler extends Daemon
$qi->free();
unset($qi);
}
-
+
function log($level, $msg)
{
common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
}
}
- \ No newline at end of file
+