diff options
Diffstat (limited to 'lib/queuehandler.php')
-rw-r--r-- | lib/queuehandler.php | 210 |
1 files changed, 112 insertions, 98 deletions
diff --git a/lib/queuehandler.php b/lib/queuehandler.php index 23f295c45..9ce9e32b3 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -25,108 +25,122 @@ require_once(INSTALLDIR.'/lib/daemon.php'); require_once(INSTALLDIR.'/classes/Queue_item.php'); require_once(INSTALLDIR.'/classes/Notice.php'); -class QueueHandler extends Daemon { +class QueueHandler extends Daemon +{ - var $_id = 'generic'; + var $_id = 'generic'; - function QueueHandler($id=NULL) { - if ($id) { - $this->set_id($id); - } - } - - function class_name() { - return ucfirst($this->transport()) . 'Handler'; - } + function QueueHandler($id=null) + { + if ($id) { + $this->set_id($id); + } + } + + function class_name() + { + return ucfirst($this->transport()) . 'Handler'; + } - function name() { - return strtolower($this->class_name().'.'.$this->get_id()); - } - - function get_id() { - return $this->_id; - } + function name() + { + return strtolower($this->class_name().'.'.$this->get_id()); + } + + function get_id() + { + return $this->_id; + } - function set_id($id) { - $this->_id = $id; - } - - function transport() { - return NULL; - } - - function start() { - } - - function finish() { - } + function set_id($id) + { + $this->_id = $id; + } + + function transport() + { + return null; + } + + function start() + { + } + + function finish() + { + } - function handle_notice($notice) { - return true; - } - - function run() { - if (!$this->start()) { - return false; - } - $this->log(LOG_INFO, 'checking for queued notices'); - $transport = $this->transport(); - do { - $qi = Queue_item::top($transport); - if ($qi) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created)); - $notice = Notice::staticGet($qi->notice_id); - if ($notice) { - $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); - # XXX: what to do if broadcast fails? - $result = $this->handle_notice($notice); - if (!$result) { - $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); - $orig = $qi; - $qi->claimed = NULL; - $qi->update($orig); - $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id); - continue; - } - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - $notice->free(); - unset($notice); - $notice = NULL; - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - $qi->delete(); - $qi->free(); - unset($qi); - $this->idle(0); - } else { - $this->clear_old_claims(); - $this->idle(5); - } - } while (true); - if (!$this->finish()) { - return false; - } - return true; - } + function handle_notice($notice) + { + return true; + } + + function run() + { + if (!$this->start()) { + return false; + } + $this->log(LOG_INFO, 'checking for queued notices'); + $transport = $this->transport(); + do { + $qi = Queue_item::top($transport); + if ($qi) { + $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created)); + $notice = Notice::staticGet($qi->notice_id); + if ($notice) { + $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); + # XXX: what to do if broadcast fails? + $result = $this->handle_notice($notice); + if (!$result) { + $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); + $orig = $qi; + $qi->claimed = null; + $qi->update($orig); + $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id); + continue; + } + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + $notice->free(); + unset($notice); + $notice = null; + } else { + $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + } + $qi->delete(); + $qi->free(); + unset($qi); + $this->idle(0); + } else { + $this->clear_old_claims(); + $this->idle(5); + } + } while (true); + if (!$this->finish()) { + return false; + } + return true; + } - function idle($timeout=0) { - if ($timeout>0) { - sleep($timeout); - } - } - - function clear_old_claims() { - $qi = new Queue_item(); - $qi->transport = $this->transport(); - $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); - $qi->update(DB_DATAOBJECT_WHEREADD_ONLY); - $qi->free(); - unset($qi); - } - - function log($level, $msg) { - common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); - } + function idle($timeout=0) + { + if ($timeout>0) { + sleep($timeout); + } + } + + function clear_old_claims() + { + $qi = new Queue_item(); + $qi->transport = $this->transport(); + $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); + $qi->update(DB_DATAOBJECT_WHEREADD_ONLY); + $qi->free(); + unset($qi); + } + + function log($level, $msg) + { + common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); + } } -
\ No newline at end of file +
\ No newline at end of file |