diff options
Diffstat (limited to 'scripts')
-rwxr-xr-x | scripts/handlequeued.php | 2 | ||||
-rwxr-xr-x | scripts/queuedaemon.php | 149 | ||||
-rwxr-xr-x | scripts/xmppdaemon.php | 353 |
3 files changed, 53 insertions, 451 deletions
diff --git a/scripts/handlequeued.php b/scripts/handlequeued.php index 9031437aa..815884969 100755 --- a/scripts/handlequeued.php +++ b/scripts/handlequeued.php @@ -50,7 +50,7 @@ if (empty($notice)) { exit(1); } -if (!$handler->handle_notice($notice)) { +if (!$handler->handle($notice)) { print "Failed to handle notice id $noticeId on queue '$queue'.\n"; exit(1); } diff --git a/scripts/queuedaemon.php b/scripts/queuedaemon.php index 162f617e0..a9cfda6d7 100755 --- a/scripts/queuedaemon.php +++ b/scripts/queuedaemon.php @@ -29,6 +29,8 @@ $longoptions = array('id=', 'foreground', 'all', 'threads=', 'skip-xmpp', 'xmpp- * * Recognizes Linux and Mac OS X; others will return default of 1. * + * @fixme move this to SpawningDaemon, but to get the default val for help + * text we seem to need it before loading infrastructure * @return intval */ function getProcessorCount() @@ -83,143 +85,29 @@ define('CLAIM_TIMEOUT', 1200); * We can then pass individual items through the QueueHandler subclasses * they belong to. */ -class QueueDaemon extends Daemon +class QueueDaemon extends SpawningDaemon { - protected $allsites; - protected $threads=1; + protected $allsites = false; function __construct($id=null, $daemonize=true, $threads=1, $allsites=false) { - parent::__construct($daemonize); - - if ($id) { - $this->set_id($id); - } + parent::__construct($id, $daemonize, $threads); $this->all = $allsites; - $this->threads = $threads; - } - - /** - * How many seconds a polling-based queue manager should wait between - * checks for new items to handle. - * - * Defaults to 60 seconds; override to speed up or slow down. - * - * @return int timeout in seconds - */ - function timeout() - { - return 60; - } - - function name() - { - return strtolower(get_class($this).'.'.$this->get_id()); - } - - function run() - { - if ($this->threads > 1) { - return $this->runThreads(); - } else { - return $this->runLoop(); - } - } - - function runThreads() - { - $children = array(); - for ($i = 1; $i <= $this->threads; $i++) { - $pid = pcntl_fork(); - if ($pid < 0) { - print "Couldn't fork for thread $i; aborting\n"; - exit(1); - } else if ($pid == 0) { - $this->runChild($i); - exit(0); - } else { - $this->log(LOG_INFO, "Spawned thread $i as pid $pid"); - $children[$i] = $pid; - } - } - - $this->log(LOG_INFO, "Waiting for children to complete."); - while (count($children) > 0) { - $status = null; - $pid = pcntl_wait($status); - if ($pid > 0) { - $i = array_search($pid, $children); - if ($i === false) { - $this->log(LOG_ERR, "Unrecognized child pid $pid exited!"); - continue; - } - unset($children[$i]); - $this->log(LOG_INFO, "Thread $i pid $pid exited."); - - $pid = pcntl_fork(); - if ($pid < 0) { - print "Couldn't fork to respawn thread $i; aborting thread.\n"; - } else if ($pid == 0) { - $this->runChild($i); - exit(0); - } else { - $this->log(LOG_INFO, "Respawned thread $i as pid $pid"); - $children[$i] = $pid; - } - } - } - $this->log(LOG_INFO, "All child processes complete."); - return true; - } - - function runChild($thread) - { - $this->set_id($this->get_id() . "." . $thread); - $this->resetDb(); - $this->runLoop(); - } - - /** - * Reconnect to the database for each child process, - * or they'll get very confused trying to use the - * same socket. - */ - function resetDb() - { - // @fixme do we need to explicitly open the db too - // or is this implied? - global $_DB_DATAOBJECT; - unset($_DB_DATAOBJECT['CONNECTIONS']); - - // Reconnect main memcached, or threads will stomp on - // each other and corrupt their requests. - $cache = common_memcache(); - if ($cache) { - $cache->reconnect(); - } - - // Also reconnect memcached for status_network table. - if (!empty(Status_network::$cache)) { - Status_network::$cache->close(); - Status_network::$cache = null; - } } /** * Setup and start of run loop for this queue handler as a daemon. * Most of the heavy lifting is passed on to the QueueManager's service() - * method, which passes control on to the QueueHandler's handle_notice() - * method for each notice that comes in on the queue. - * - * Most of the time this won't need to be overridden in a subclass. + * method, which passes control on to the QueueHandler's handle() + * method for each item that comes in on the queue. * * @return boolean true on success, false on failure */ - function runLoop() + function runThread() { $this->log(LOG_INFO, 'checking for queued notices'); - $master = new IoMaster($this->get_id()); + $master = new QueueMaster($this->get_id()); $master->init($this->all); $master->service(); @@ -229,10 +117,25 @@ class QueueDaemon extends Daemon return true; } +} - function log($level, $msg) +class QueueMaster extends IoMaster +{ + /** + * Initialize IoManagers for the currently configured site + * which are appropriate to this instance. + */ + function initManagers() { - common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg); + $classes = array(); + if (Event::handle('StartQueueDaemonIoManagers', array(&$classes))) { + $classes[] = 'QueueManager'; + } + Event::handle('EndQueueDaemonIoManagers', array(&$classes)); + + foreach ($classes as $class) { + $this->instantiate($class); + } } } diff --git a/scripts/xmppdaemon.php b/scripts/xmppdaemon.php index cef9c4bd0..fd7cf055b 100755 --- a/scripts/xmppdaemon.php +++ b/scripts/xmppdaemon.php @@ -33,347 +33,46 @@ END_OF_XMPP_HELP; require_once INSTALLDIR.'/scripts/commandline.inc'; -require_once INSTALLDIR . '/lib/common.php'; require_once INSTALLDIR . '/lib/jabber.php'; -require_once INSTALLDIR . '/lib/daemon.php'; -# This is kind of clunky; we create a class to call the global functions -# in jabber.php, which create a new XMPP class. A more elegant (?) solution -# might be to use make this a subclass of XMPP. - -class XMPPDaemon extends Daemon +class XMPPDaemon extends SpawningDaemon { - function __construct($resource=null, $daemonize=true) - { - parent::__construct($daemonize); - - static $attrs = array('server', 'port', 'user', 'password', 'host'); - - foreach ($attrs as $attr) - { - $this->$attr = common_config('xmpp', $attr); - } - - if ($resource) { - $this->resource = $resource . 'daemon'; - } else { - $this->resource = common_config('xmpp', 'resource') . 'daemon'; - } - - $this->jid = $this->user.'@'.$this->server.'/'.$this->resource; - - $this->log(LOG_INFO, "INITIALIZE XMPPDaemon {$this->jid}"); - } - - function connect() - { - $connect_to = ($this->host) ? $this->host : $this->server; - - $this->log(LOG_INFO, "Connecting to $connect_to on port $this->port"); - - $this->conn = jabber_connect($this->resource); - - if (!$this->conn) { - return false; - } - - $this->log(LOG_INFO, "Connected"); - - $this->conn->setReconnectTimeout(600); - - $this->log(LOG_INFO, "Sending initial presence."); - - jabber_send_presence("Send me a message to post a notice", 'available', - null, 'available', 100); - - $this->log(LOG_INFO, "Done connecting."); - - return !$this->conn->isDisconnected(); - } - - function name() - { - return strtolower('xmppdaemon.'.$this->resource); - } - - function run() + function __construct($id=null, $daemonize=true, $threads=1) { - if ($this->connect()) { - - $this->log(LOG_DEBUG, "Initializing stanza handlers."); - - $this->conn->addEventHandler('message', 'handle_message', $this); - $this->conn->addEventHandler('presence', 'handle_presence', $this); - $this->conn->addEventHandler('reconnect', 'handle_reconnect', $this); - - $this->log(LOG_DEBUG, "Beginning processing loop."); - - while ($this->conn->processTime(60)) { - $this->sendPing(); - } + if ($threads != 1) { + // This should never happen. :) + throw new Exception("XMPPDaemon can must run single-threaded"); } + parent::__construct($id, $daemonize, $threads); } - function sendPing() + function runThread() { - if (!isset($this->pingid)) { - $this->pingid = 0; - } else { - $this->pingid++; - } + common_log(LOG_INFO, 'Waiting to listen to XMPP and queues'); - $this->log(LOG_DEBUG, "Sending ping #{$this->pingid}"); + $master = new XmppMaster($this->get_id()); + $master->init(); + $master->service(); - $this->conn->send("<iq from='{$this->jid}' to='{$this->server}' id='ping_{$this->pingid}' type='get'><ping xmlns='urn:xmpp:ping'/></iq>"); - } + common_log(LOG_INFO, 'terminating normally'); - function handle_reconnect(&$pl) - { - $this->log(LOG_DEBUG, "Got reconnection callback."); - $this->conn->processUntil('session_start'); - $this->log(LOG_DEBUG, "Sending reconnection presence."); - $this->conn->presence('Send me a message to post a notice', 'available', null, 'available', 100); - unset($pl['xml']); - $pl['xml'] = null; - - $pl = null; - unset($pl); - } - - function get_user($from) - { - $user = User::staticGet('jabber', jabber_normalize_jid($from)); - return $user; + return true; } - function handle_message(&$pl) - { - $from = jabber_normalize_jid($pl['from']); - - if ($pl['type'] != 'chat') { - $this->log(LOG_WARNING, "Ignoring message of type ".$pl['type']." from $from."); - return; - } - - if (mb_strlen($pl['body']) == 0) { - $this->log(LOG_WARNING, "Ignoring message with empty body from $from."); - return; - } - - # Forwarded from another daemon (probably a broadcaster) for - # us to handle - - if ($this->is_self($from)) { - $this->log(LOG_INFO, "Got forwarded notice from self ($from)."); - $from = $this->get_ofrom($pl); - $this->log(LOG_INFO, "Originally sent by $from."); - if (is_null($from) || $this->is_self($from)) { - $this->log(LOG_INFO, "Ignoring notice originally sent by $from."); - return; - } - } - - $user = $this->get_user($from); - - // For common_current_user to work - global $_cur; - $_cur = $user; - - if (!$user) { - $this->from_site($from, 'Unknown user; go to ' . - common_local_url('imsettings') . - ' to add your address to your account'); - $this->log(LOG_WARNING, 'Message from unknown user ' . $from); - return; - } - if ($this->handle_command($user, $pl['body'])) { - $this->log(LOG_INFO, "Command message by $from handled."); - return; - } else if ($this->is_autoreply($pl['body'])) { - $this->log(LOG_INFO, 'Ignoring auto reply from ' . $from); - return; - } else if ($this->is_otr($pl['body'])) { - $this->log(LOG_INFO, 'Ignoring OTR from ' . $from); - return; - } else { - - $this->log(LOG_INFO, 'Posting a notice from ' . $user->nickname); - - $this->add_notice($user, $pl); - } - - $user->free(); - unset($user); - unset($_cur); - - unset($pl['xml']); - $pl['xml'] = null; - - $pl = null; - unset($pl); - } - - function is_self($from) - { - return preg_match('/^'.strtolower(jabber_daemon_address()).'/', strtolower($from)); - } - - function get_ofrom($pl) - { - $xml = $pl['xml']; - $addresses = $xml->sub('addresses'); - if (!$addresses) { - $this->log(LOG_WARNING, 'Forwarded message without addresses'); - return null; - } - $address = $addresses->sub('address'); - if (!$address) { - $this->log(LOG_WARNING, 'Forwarded message without address'); - return null; - } - if (!array_key_exists('type', $address->attrs)) { - $this->log(LOG_WARNING, 'No type for forwarded message'); - return null; - } - $type = $address->attrs['type']; - if ($type != 'ofrom') { - $this->log(LOG_WARNING, 'Type of forwarded message is not ofrom'); - return null; - } - if (!array_key_exists('jid', $address->attrs)) { - $this->log(LOG_WARNING, 'No jid for forwarded message'); - return null; - } - $jid = $address->attrs['jid']; - if (!$jid) { - $this->log(LOG_WARNING, 'Could not get jid from address'); - return null; - } - $this->log(LOG_DEBUG, 'Got message forwarded from jid ' . $jid); - return $jid; - } - - function is_autoreply($txt) - { - if (preg_match('/[\[\(]?[Aa]uto[-\s]?[Rr]e(ply|sponse)[\]\)]/', $txt)) { - return true; - } else if (preg_match('/^System: Message wasn\'t delivered. Offline storage size was exceeded.$/', $txt)) { - return true; - } else { - return false; - } - } - - function is_otr($txt) - { - if (preg_match('/^\?OTR/', $txt)) { - return true; - } else { - return false; - } - } - - function from_site($address, $msg) - { - $text = '['.common_config('site', 'name') . '] ' . $msg; - jabber_send_message($address, $text); - } - - function handle_command($user, $body) - { - $inter = new CommandInterpreter(); - $cmd = $inter->handle_command($user, $body); - if ($cmd) { - $chan = new XMPPChannel($this->conn); - $cmd->execute($chan); - return true; - } else { - return false; - } - } - - function add_notice(&$user, &$pl) - { - $body = trim($pl['body']); - $content_shortened = common_shorten_links($body); - if (Notice::contentTooLong($content_shortened)) { - $from = jabber_normalize_jid($pl['from']); - $this->from_site($from, sprintf(_('Message too long - maximum is %1$d characters, you sent %2$d.'), - Notice::maxContent(), - mb_strlen($content_shortened))); - return; - } - - try { - $notice = Notice::saveNew($user->id, $content_shortened, 'xmpp'); - } catch (Exception $e) { - $this->log(LOG_ERR, $e->getMessage()); - $this->from_site($user->jabber, $e->getMessage()); - return; - } - - common_broadcast_notice($notice); - $this->log(LOG_INFO, - 'Added notice ' . $notice->id . ' from user ' . $user->nickname); - $notice->free(); - unset($notice); - } - - function handle_presence(&$pl) - { - $from = jabber_normalize_jid($pl['from']); - switch ($pl['type']) { - case 'subscribe': - # We let anyone subscribe - $this->subscribed($from); - $this->log(LOG_INFO, - 'Accepted subscription from ' . $from); - break; - case 'subscribed': - case 'unsubscribed': - case 'unsubscribe': - $this->log(LOG_INFO, - 'Ignoring "' . $pl['type'] . '" from ' . $from); - break; - default: - if (!$pl['type']) { - $user = User::staticGet('jabber', $from); - if (!$user) { - $this->log(LOG_WARNING, 'Presence from unknown user ' . $from); - return; - } - if ($user->updatefrompresence) { - $this->log(LOG_INFO, 'Updating ' . $user->nickname . - ' status from presence.'); - $this->add_notice($user, $pl); - } - $user->free(); - unset($user); - } - break; - } - unset($pl['xml']); - $pl['xml'] = null; - - $pl = null; - unset($pl); - } - - function log($level, $msg) - { - $text = 'XMPPDaemon('.$this->resource.'): '.$msg; - common_log($level, $text); - if (!$this->daemonize) - { - $line = common_log_line($level, $text); - echo $line; - echo "\n"; - } - } +} - function subscribed($to) - { - jabber_special_presence('subscribed', $to); +class XmppMaster extends IoMaster +{ + /** + * Initialize IoManagers for the currently configured site + * which are appropriate to this instance. + */ + function initManagers() + { + // @fixme right now there's a hack in QueueManager to determine + // which queues to subscribe to based on the master class. + $this->instantiate('QueueManager'); + $this->instantiate('XmppManager'); } } |