From ec145b73fc91dd54695dd374c8a71a11e233b8c0 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Tue, 12 Jan 2010 19:57:15 -0800 Subject: Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: http://www.gitorious.org/snqmon Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch: http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work --- plugins/Enjit/README | 5 ++ plugins/Enjit/enjitqueuehandler.php | 91 ++++++++++++++++++++++ plugins/Facebook/FacebookPlugin.php | 46 ++--------- plugins/Facebook/facebookqueuehandler.php | 52 ++++--------- plugins/MemcachePlugin.php | 17 ++++ plugins/TwitterBridge/TwitterBridgePlugin.php | 50 ++++-------- .../TwitterBridge/daemons/twitterqueuehandler.php | 73 ----------------- plugins/TwitterBridge/twitterqueuehandler.php | 35 +++++++++ 8 files changed, 186 insertions(+), 183 deletions(-) create mode 100644 plugins/Enjit/README create mode 100644 plugins/Enjit/enjitqueuehandler.php mode change 100755 => 100644 plugins/Facebook/facebookqueuehandler.php delete mode 100755 plugins/TwitterBridge/daemons/twitterqueuehandler.php create mode 100644 plugins/TwitterBridge/twitterqueuehandler.php (limited to 'plugins') diff --git a/plugins/Enjit/README b/plugins/Enjit/README new file mode 100644 index 000000000..03f989490 --- /dev/null +++ b/plugins/Enjit/README @@ -0,0 +1,5 @@ +This doesn't seem to have been functional for a while; can't find other references +to the enjit configuration or transport enqueuing. Keeping it in case someone +wants to bring it up to date. + +-- brion vibber 2009-12-03 diff --git a/plugins/Enjit/enjitqueuehandler.php b/plugins/Enjit/enjitqueuehandler.php new file mode 100644 index 000000000..f0e706b92 --- /dev/null +++ b/plugins/Enjit/enjitqueuehandler.php @@ -0,0 +1,91 @@ +. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Queue handler for watching new notices and posting to enjit. + * @fixme is this actually being used/functional atm? + */ +class EnjitQueueHandler extends QueueHandler +{ + function transport() + { + return 'enjit'; + } + + function start() + { + $this->log(LOG_INFO, "Starting EnjitQueueHandler"); + $this->log(LOG_INFO, "Broadcasting to ".common_config('enjit', 'apiurl')); + return true; + } + + function handle_notice($notice) + { + + $profile = Profile::staticGet($notice->profile_id); + + $this->log(LOG_INFO, "Posting Notice ".$notice->id." from ".$profile->nickname); + + if ( ! $notice->is_local ) { + $this->log(LOG_INFO, "Skipping remote notice"); + return "skipped"; + } + + # + # Build an Atom message from the notice + # + $noticeurl = common_local_url('shownotice', array('notice' => $notice->id)); + $msg = $profile->nickname . ': ' . $notice->content; + + $atom = "\n"; + $atom .= "".common_config('enjit','source')."\n"; + $atom .= "\n"; + $atom .= "" . $profile->nickname . " - " . common_config('site', 'name') . "\n"; + $atom .= "\n"; + $atom .= " $profile->nickname)) . "'/>\n"; + $atom .= "" . $profile->nickname . "\n"; + $atom .= "" . $profile->avatarUrl(AVATAR_PROFILE_SIZE) . "\n"; + $atom .= "\n"; + $atom .= "" . htmlspecialchars($msg) . "\n"; + $atom .= "" . htmlspecialchars($msg) . "\n"; + $atom .= "\n"; + $atom .= "". $notice->uri . "\n"; + $atom .= "".common_date_w3dtf($notice->created)."\n"; + $atom .= "".common_date_w3dtf($notice->modified)."\n"; + $atom .= "\n"; + + $url = common_config('enjit', 'apiurl') . "/submit/". common_config('enjit','apikey'); + $data = array( + 'msg' => $atom, + ); + + # + # POST the message to $config['enjit']['apiurl'] + # + $request = HTTPClient::start(); + $response = $request->post($url, null, $data); + + return $response->isOk(); + } + +} diff --git a/plugins/Facebook/FacebookPlugin.php b/plugins/Facebook/FacebookPlugin.php index de91bf24a..4266b886d 100644 --- a/plugins/Facebook/FacebookPlugin.php +++ b/plugins/Facebook/FacebookPlugin.php @@ -114,6 +114,9 @@ class FacebookPlugin extends Plugin case 'FBCSettingsNav': include_once INSTALLDIR . '/plugins/Facebook/FBCSettingsNav.php'; return false; + case 'FacebookQueueHandler': + include_once INSTALLDIR . '/plugins/Facebook/facebookqueuehandler.php'; + return false; default: return true; } @@ -508,50 +511,15 @@ class FacebookPlugin extends Plugin } /** - * broadcast the message when not using queuehandler + * Register Facebook notice queue handler * - * @param Notice &$notice the notice - * @param array $queue destination queue + * @param QueueManager $manager * * @return boolean hook return */ - - function onUnqueueHandleNotice(&$notice, $queue) - { - if (($queue == 'facebook') && ($this->_isLocal($notice))) { - facebookBroadcastNotice($notice); - return false; - } - return true; - } - - /** - * Determine whether the notice was locally created - * - * @param Notice $notice the notice - * - * @return boolean locality - */ - - function _isLocal($notice) - { - return ($notice->is_local == Notice::LOCAL_PUBLIC || - $notice->is_local == Notice::LOCAL_NONPUBLIC); - } - - /** - * Add Facebook queuehandler to the list of daemons to start - * - * @param array $daemons the list fo daemons to run - * - * @return boolean hook return - * - */ - - function onGetValidDaemons($daemons) + function onEndInitializeQueueManager($manager) { - array_push($daemons, INSTALLDIR . - '/plugins/Facebook/facebookqueuehandler.php'); + $manager->connect('facebook', 'FacebookQueueHandler'); return true; } diff --git a/plugins/Facebook/facebookqueuehandler.php b/plugins/Facebook/facebookqueuehandler.php old mode 100755 new mode 100644 index e4ae7d4ee..1778690e5 --- a/plugins/Facebook/facebookqueuehandler.php +++ b/plugins/Facebook/facebookqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php . */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/../..')); +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - function handle_notice($notice) { - return facebookBroadcastNotice($notice); + if ($this->_isLocal($notice)) { + return facebookBroadcastNotice($notice); + } + return true; } - function finish() + /** + * Determine whether the notice was locally created + * + * @param Notice $notice the notice + * + * @return boolean locality + */ + function _isLocal($notice) { + return ($notice->is_local == Notice::LOCAL_PUBLIC || + $notice->is_local == Notice::LOCAL_NONPUBLIC); } - } - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new FacebookQueueHandler($id); - -$handler->runOnce(); diff --git a/plugins/MemcachePlugin.php b/plugins/MemcachePlugin.php index 5f93e9a83..fbc2802f7 100644 --- a/plugins/MemcachePlugin.php +++ b/plugins/MemcachePlugin.php @@ -133,6 +133,23 @@ class MemcachePlugin extends Plugin return false; } + function onStartCacheReconnect(&$success) + { + if (empty($this->_conn)) { + // nothing to do + return true; + } + if ($this->persistent) { + common_log(LOG_ERR, "Cannot close persistent memcached connection"); + $success = false; + } else { + common_log(LOG_INFO, "Closing memcached connection"); + $success = $this->_conn->close(); + $this->_conn = null; + } + return false; + } + /** * Ensure that a connection exists * diff --git a/plugins/TwitterBridge/TwitterBridgePlugin.php b/plugins/TwitterBridge/TwitterBridgePlugin.php index a87ee2894..57b3c1c99 100644 --- a/plugins/TwitterBridge/TwitterBridgePlugin.php +++ b/plugins/TwitterBridge/TwitterBridgePlugin.php @@ -112,7 +112,9 @@ class TwitterBridgePlugin extends Plugin strtolower(mb_substr($cls, 0, -6)) . '.php'; return false; case 'TwitterOAuthClient': - include_once INSTALLDIR . '/plugins/TwitterBridge/twitteroauthclient.php'; + case 'TwitterQueueHandler': + include_once INSTALLDIR . '/plugins/TwitterBridge/' . + strtolower($cls) . '.php'; return false; default: return true; @@ -138,48 +140,15 @@ class TwitterBridgePlugin extends Plugin return true; } - /** - * broadcast the message when not using queuehandler - * - * @param Notice &$notice the notice - * @param array $queue destination queue - * - * @return boolean hook return - */ - function onUnqueueHandleNotice(&$notice, $queue) - { - if (($queue == 'twitter') && ($this->_isLocal($notice))) { - broadcast_twitter($notice); - return false; - } - return true; - } - - /** - * Determine whether the notice was locally created - * - * @param Notice $notice - * - * @return boolean locality - */ - function _isLocal($notice) - { - return ($notice->is_local == Notice::LOCAL_PUBLIC || - $notice->is_local == Notice::LOCAL_NONPUBLIC); - } - /** * Add Twitter bridge daemons to the list of daemons to start * * @param array $daemons the list fo daemons to run * * @return boolean hook return - * */ function onGetValidDaemons($daemons) { - array_push($daemons, INSTALLDIR . - '/plugins/TwitterBridge/daemons/twitterqueuehandler.php'); array_push($daemons, INSTALLDIR . '/plugins/TwitterBridge/daemons/synctwitterfriends.php'); @@ -191,6 +160,19 @@ class TwitterBridgePlugin extends Plugin return true; } + /** + * Register Twitter notice queue handler + * + * @param QueueManager $manager + * + * @return boolean hook return + */ + function onEndInitializeQueueManager($manager) + { + $manager->connect('twitter', 'TwitterQueueHandler'); + return true; + } + function onPluginVersion(&$versions) { $versions[] = array('name' => 'TwitterBridge', diff --git a/plugins/TwitterBridge/daemons/twitterqueuehandler.php b/plugins/TwitterBridge/daemons/twitterqueuehandler.php deleted file mode 100755 index f0e76bb74..000000000 --- a/plugins/TwitterBridge/daemons/twitterqueuehandler.php +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/../../..')); - -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<log(LOG_INFO, "INITIALIZE"); - return true; - } - - function handle_notice($notice) - { - return broadcast_twitter($notice); - } - - function finish() - { - } - -} - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new TwitterQueueHandler($id); - -$handler->runOnce(); diff --git a/plugins/TwitterBridge/twitterqueuehandler.php b/plugins/TwitterBridge/twitterqueuehandler.php new file mode 100644 index 000000000..5089ca7b7 --- /dev/null +++ b/plugins/TwitterBridge/twitterqueuehandler.php @@ -0,0 +1,35 @@ +. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } + +require_once INSTALLDIR . '/plugins/TwitterBridge/twitter.php'; + +class TwitterQueueHandler extends QueueHandler +{ + function transport() + { + return 'twitter'; + } + + function handle_notice($notice) + { + return broadcast_twitter($notice); + } +} -- cgit v1.2.3-54-g00ecf