diff options
author | Brion Vibber <brion@status.net> | 2010-01-12 19:57:15 -0800 |
---|---|---|
committer | Brion Vibber <brion@status.net> | 2010-01-12 20:45:09 -0800 |
commit | ec145b73fc91dd54695dd374c8a71a11e233b8c0 (patch) | |
tree | d4e718b0f5bdc917ea7eb6b951b2364e88078dec /plugins | |
parent | 2b10e359fea9d6aabc5ab35557954a503bea730b (diff) |
Major refactoring of queue handlers to support running multiple sites in one daemon.
Key changes:
* Initialization code moved from common.php to StatusNet class;
can now switch configurations during runtime.
* As a consequence, configuration files must now be idempotent...
Be careful with constant, function or class definitions.
* Control structure for daemons/QueueManager/QueueHandler has been refactored;
the run loop is now managed by IoMaster run via scripts/queuedaemon.php
IoManager subclasses are woken to handle socket input or polling, and may
cover multiple sites.
* Plugins can implement notice queue handlers more easily by registering a
QueueHandler class; no more need to add a daemon.
The new QueueDaemon runs from scripts/queuedaemon.php:
* This replaces most of the old *handler.php scripts; they've been refactored
to the bare handler classes.
* Spawns multiple child processes to spread load; defaults to CPU count on
Linux and Mac OS X systems, or override with --threads=N
* When multithreaded, child processes are automatically respawned on failure.
* Threads gracefully shut down and restart when passing a soft memory limit
(defaults to 90% of memory_limit), limiting damage from memory leaks.
* Support for UDP-based monitoring: http://www.gitorious.org/snqmon
Rough control flow diagram:
QueueDaemon -> IoMaster -> IoManager
QueueManager [listen or poll] -> QueueHandler
XmppManager [ping & keepalive]
XmppConfirmManager [poll updates]
Todo:
* Respawning features not currently available running single-threaded.
* When running single-site, configuration changes aren't picked up.
* New sites or config changes affecting queue subscriptions are not yet
handled without a daemon restart.
* SNMP monitoring output to integrate with general tools (nagios, ganglia)
* Convert XMPP confirmation message sends to use stomp queue instead of polling
* Convert xmppdaemon.php to IoManager?
* Convert Twitter status, friends import polling daemons to IoManager
* Clean up some error reporting and failure modes
* May need to adjust queue priorities for best perf in backlog/flood cases
Detailed code history available in my daemon-work branch:
http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/Enjit/README | 5 | ||||
-rw-r--r-- | plugins/Enjit/enjitqueuehandler.php | 91 | ||||
-rw-r--r-- | plugins/Facebook/FacebookPlugin.php | 46 | ||||
-rw-r--r--[-rwxr-xr-x] | plugins/Facebook/facebookqueuehandler.php | 52 | ||||
-rw-r--r-- | plugins/MemcachePlugin.php | 17 | ||||
-rw-r--r-- | plugins/TwitterBridge/TwitterBridgePlugin.php | 50 | ||||
-rw-r--r--[-rwxr-xr-x] | plugins/TwitterBridge/twitterqueuehandler.php (renamed from plugins/TwitterBridge/daemons/twitterqueuehandler.php) | 40 |
7 files changed, 152 insertions, 149 deletions
diff --git a/plugins/Enjit/README b/plugins/Enjit/README new file mode 100644 index 000000000..03f989490 --- /dev/null +++ b/plugins/Enjit/README @@ -0,0 +1,5 @@ +This doesn't seem to have been functional for a while; can't find other references +to the enjit configuration or transport enqueuing. Keeping it in case someone +wants to bring it up to date. + +-- brion vibber <brion@status.net> 2009-12-03 diff --git a/plugins/Enjit/enjitqueuehandler.php b/plugins/Enjit/enjitqueuehandler.php new file mode 100644 index 000000000..f0e706b92 --- /dev/null +++ b/plugins/Enjit/enjitqueuehandler.php @@ -0,0 +1,91 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2008, 2009, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Queue handler for watching new notices and posting to enjit. + * @fixme is this actually being used/functional atm? + */ +class EnjitQueueHandler extends QueueHandler +{ + function transport() + { + return 'enjit'; + } + + function start() + { + $this->log(LOG_INFO, "Starting EnjitQueueHandler"); + $this->log(LOG_INFO, "Broadcasting to ".common_config('enjit', 'apiurl')); + return true; + } + + function handle_notice($notice) + { + + $profile = Profile::staticGet($notice->profile_id); + + $this->log(LOG_INFO, "Posting Notice ".$notice->id." from ".$profile->nickname); + + if ( ! $notice->is_local ) { + $this->log(LOG_INFO, "Skipping remote notice"); + return "skipped"; + } + + # + # Build an Atom message from the notice + # + $noticeurl = common_local_url('shownotice', array('notice' => $notice->id)); + $msg = $profile->nickname . ': ' . $notice->content; + + $atom = "<entry xmlns='http://www.w3.org/2005/Atom'>\n"; + $atom .= "<apisource>".common_config('enjit','source')."</apisource>\n"; + $atom .= "<source>\n"; + $atom .= "<title>" . $profile->nickname . " - " . common_config('site', 'name') . "</title>\n"; + $atom .= "<link href='" . $profile->profileurl . "'/>\n"; + $atom .= "<link rel='self' type='application/rss+xml' href='" . common_local_url('userrss', array('nickname' => $profile->nickname)) . "'/>\n"; + $atom .= "<author><name>" . $profile->nickname . "</name></author>\n"; + $atom .= "<icon>" . $profile->avatarUrl(AVATAR_PROFILE_SIZE) . "</icon>\n"; + $atom .= "</source>\n"; + $atom .= "<title>" . htmlspecialchars($msg) . "</title>\n"; + $atom .= "<summary>" . htmlspecialchars($msg) . "</summary>\n"; + $atom .= "<link rel='alternate' href='" . $noticeurl . "' />\n"; + $atom .= "<id>". $notice->uri . "</id>\n"; + $atom .= "<published>".common_date_w3dtf($notice->created)."</published>\n"; + $atom .= "<updated>".common_date_w3dtf($notice->modified)."</updated>\n"; + $atom .= "</entry>\n"; + + $url = common_config('enjit', 'apiurl') . "/submit/". common_config('enjit','apikey'); + $data = array( + 'msg' => $atom, + ); + + # + # POST the message to $config['enjit']['apiurl'] + # + $request = HTTPClient::start(); + $response = $request->post($url, null, $data); + + return $response->isOk(); + } + +} diff --git a/plugins/Facebook/FacebookPlugin.php b/plugins/Facebook/FacebookPlugin.php index de91bf24a..4266b886d 100644 --- a/plugins/Facebook/FacebookPlugin.php +++ b/plugins/Facebook/FacebookPlugin.php @@ -114,6 +114,9 @@ class FacebookPlugin extends Plugin case 'FBCSettingsNav': include_once INSTALLDIR . '/plugins/Facebook/FBCSettingsNav.php'; return false; + case 'FacebookQueueHandler': + include_once INSTALLDIR . '/plugins/Facebook/facebookqueuehandler.php'; + return false; default: return true; } @@ -508,50 +511,15 @@ class FacebookPlugin extends Plugin } /** - * broadcast the message when not using queuehandler + * Register Facebook notice queue handler * - * @param Notice &$notice the notice - * @param array $queue destination queue + * @param QueueManager $manager * * @return boolean hook return */ - - function onUnqueueHandleNotice(&$notice, $queue) - { - if (($queue == 'facebook') && ($this->_isLocal($notice))) { - facebookBroadcastNotice($notice); - return false; - } - return true; - } - - /** - * Determine whether the notice was locally created - * - * @param Notice $notice the notice - * - * @return boolean locality - */ - - function _isLocal($notice) - { - return ($notice->is_local == Notice::LOCAL_PUBLIC || - $notice->is_local == Notice::LOCAL_NONPUBLIC); - } - - /** - * Add Facebook queuehandler to the list of daemons to start - * - * @param array $daemons the list fo daemons to run - * - * @return boolean hook return - * - */ - - function onGetValidDaemons($daemons) + function onEndInitializeQueueManager($manager) { - array_push($daemons, INSTALLDIR . - '/plugins/Facebook/facebookqueuehandler.php'); + $manager->connect('facebook', 'FacebookQueueHandler'); return true; } diff --git a/plugins/Facebook/facebookqueuehandler.php b/plugins/Facebook/facebookqueuehandler.php index e4ae7d4ee..1778690e5 100755..100644 --- a/plugins/Facebook/facebookqueuehandler.php +++ b/plugins/Facebook/facebookqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php <?php /* * StatusNet - the distributed open-source microblogging tool @@ -18,21 +17,9 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/../..')); +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<<END_OF_FACEBOOK_HELP -Daemon script for pushing new notices to Facebook. - - -i --id Identity (default none) - -END_OF_FACEBOOK_HELP; - -require_once INSTALLDIR . '/scripts/commandline.inc'; require_once INSTALLDIR . '/plugins/Facebook/facebookutil.php'; -require_once INSTALLDIR . '/lib/queuehandler.php'; class FacebookQueueHandler extends QueueHandler { @@ -41,33 +28,24 @@ class FacebookQueueHandler extends QueueHandler return 'facebook'; } - function start() - { - $this->log(LOG_INFO, "INITIALIZE"); - return true; - } - function handle_notice($notice) { - return facebookBroadcastNotice($notice); + if ($this->_isLocal($notice)) { + return facebookBroadcastNotice($notice); + } + return true; } - function finish() + /** + * Determine whether the notice was locally created + * + * @param Notice $notice the notice + * + * @return boolean locality + */ + function _isLocal($notice) { + return ($notice->is_local == Notice::LOCAL_PUBLIC || + $notice->is_local == Notice::LOCAL_NONPUBLIC); } - } - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new FacebookQueueHandler($id); - -$handler->runOnce(); diff --git a/plugins/MemcachePlugin.php b/plugins/MemcachePlugin.php index 5f93e9a83..fbc2802f7 100644 --- a/plugins/MemcachePlugin.php +++ b/plugins/MemcachePlugin.php @@ -133,6 +133,23 @@ class MemcachePlugin extends Plugin return false; } + function onStartCacheReconnect(&$success) + { + if (empty($this->_conn)) { + // nothing to do + return true; + } + if ($this->persistent) { + common_log(LOG_ERR, "Cannot close persistent memcached connection"); + $success = false; + } else { + common_log(LOG_INFO, "Closing memcached connection"); + $success = $this->_conn->close(); + $this->_conn = null; + } + return false; + } + /** * Ensure that a connection exists * diff --git a/plugins/TwitterBridge/TwitterBridgePlugin.php b/plugins/TwitterBridge/TwitterBridgePlugin.php index a87ee2894..57b3c1c99 100644 --- a/plugins/TwitterBridge/TwitterBridgePlugin.php +++ b/plugins/TwitterBridge/TwitterBridgePlugin.php @@ -112,7 +112,9 @@ class TwitterBridgePlugin extends Plugin strtolower(mb_substr($cls, 0, -6)) . '.php'; return false; case 'TwitterOAuthClient': - include_once INSTALLDIR . '/plugins/TwitterBridge/twitteroauthclient.php'; + case 'TwitterQueueHandler': + include_once INSTALLDIR . '/plugins/TwitterBridge/' . + strtolower($cls) . '.php'; return false; default: return true; @@ -139,48 +141,15 @@ class TwitterBridgePlugin extends Plugin } /** - * broadcast the message when not using queuehandler - * - * @param Notice &$notice the notice - * @param array $queue destination queue - * - * @return boolean hook return - */ - function onUnqueueHandleNotice(&$notice, $queue) - { - if (($queue == 'twitter') && ($this->_isLocal($notice))) { - broadcast_twitter($notice); - return false; - } - return true; - } - - /** - * Determine whether the notice was locally created - * - * @param Notice $notice - * - * @return boolean locality - */ - function _isLocal($notice) - { - return ($notice->is_local == Notice::LOCAL_PUBLIC || - $notice->is_local == Notice::LOCAL_NONPUBLIC); - } - - /** * Add Twitter bridge daemons to the list of daemons to start * * @param array $daemons the list fo daemons to run * * @return boolean hook return - * */ function onGetValidDaemons($daemons) { array_push($daemons, INSTALLDIR . - '/plugins/TwitterBridge/daemons/twitterqueuehandler.php'); - array_push($daemons, INSTALLDIR . '/plugins/TwitterBridge/daemons/synctwitterfriends.php'); if (common_config('twitterimport', 'enabled')) { @@ -191,6 +160,19 @@ class TwitterBridgePlugin extends Plugin return true; } + /** + * Register Twitter notice queue handler + * + * @param QueueManager $manager + * + * @return boolean hook return + */ + function onEndInitializeQueueManager($manager) + { + $manager->connect('twitter', 'TwitterQueueHandler'); + return true; + } + function onPluginVersion(&$versions) { $versions[] = array('name' => 'TwitterBridge', diff --git a/plugins/TwitterBridge/daemons/twitterqueuehandler.php b/plugins/TwitterBridge/twitterqueuehandler.php index f0e76bb74..5089ca7b7 100755..100644 --- a/plugins/TwitterBridge/daemons/twitterqueuehandler.php +++ b/plugins/TwitterBridge/twitterqueuehandler.php @@ -1,4 +1,3 @@ -#!/usr/bin/env php <?php /* * StatusNet - the distributed open-source microblogging tool @@ -18,20 +17,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -define('INSTALLDIR', realpath(dirname(__FILE__) . '/../../..')); +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } -$shortoptions = 'i::'; -$longoptions = array('id::'); - -$helptext = <<<END_OF_ENJIT_HELP -Daemon script for pushing new notices to Twitter. - - -i --id Identity (default none) - -END_OF_ENJIT_HELP; - -require_once INSTALLDIR . '/scripts/commandline.inc'; -require_once INSTALLDIR . '/lib/queuehandler.php'; require_once INSTALLDIR . '/plugins/TwitterBridge/twitter.php'; class TwitterQueueHandler extends QueueHandler @@ -41,33 +28,8 @@ class TwitterQueueHandler extends QueueHandler return 'twitter'; } - function start() - { - $this->log(LOG_INFO, "INITIALIZE"); - return true; - } - function handle_notice($notice) { return broadcast_twitter($notice); } - - function finish() - { - } - } - -if (have_option('i')) { - $id = get_option_value('i'); -} else if (have_option('--id')) { - $id = get_option_value('--id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$handler = new TwitterQueueHandler($id); - -$handler->runOnce(); |