From b9f0ea6f0e16f2314a2bdd56365205e7a3c31aa1 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 27 Aug 2008 16:54:07 -0400 Subject: break up monolithic xmppdaemon into multiple queue handlers Eventually, the poor xmppdaemon has become overloaded with extra tasks. So, I've broken it up. Now, we have 5 background scripts, and more coming: * xmppdaemon.php - handles incoming XMPP messages only. * xmppqueuehandler.php - sends notices from the queue out through XMPP. * smsqueuehandler.php - sends notices from the queue out over SMS * ombqueuehandler.php - sends notices from the queue out over OMB * xmppconfirmhandler.php - sends confirmation requests out over XMPP. This is in addition to maildaemon.php, which takes incoming messages. None of these are "true" daemons -- they don't daemonize themselves automatically. Use nohup or another tool to background them. monit can also be useful to keep them running. At some point, these might become fork()'ing daemons, able to handle more than one notice at a time. For now, I'm just running multiple instances, hoping they don't interfere. darcs-hash:20080827205407-84dde-97884a12f5f4e54c93bc785bd280683d1ee7e749.gz --- lib/queuehandler.php | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 lib/queuehandler.php (limited to 'lib/queuehandler.php') diff --git a/lib/queuehandler.php b/lib/queuehandler.php new file mode 100644 index 000000000..f12b880b1 --- /dev/null +++ b/lib/queuehandler.php @@ -0,0 +1,105 @@ +. + */ + +define('CLAIM_TIMEOUT', 1200); + +class QueueHandler { + + var $_id = 'generic'; + + function QueueHandler($id=NULL) { + if ($id) { + $this->set_id($id); + } + } + + function class_name() { + return ucfirst($this->transport()) . 'Handler'; + } + + function get_id() { + return $this->_id; + } + + function set_id($id) { + $this->_id = $id; + } + + function transport() { + return NULL; + } + + function start() { + } + + function finish() { + } + + function handle_notice($notice) { + return true; + } + + function handle_queue() { + $this->clear_old_claims(); + $this->log(LOG_INFO, 'checking for queued notices'); + $cnt = 0; + $transport = $this->transport(); + do { + $qi = Queue_item::top($transport); + if ($qi) { + $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created)); + $notice = Notice::staticGet($qi->notice_id); + if ($notice) { + $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); + # XXX: what to do if broadcast fails? + $result = $this->handle_notice($notice); + if (!$result) { + $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); + $orig = $qi; + $qi->claimed = NULL; + $qi->update($orig); + $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id); + continue; + } + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + $notice = NULL; + } else { + $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + } + $qi->delete(); + $cnt++; + } else { + $this->clear_old_claims(); + sleep(10); + } + } while (true); + } + + function clear_old_claims() { + $qi = new Queue_item(); + $qi->transport = $this->transport(); + $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); + $qi->update(DB_DATAOBJECT_WHEREADD_ONLY); + } + + function log($level, $msg) { + common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); + } +} + \ No newline at end of file -- cgit v1.2.3-54-g00ecf