From ec145b73fc91dd54695dd374c8a71a11e233b8c0 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Tue, 12 Jan 2010 19:57:15 -0800 Subject: Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: http://www.gitorious.org/snqmon Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch: http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work --- scripts/enjitqueuehandler.php | 121 ------------------- scripts/getvaliddaemons.php | 11 +- scripts/handlequeued.php | 56 +++++++++ scripts/jabberqueuehandler.php | 78 ------------ scripts/ombqueuehandler.php | 87 -------------- scripts/pingqueuehandler.php | 69 ----------- scripts/pluginqueuehandler.php | 64 ---------- scripts/publicqueuehandler.php | 76 ------------ scripts/queuedaemon.php | 265 +++++++++++++++++++++++++++++++++++++++++ scripts/smsqueuehandler.php | 73 ------------ scripts/xmppconfirmhandler.php | 161 ------------------------- 11 files changed, 322 insertions(+), 739 deletions(-) delete mode 100755 scripts/enjitqueuehandler.php create mode 100755 scripts/handlequeued.php delete mode 100755 scripts/jabberqueuehandler.php delete mode 100755 scripts/ombqueuehandler.php delete mode 100755 scripts/pingqueuehandler.php delete mode 100755 scripts/pluginqueuehandler.php delete mode 100755 scripts/publicqueuehandler.php create mode 100755 scripts/queuedaemon.php delete mode 100755 scripts/smsqueuehandler.php delete mode 100755 scripts/xmppconfirmhandler.php (limited to 'scripts') diff --git a/scripts/enjitqueuehandler.php b/scripts/enjitqueuehandler.php deleted file mode 100755 index afcac539a..000000000 --- a/scripts/enjitqueuehandler.php +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "Starting EnjitQueueHandler"); - $this->log(LOG_INFO, "Broadcasting to ".common_config('enjit', 'apiurl')); - return true; - } - - function handle_notice($notice) - { - - $profile = Profile::staticGet($notice->profile_id); - - $this->log(LOG_INFO, "Posting Notice ".$notice->id." from ".$profile->nickname); - - if ( ! $notice->is_local ) { - $this->log(LOG_INFO, "Skipping remote notice"); - return "skipped"; - } - - # - # Build an Atom message from the notice - # - $noticeurl = common_local_url('shownotice', array('notice' => $notice->id)); - $msg = $profile->nickname . ': ' . $notice->content; - - $atom = "\n"; - $atom .= "".common_config('enjit','source')."\n"; - $atom .= "\n"; - $atom .= "" . $profile->nickname . " - " . common_config('site', 'name') . "\n"; - $atom .= "\n"; - $atom .= " $profile->nickname)) . "'/>\n"; - $atom .= "" . $profile->nickname . "\n"; - $atom .= "" . $profile->avatarUrl(AVATAR_PROFILE_SIZE) . "\n"; - $atom .= "\n"; - $atom .= "" . htmlspecialchars($msg) . "\n"; - $atom .= "" . htmlspecialchars($msg) . "\n"; - $atom .= "\n"; - $atom .= "". $notice->uri . "\n"; - $atom .= "".common_date_w3dtf($notice->created)."\n"; - $atom .= "".common_date_w3dtf($notice->modified)."\n"; - $atom .= "\n"; - - $url = common_config('enjit', 'apiurl') . "/submit/". common_config('enjit','apikey'); - $data = array( - 'msg' => $atom, - ); - - # - # POST the message to $config['enjit']['apiurl'] - # - $request = HTTPClient::start(); - $response = $request->post($url, null, $data); - - return $response->isOk(); - } - -} - -if (have_option('-i')) { - $id = get_option_value('-i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new EnjitQueueHandler($id); - -if ($handler->start()) { - $handler->handle_queue(); -} - -$handler->finish(); diff --git a/scripts/getvaliddaemons.php b/scripts/getvaliddaemons.php index 99ad41b37..a332e06b5 100755 --- a/scripts/getvaliddaemons.php +++ b/scripts/getvaliddaemons.php @@ -37,19 +37,10 @@ require_once INSTALLDIR.'/scripts/commandline.inc'; $daemons = array(); -$daemons[] = INSTALLDIR.'/scripts/pluginqueuehandler.php'; -$daemons[] = INSTALLDIR.'/scripts/ombqueuehandler.php'; -$daemons[] = INSTALLDIR.'/scripts/pingqueuehandler.php'; +$daemons[] = INSTALLDIR.'/scripts/queuedaemon.php'; if(common_config('xmpp','enabled')) { $daemons[] = INSTALLDIR.'/scripts/xmppdaemon.php'; - $daemons[] = INSTALLDIR.'/scripts/jabberqueuehandler.php'; - $daemons[] = INSTALLDIR.'/scripts/publicqueuehandler.php'; - $daemons[] = INSTALLDIR.'/scripts/xmppconfirmhandler.php'; -} - -if (common_config('sms', 'enabled')) { - $daemons[] = INSTALLDIR.'/scripts/smsqueuehandler.php'; } if (Event::handle('GetValidDaemons', array(&$daemons))) { diff --git a/scripts/handlequeued.php b/scripts/handlequeued.php new file mode 100755 index 000000000..9031437aa --- /dev/null +++ b/scripts/handlequeued.php @@ -0,0 +1,56 @@ +#!/usr/bin/env php +. + */ + +define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); + +$helptext = << +Run a single queued notice through background processing +as if it were being run through the queue. + + +END_OF_QUEUE_HELP; + +require_once INSTALLDIR.'/scripts/commandline.inc'; + +if (count($args) != 2) { + show_help(); +} + +$queue = trim($args[0]); +$noticeId = intval($args[1]); + +$qm = QueueManager::get(); +$handler = $qm->getHandler($queue); +if (!$handler) { + print "No handler for queue '$queue'.\n"; + exit(1); +} + +$notice = Notice::staticGet('id', $noticeId); +if (empty($notice)) { + print "Invalid notice id $noticeId\n"; + exit(1); +} + +if (!$handler->handle_notice($notice)) { + print "Failed to handle notice id $noticeId on queue '$queue'.\n"; + exit(1); +} diff --git a/scripts/jabberqueuehandler.php b/scripts/jabberqueuehandler.php deleted file mode 100755 index 8f3a56944..000000000 --- a/scripts/jabberqueuehandler.php +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); - exit(1); - } - } -} - -// Abort immediately if xmpp is not enabled, otherwise the daemon chews up -// lots of CPU trying to connect to unconfigured servers -if (common_config('xmpp','enabled')==false) { - print "Aborting daemon - xmpp is disabled\n"; - exit(); -} - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new JabberQueueHandler($id); - -$handler->runOnce(); diff --git a/scripts/ombqueuehandler.php b/scripts/ombqueuehandler.php deleted file mode 100755 index be33b9821..000000000 --- a/scripts/ombqueuehandler.php +++ /dev/null @@ -1,87 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - - function handle_notice($notice) - { - if ($this->is_remote($notice)) { - $this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id); - return true; - } else { - return omb_broadcast_notice($notice); - } - } - - function finish() - { - } - - function is_remote($notice) - { - $user = User::staticGet($notice->profile_id); - return is_null($user); - } -} - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new OmbQueueHandler($id); - -$handler->runOnce(); diff --git a/scripts/pingqueuehandler.php b/scripts/pingqueuehandler.php deleted file mode 100755 index c92337e36..000000000 --- a/scripts/pingqueuehandler.php +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - - function handle_notice($notice) { - return ping_broadcast_notice($notice); - } - - function finish() { - } -} - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new PingQueueHandler($id); - -$handler->runOnce(); diff --git a/scripts/pluginqueuehandler.php b/scripts/pluginqueuehandler.php deleted file mode 100755 index fa39bdda6..000000000 --- a/scripts/pluginqueuehandler.php +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - - function handle_notice($notice) - { - Event::handle('HandleQueuedNotice', array(&$notice)); - return true; - } -} - -if (have_option('i', 'id')) { - $id = get_option_value('i', 'id'); -} else { - $id = null; -} - -$handler = new PluginQueueHandler($id); -$handler->runOnce(); diff --git a/scripts/publicqueuehandler.php b/scripts/publicqueuehandler.php deleted file mode 100755 index 50a11bcba..000000000 --- a/scripts/publicqueuehandler.php +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); - die($e->getMessage()); - } - } -} - -// Abort immediately if xmpp is not enabled, otherwise the daemon chews up -// lots of CPU trying to connect to unconfigured servers -if (common_config('xmpp','enabled')==false) { - print "Aborting daemon - xmpp is disabled\n"; - exit(); -} - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new PublicQueueHandler($id); - -$handler->runOnce(); diff --git a/scripts/queuedaemon.php b/scripts/queuedaemon.php new file mode 100755 index 000000000..8ef364fe7 --- /dev/null +++ b/scripts/queuedaemon.php @@ -0,0 +1,265 @@ +#!/usr/bin/env php +. + */ + +define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); + +$shortoptions = 'fi:at:'; +$longoptions = array('id=', 'foreground', 'all', 'threads='); + +/** + * Attempts to get a count of the processors available on the current system + * to fan out multiple threads. + * + * Recognizes Linux and Mac OS X; others will return default of 1. + * + * @return intval + */ +function getProcessorCount() +{ + $cpus = 0; + switch (PHP_OS) { + case 'Linux': + $cpuinfo = file('/proc/cpuinfo'); + foreach (file('/proc/cpuinfo') as $line) { + if (preg_match('/^processor\s+:\s+(\d+)\s?$/', $line)) { + $cpus++; + } + } + break; + case 'Darwin': + $cpus = intval(shell_exec("/usr/sbin/sysctl -n hw.ncpu 2>/dev/null")); + break; + } + if ($cpus) { + return $cpus; + } + return 1; +} + +$threads = getProcessorCount(); +$helptext = << Spawn processing threads (default $threads) + + +END_OF_QUEUE_HELP; + +require_once INSTALLDIR.'/scripts/commandline.inc'; + +require_once(INSTALLDIR.'/lib/daemon.php'); +require_once(INSTALLDIR.'/classes/Queue_item.php'); +require_once(INSTALLDIR.'/classes/Notice.php'); + +define('CLAIM_TIMEOUT', 1200); + +/** + * Queue handling daemon... + * + * The queue daemon by default launches in the background, at which point + * it'll pass control to the configured QueueManager class to poll for updates. + * + * We can then pass individual items through the QueueHandler subclasses + * they belong to. + */ +class QueueDaemon extends Daemon +{ + protected $allsites; + protected $threads=1; + + function __construct($id=null, $daemonize=true, $threads=1, $allsites=false) + { + parent::__construct($daemonize); + + if ($id) { + $this->set_id($id); + } + $this->all = $allsites; + $this->threads = $threads; + } + + /** + * How many seconds a polling-based queue manager should wait between + * checks for new items to handle. + * + * Defaults to 60 seconds; override to speed up or slow down. + * + * @return int timeout in seconds + */ + function timeout() + { + return 60; + } + + function name() + { + return strtolower(get_class($this).'.'.$this->get_id()); + } + + function run() + { + if ($this->threads > 1) { + return $this->runThreads(); + } else { + return $this->runLoop(); + } + } + + function runThreads() + { + $children = array(); + for ($i = 1; $i <= $this->threads; $i++) { + $pid = pcntl_fork(); + if ($pid < 0) { + print "Couldn't fork for thread $i; aborting\n"; + exit(1); + } else if ($pid == 0) { + $this->runChild($i); + exit(0); + } else { + $this->log(LOG_INFO, "Spawned thread $i as pid $pid"); + $children[$i] = $pid; + } + } + + $this->log(LOG_INFO, "Waiting for children to complete."); + while (count($children) > 0) { + $status = null; + $pid = pcntl_wait($status); + if ($pid > 0) { + $i = array_search($pid, $children); + if ($i === false) { + $this->log(LOG_ERR, "Unrecognized child pid $pid exited!"); + continue; + } + unset($children[$i]); + $this->log(LOG_INFO, "Thread $i pid $pid exited."); + + $pid = pcntl_fork(); + if ($pid < 0) { + print "Couldn't fork to respawn thread $i; aborting thread.\n"; + } else if ($pid == 0) { + $this->runChild($i); + exit(0); + } else { + $this->log(LOG_INFO, "Respawned thread $i as pid $pid"); + $children[$i] = $pid; + } + } + } + $this->log(LOG_INFO, "All child processes complete."); + return true; + } + + function runChild($thread) + { + $this->set_id($this->get_id() . "." . $thread); + $this->resetDb(); + $this->runLoop(); + } + + /** + * Reconnect to the database for each child process, + * or they'll get very confused trying to use the + * same socket. + */ + function resetDb() + { + // @fixme do we need to explicitly open the db too + // or is this implied? + global $_DB_DATAOBJECT; + unset($_DB_DATAOBJECT['CONNECTIONS']); + + // Reconnect main memcached, or threads will stomp on + // each other and corrupt their requests. + $cache = common_memcache(); + if ($cache) { + $cache->reconnect(); + } + + // Also reconnect memcached for status_network table. + if (!empty(Status_network::$cache)) { + Status_network::$cache->close(); + Status_network::$cache = null; + } + } + + /** + * Setup and start of run loop for this queue handler as a daemon. + * Most of the heavy lifting is passed on to the QueueManager's service() + * method, which passes control on to the QueueHandler's handle_notice() + * method for each notice that comes in on the queue. + * + * Most of the time this won't need to be overridden in a subclass. + * + * @return boolean true on success, false on failure + */ + function runLoop() + { + $this->log(LOG_INFO, 'checking for queued notices'); + + $master = new IoMaster($this->get_id()); + $master->init($this->all); + $master->service(); + + $this->log(LOG_INFO, 'finished servicing the queue'); + + $this->log(LOG_INFO, 'terminating normally'); + + return true; + } + + function log($level, $msg) + { + common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg); + } +} + +if (have_option('i')) { + $id = get_option_value('i'); +} else if (have_option('--id')) { + $id = get_option_value('--id'); +} else if (count($args) > 0) { + $id = $args[0]; +} else { + $id = null; +} + +if (have_option('t')) { + $threads = intval(get_option_value('t')); +} else if (have_option('--threads')) { + $threads = intval(get_option_value('--threads')); +} else { + $threads = 0; +} +if (!$threads) { + $threads = getProcessorCount(); +} + +$daemonize = !(have_option('f') || have_option('--foreground')); +$all = have_option('a') || have_option('--all'); + +$daemon = new QueueDaemon($id, $daemonize, $threads, $all); +$daemon->runOnce(); + diff --git a/scripts/smsqueuehandler.php b/scripts/smsqueuehandler.php deleted file mode 100755 index 6583a77da..000000000 --- a/scripts/smsqueuehandler.php +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - - function handle_notice($notice) - { - return mail_broadcast_notice_sms($notice); - } - - function finish() - { - } -} - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new SmsQueueHandler($id); - -$handler->runOnce(); diff --git a/scripts/xmppconfirmhandler.php b/scripts/xmppconfirmhandler.php deleted file mode 100755 index 2e3974136..000000000 --- a/scripts/xmppconfirmhandler.php +++ /dev/null @@ -1,161 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<start()) { - return false; - } - $this->log(LOG_INFO, 'checking for queued confirmations'); - do { - $confirm = $this->next_confirm(); - if ($confirm) { - $this->log(LOG_INFO, 'Sending confirmation for ' . $confirm->address); - $user = User::staticGet($confirm->user_id); - if (!$user) { - $this->log(LOG_WARNING, 'Confirmation for unknown user ' . $confirm->user_id); - continue; - } - $success = jabber_confirm_address($confirm->code, - $user->nickname, - $confirm->address); - if (!$success) { - $this->log(LOG_ERR, 'Confirmation failed for ' . $confirm->address); - # Just let the claim age out; hopefully things work then - continue; - } else { - $this->log(LOG_INFO, 'Confirmation sent for ' . $confirm->address); - # Mark confirmation sent; need a dupe so we don't have the WHERE clause - $dupe = Confirm_address::staticGet('code', $confirm->code); - if (!$dupe) { - common_log(LOG_WARNING, 'Could not refetch confirm', __FILE__); - continue; - } - $orig = clone($dupe); - $dupe->sent = $dupe->claimed; - $result = $dupe->update($orig); - if (!$result) { - common_log_db_error($dupe, 'UPDATE', __FILE__); - # Just let the claim age out; hopefully things work then - continue; - } - $dupe->free(); - unset($dupe); - } - $user->free(); - unset($user); - $confirm->free(); - unset($confirm); - $this->idle(0); - } else { -# $this->clear_old_confirm_claims(); - $this->idle(10); - } - } while (true); - if (!$this->finish()) { - return false; - } - return true; - } - - function next_confirm() - { - $confirm = new Confirm_address(); - $confirm->whereAdd('claimed IS null'); - $confirm->whereAdd('sent IS null'); - # XXX: eventually we could do other confirmations in the queue, too - $confirm->address_type = 'jabber'; - $confirm->orderBy('modified DESC'); - $confirm->limit(1); - if ($confirm->find(true)) { - $this->log(LOG_INFO, 'Claiming confirmation for ' . $confirm->address); - # working around some weird DB_DataObject behaviour - $confirm->whereAdd(''); # clears where stuff - $original = clone($confirm); - $confirm->claimed = common_sql_now(); - $result = $confirm->update($original); - if ($result) { - $this->log(LOG_INFO, 'Succeeded in claim! '. $result); - return $confirm; - } else { - $this->log(LOG_INFO, 'Failed in claim!'); - return false; - } - } - return null; - } - - function clear_old_confirm_claims() - { - $confirm = new Confirm(); - $confirm->claimed = null; - $confirm->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); - $confirm->update(DB_DATAOBJECT_WHEREADD_ONLY); - $confirm->free(); - unset($confirm); - } -} - -// Abort immediately if xmpp is not enabled, otherwise the daemon chews up -// lots of CPU trying to connect to unconfigured servers -if (common_config('xmpp','enabled')==false) { - print "Aborting daemon - xmpp is disabled\n"; - exit(); -} - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new XmppConfirmHandler($id); - -$handler->runOnce(); - -- cgit v1.2.3-54-g00ecf