diff options
author | Sarven Capadisli <csarven@status.net> | 2010-01-13 13:37:04 +0000 |
---|---|---|
committer | Sarven Capadisli <csarven@status.net> | 2010-01-13 13:37:04 +0000 |
commit | 543026b8d1411c7d71a6482d0649ab3523f489b8 (patch) | |
tree | b4e4ac6a8f89411faf7a0cfd11fbe63a4a344a03 /lib | |
parent | 21512910c1d882b62f921154209e6f3c4792f03b (diff) | |
parent | c26246c4c3eb7fb4002e7280c0421ed2fa96b767 (diff) |
Merge branch '0.9.x' of git@gitorious.org:statusnet/mainline into 0.9.x
Diffstat (limited to 'lib')
-rw-r--r-- | lib/api.php | 2 | ||||
-rw-r--r-- | lib/cache.php | 19 | ||||
-rw-r--r-- | lib/command.php | 45 | ||||
-rw-r--r-- | lib/common.php | 194 | ||||
-rw-r--r-- | lib/dbqueuemanager.php | 161 | ||||
-rw-r--r-- | lib/default.php | 2 | ||||
-rw-r--r-- | lib/event.php | 8 | ||||
-rw-r--r-- | lib/iomanager.php | 193 | ||||
-rw-r--r-- | lib/iomaster.php | 361 | ||||
-rw-r--r-- | lib/jabber.php | 120 | ||||
-rw-r--r-- | lib/jabberqueuehandler.php | 47 | ||||
-rw-r--r-- | lib/liberalstomp.php | 133 | ||||
-rw-r--r-- | lib/ombqueuehandler.php | 56 | ||||
-rw-r--r-- | lib/pingqueuehandler.php | 37 | ||||
-rw-r--r-- | lib/pluginqueuehandler.php | 50 | ||||
-rw-r--r-- | lib/publicqueuehandler.php | 47 | ||||
-rw-r--r-- | lib/queuehandler.php | 100 | ||||
-rw-r--r-- | lib/queuemanager.php | 149 | ||||
-rw-r--r-- | lib/queuemonitor.php | 116 | ||||
-rw-r--r-- | lib/smsqueuehandler.php | 39 | ||||
-rw-r--r-- | lib/statusnet.php | 298 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 286 | ||||
-rw-r--r-- | lib/subs.php | 88 | ||||
-rw-r--r-- | lib/unqueuemanager.php | 58 | ||||
-rw-r--r-- | lib/util.php | 25 | ||||
-rw-r--r-- | lib/xmppconfirmmanager.php | 168 | ||||
-rw-r--r-- | lib/xmppmanager.php | 273 | ||||
-rw-r--r-- | lib/xmppqueuehandler.php | 142 |
28 files changed, 2486 insertions, 731 deletions
diff --git a/lib/api.php b/lib/api.php index d21851d50..707e4ac21 100644 --- a/lib/api.php +++ b/lib/api.php @@ -168,7 +168,7 @@ class ApiAction extends Action $timezone = 'UTC'; - if ($user->timezone) { + if (!empty($user) && $user->timezone) { $timezone = $user->timezone; } diff --git a/lib/cache.php b/lib/cache.php index b7b34c050..635c96ad4 100644 --- a/lib/cache.php +++ b/lib/cache.php @@ -179,4 +179,23 @@ class Cache return $success; } + + /** + * Close or reconnect any remote connections, such as to give + * daemon processes a chance to reconnect on a fresh socket. + * + * @return boolean success flag + */ + + function reconnect() + { + $success = false; + + if (Event::handle('StartCacheReconnect', array(&$success))) { + $success = true; + Event::handle('EndCacheReconnect', array()); + } + + return $success; + } } diff --git a/lib/command.php b/lib/command.php index f846fb823..c0a32e1b1 100644 --- a/lib/command.php +++ b/lib/command.php @@ -222,18 +222,15 @@ class JoinCommand extends Command return; } - $member = new Group_member(); - - $member->group_id = $group->id; - $member->profile_id = $cur->id; - $member->created = common_sql_now(); - - $result = $member->insert(); - if (!$result) { - common_log_db_error($member, 'INSERT', __FILE__); - $channel->error($cur, sprintf(_('Could not join user %s to group %s'), - $cur->nickname, $group->nickname)); - return; + try { + if (Event::handle('StartJoinGroup', array($group, $cur))) { + Group_member::join($group->id, $cur->id); + Event::handle('EndJoinGroup', array($group, $cur)); + } + } catch (Exception $e) { + $channel->error($cur, sprintf(_('Could not join user %s to group %s'), + $cur->nickname, $group->nickname)); + return; } $channel->output($cur, sprintf(_('%s joined group %s'), @@ -269,21 +266,15 @@ class DropCommand extends Command return; } - $member = new Group_member(); - - $member->group_id = $group->id; - $member->profile_id = $cur->id; - - if (!$member->find(true)) { - $channel->error($cur,_('Could not find membership record.')); - return; - } - $result = $member->delete(); - if (!$result) { - common_log_db_error($member, 'INSERT', __FILE__); - $channel->error($cur, sprintf(_('Could not remove user %s to group %s'), - $cur->nickname, $group->nickname)); - return; + try { + if (Event::handle('StartLeaveGroup', array($group, $cur))) { + Group_member::leave($group->id, $cur->id); + Event::handle('EndLeaveGroup', array($group, $cur)); + } + } catch (Exception $e) { + $channel->error($cur, sprintf(_('Could not remove user %s to group %s'), + $cur->nickname, $group->nickname)); + return; } $channel->output($cur, sprintf(_('%s left group %s'), diff --git a/lib/common.php b/lib/common.php index b280afec0..61decebb7 100644 --- a/lib/common.php +++ b/lib/common.php @@ -76,159 +76,14 @@ require_once(INSTALLDIR.'/lib/language.php'); require_once(INSTALLDIR.'/lib/event.php'); require_once(INSTALLDIR.'/lib/plugin.php'); -function _sn_to_path($sn) -{ - $past_root = substr($sn, 1); - $last_slash = strrpos($past_root, '/'); - if ($last_slash > 0) { - $p = substr($past_root, 0, $last_slash); - } else { - $p = ''; - } - return $p; -} - -// Save our sanity when code gets loaded through subroutines such as PHPUnit tests -global $default, $config, $_server, $_path; - -// try to figure out where we are. $server and $path -// can be set by including module, else we guess based -// on HTTP info. - -if (isset($server)) { - $_server = $server; -} else { - $_server = array_key_exists('SERVER_NAME', $_SERVER) ? - strtolower($_SERVER['SERVER_NAME']) : - null; -} - -if (isset($path)) { - $_path = $path; -} else { - $_path = (array_key_exists('SERVER_NAME', $_SERVER) && array_key_exists('SCRIPT_NAME', $_SERVER)) ? - _sn_to_path($_SERVER['SCRIPT_NAME']) : - null; -} - -require_once(INSTALLDIR.'/lib/default.php'); - -// Set config values initially to default values - -$config = $default; - -// default configuration, overwritten in config.php - -$config['db'] = &PEAR::getStaticProperty('DB_DataObject','options'); - -$config['db'] = $default['db']; - -// Backward compatibility - -$config['site']['design'] =& $config['design']; - -if (function_exists('date_default_timezone_set')) { - /* Work internally in UTC */ - date_default_timezone_set('UTC'); -} - function addPlugin($name, $attrs = null) { - $name = ucfirst($name); - $pluginclass = "{$name}Plugin"; - - if (!class_exists($pluginclass)) { - - $files = array("local/plugins/{$pluginclass}.php", - "local/plugins/{$name}/{$pluginclass}.php", - "local/{$pluginclass}.php", - "local/{$name}/{$pluginclass}.php", - "plugins/{$pluginclass}.php", - "plugins/{$name}/{$pluginclass}.php"); - - foreach ($files as $file) { - $fullpath = INSTALLDIR.'/'.$file; - if (@file_exists($fullpath)) { - include_once($fullpath); - break; - } - } - } - - $inst = new $pluginclass(); - - if (!empty($attrs)) { - foreach ($attrs as $aname => $avalue) { - $inst->$aname = $avalue; - } - } - return $inst; -} - -// From most general to most specific: -// server-wide, then vhost-wide, then for a path, -// finally for a dir (usually only need one of the last two). - -if (isset($conffile)) { - $_config_files = array($conffile); -} else { - $_config_files = array('/etc/statusnet/statusnet.php', - '/etc/statusnet/laconica.php', - '/etc/laconica/laconica.php', - '/etc/statusnet/'.$_server.'.php', - '/etc/laconica/'.$_server.'.php'); - - if (strlen($_path) > 0) { - $_config_files[] = '/etc/statusnet/'.$_server.'_'.$_path.'.php'; - $_config_files[] = '/etc/laconica/'.$_server.'_'.$_path.'.php'; - } - - $_config_files[] = INSTALLDIR.'/config.php'; -} - -global $_have_a_config; -$_have_a_config = false; - -foreach ($_config_files as $_config_file) { - if (@file_exists($_config_file)) { - include_once($_config_file); - $_have_a_config = true; - } + return StatusNet::addPlugin($name, $attrs); } function _have_config() { - global $_have_a_config; - return $_have_a_config; -} - -// XXX: Throw a conniption if database not installed -// XXX: Find a way to use htmlwriter for this instead of handcoded markup -if (!_have_config()) { - echo '<p>'. _('No configuration file found. ') .'</p>'; - echo '<p>'. _('I looked for configuration files in the following places: ') .'<br /> '. implode($_config_files, '<br />'); - echo '<p>'. _('You may wish to run the installer to fix this.') .'</p>'; - echo '<a href="install.php">'. _('Go to the installer.') .'</a>'; - exit; -} -// Fixup for statusnet.ini - -$_db_name = substr($config['db']['database'], strrpos($config['db']['database'], '/') + 1); - -if ($_db_name != 'statusnet' && !array_key_exists('ini_'.$_db_name, $config['db'])) { - $config['db']['ini_'.$_db_name] = INSTALLDIR.'/classes/statusnet.ini'; -} - -// Backwards compatibility - -if (array_key_exists('memcached', $config)) { - if ($config['memcached']['enabled']) { - addPlugin('Memcache', array('servers' => $config['memcached']['server'])); - } - - if (!empty($config['memcached']['base'])) { - $config['cache']['base'] = $config['memcached']['base']; - } + return StatusNet::haveConfig(); } function __autoload($cls) @@ -247,27 +102,6 @@ function __autoload($cls) } } -// Load default plugins - -foreach ($config['plugins']['default'] as $name => $params) { - if (is_null($params)) { - addPlugin($name); - } else if (is_array($params)) { - if (count($params) == 0) { - addPlugin($name); - } else { - $keys = array_keys($params); - if (is_string($keys[0])) { - addPlugin($name, $params); - } else { - foreach ($params as $paramset) { - addPlugin($name, $paramset); - } - } - } - } -} - // XXX: how many of these could be auto-loaded on use? // XXX: note that these files should not use config options // at compile time since DB config options are not yet loaded. @@ -283,20 +117,20 @@ require_once INSTALLDIR.'/lib/subs.php'; require_once INSTALLDIR.'/lib/clientexception.php'; require_once INSTALLDIR.'/lib/serverexception.php'; -// Load settings from database; note we need autoload for this - -Config::loadSettings(); - -// XXX: if plugins should check the schema at runtime, do that here. - -if ($config['db']['schemacheck'] == 'runtime') { - Event::handle('CheckSchema'); +try { + StatusNet::init(@$server, @$path, @$conffile); +} catch (NoConfigException $e) { + // XXX: Throw a conniption if database not installed + // XXX: Find a way to use htmlwriter for this instead of handcoded markup + echo '<p>'. _('No configuration file found. ') .'</p>'; + echo '<p>'. _('I looked for configuration files in the following places: ') .'<br/> '; + echo implode($e->configFiles, '<br/>'); + echo '<p>'. _('You may wish to run the installer to fix this.') .'</p>'; + echo '<a href="install.php">'. _('Go to the installer.') .'</a>'; + exit; } + // XXX: other formats here define('NICKNAME_FMT', VALIDATE_NUM.VALIDATE_ALPHA_LOWER); - -// Give plugins a chance to initialize in a fully-prepared environment - -Event::handle('InitializePlugin'); diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index 750300928..a5c6fd28b 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -22,16 +22,20 @@ * @category QueueManager * @package StatusNet * @author Evan Prodromou <evan@status.net> - * @copyright 2009 StatusNet, Inc. + * @author Brion Vibber <brion@status.net> + * @copyright 2009-2010 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 * @link http://status.net/ */ class DBQueueManager extends QueueManager { - var $qis = array(); - - function enqueue($object, $queue) + /** + * Saves a notice object reference into the queue item table. + * @return boolean true on success + * @throws ServerException on failure + */ + public function enqueue($object, $queue) { $notice = $object; @@ -47,70 +51,95 @@ class DBQueueManager extends QueueManager throw new ServerException('DB error inserting queue item'); } + $this->stats('enqueued', $queue); + return true; } - function service($queue, $handler) + /** + * Poll every minute for new events during idle periods. + * We'll look in more often when there's data available. + * + * @return int seconds + */ + public function pollInterval() + { + return 60; + } + + /** + * Run a polling cycle during idle processing in the input loop. + * @return boolean true if we had a hit + */ + public function poll() { - while (true) { - $this->_log(LOG_DEBUG, 'Checking for notices...'); - $timeout = $handler->timeout(); - $notice = $this->_nextItem($queue, $timeout); - if (empty($notice)) { - $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); - // Nothing in the queue. Do you - // have other tasks, like servicing your - // XMPP connection, to do? - $handler->idle(QUEUE_HANDLER_MISS_IDLE); + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $item = $this->_nextItem(); + if ($item === false) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + return false; + } + if ($item === true) { + // We dequeued an entry for a deleted or invalid notice. + // Consider it a hit for poll rate purposes. + return true; + } + + list($queue, $notice) = $item; + $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue); + + // Yay! Got one! + $handler = $this->getHandler($queue); + if ($handler) { + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice"); + $this->_done($notice, $queue); } else { - $this->_log(LOG_INFO, 'Got notice '. $notice->id); - // Yay! Got one! - if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); - $this->_done($notice, $queue); - } else { - $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); - $this->_fail($notice, $queue); - } - // Chance to e.g. service your XMPP connection - $this->_log(LOG_DEBUG, 'Idling after success.'); - $handler->idle(QUEUE_HANDLER_HIT_IDLE); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice"); + $this->_fail($notice, $queue); } - // XXX: when do we give up? + } else { + $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue"); + $this->_fail($notice, $queue); } + return true; } - function _nextItem($queue, $timeout=null) + /** + * Pop the oldest unclaimed item off the queue set and claim it. + * + * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice) + * giving the queue transport name. + */ + protected function _nextItem() { $start = time(); $result = null; - $sleeptime = 1; + $qi = Queue_item::top(); + if (empty($qi)) { + return false; + } - do { - $qi = Queue_item::top($queue); - if (empty($qi)) { - $this->_log(LOG_DEBUG, "No new queue items, sleeping $sleeptime seconds."); - sleep($sleeptime); - $sleeptime *= 2; - } else { - $notice = Notice::staticGet('id', $qi->notice_id); - if (!empty($notice)) { - $result = $notice; - } else { - $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id); - $qi->delete(); - $qi->free(); - $qi = null; - } - $sleeptime = 1; - } - } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); + $queue = $qi->transport; + $notice = Notice::staticGet('id', $qi->notice_id); + if (empty($notice)) { + $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice"); + $qi->delete(); + return true; + } - return $result; + $result = $notice; + return array($queue, $notice); } - function _done($object, $queue) + /** + * Delete our claimed item from the queue after successful processing. + * + * @param Notice $object + * @param string $queue + */ + protected function _done($object, $queue) { // XXX: right now, we only handle notices @@ -120,24 +149,29 @@ class DBQueueManager extends QueueManager 'transport' => $queue)); if (empty($qi)) { - $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); } else { if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. - 'for '.$notice->id.', queue '.$queue); + $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item"); } $qi->delete(); $qi->free(); - $qi = null; } - $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item"); + $this->stats('handled', $queue); $notice->free(); - $notice = null; } - function _fail($object, $queue) + /** + * Free our claimed queue item for later reprocessing in case of + * temporary failure. + * + * @param Notice $object + * @param string $queue + */ + protected function _fail($object, $queue) { // XXX: right now, we only handle notices @@ -147,11 +181,10 @@ class DBQueueManager extends QueueManager 'transport' => $queue)); if (empty($qi)) { - $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); } else { if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '. - 'for '.$notice->id.', queue '.$queue); + $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item"); } else { $orig = clone($qi); $qi->claimed = null; @@ -160,13 +193,13 @@ class DBQueueManager extends QueueManager } } - $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item"); + $this->stats('error', $queue); $notice->free(); - $notice = null; } - function _log($level, $msg) + protected function _log($level, $msg) { common_log($level, 'DBQueueManager: '.$msg); } diff --git a/lib/default.php b/lib/default.php index fa862f3ff..f7f4777a2 100644 --- a/lib/default.php +++ b/lib/default.php @@ -79,6 +79,8 @@ $default = 'queue_basename' => '/queue/statusnet/', 'stomp_username' => null, 'stomp_password' => null, + 'monitor' => null, // URL to monitor ping endpoint (work in progress) + 'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully ), 'license' => array('url' => 'http://creativecommons.org/licenses/by/3.0/', diff --git a/lib/event.php b/lib/event.php index 4819b71b4..41fb53ffe 100644 --- a/lib/event.php +++ b/lib/event.php @@ -138,4 +138,12 @@ class Event { } return false; } + + /** + * Disables any and all handlers that have been set up so far; + * use only if you know it's safe to reinitialize all plugins. + */ + public static function clearHandlers() { + Event::$_handlers = array(); + } } diff --git a/lib/iomanager.php b/lib/iomanager.php new file mode 100644 index 000000000..ee2ff958b --- /dev/null +++ b/lib/iomanager.php @@ -0,0 +1,193 @@ +<?php +/** + * StatusNet, the distributed open-source microblogging tool + * + * Abstract class for i/o managers + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package StatusNet + * @author Evan Prodromou <evan@status.net> + * @author Sarven Capadisli <csarven@status.net> + * @author Brion Vibber <brion@status.net> + * @copyright 2009-2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +abstract class IoManager +{ + const SINGLE_ONLY = 0; + const INSTANCE_PER_SITE = 1; + const INSTANCE_PER_PROCESS = 2; + + /** + * Factory function to get an appropriate subclass. + */ + public abstract static function get(); + + /** + * Tell the i/o queue master if and how we can handle multi-site + * processes. + * + * Return one of: + * IoManager::SINGLE_ONLY + * IoManager::INSTANCE_PER_SITE + * IoManager::INSTANCE_PER_PROCESS + */ + public static function multiSite() + { + return IoManager::SINGLE_ONLY; + } + + /** + * If in a multisite configuration, the i/o master will tell + * your manager about each site you'll have to handle so you + * can do any necessary per-site setup. + * + * @param string $site target site server name + */ + public function addSite($site) + { + /* no-op */ + } + + /** + * This method is called when data is available on one of your + * i/o manager's sockets. The socket with data is passed in, + * in case you have multiple sockets. + * + * If your i/o manager is based on polling during idle processing, + * you don't need to implement this. + * + * @param resource $socket + * @return boolean true on success, false on failure + */ + public function handleInput($socket) + { + return true; + } + + /** + * Return any open sockets that the run loop should listen + * for input on. If input comes in on a listed socket, + * the matching manager's handleInput method will be called. + * + * @return array of resources + */ + function getSockets() + { + return array(); + } + + /** + * Maximum planned time between poll() calls when input isn't waiting. + * Actual time may vary! + * + * When we get a polling hit, the timeout will be cut down to 0 while + * input is coming in, then will back off to this amount if no further + * input shows up. + * + * By default polling is disabled; you must override this to enable + * polling for this manager. + * + * @return int max poll interval in seconds, or 0 to disable polling + */ + function pollInterval() + { + return 0; + } + + /** + * Request a maximum timeout for listeners before the next idle period. + * Actual wait may be shorter, so don't go crazy in your idle()! + * Wait could be longer if other handlers performed some slow activity. + * + * Return 0 to request that listeners return immediately if there's no + * i/o and speed up the idle as much as possible; but don't do that all + * the time as this will burn CPU. + * + * @return int seconds + */ + function timeout() + { + return 60; + } + + /** + * Called by IoManager after each handled item or empty polling cycle. + * This is a good time to e.g. service your XMPP connection. + * + * Doesn't need to be overridden if there's no maintenance to do. + */ + function idle() + { + return true; + } + + /** + * The meat of a polling manager... check for something to do + * and do it! Note that you should not take too long, as other + * i/o managers may need to do some work too! + * + * On a successful hit, the next poll() call will come as soon + * as possible followed by exponential backoff up to pollInterval() + * if no more data is available. + * + * @return boolean true if events were hit + */ + public function poll() + { + return false; + } + + /** + * Initialization, run when the queue manager starts. + * If this function indicates failure, the handler run will be aborted. + * + * @param IoMaster $master process/event controller + * @return boolean true on success, false on failure + */ + public function start($master) + { + $this->master = $master; + return true; + } + + /** + * Cleanup, run when the queue manager ends. + * If this function indicates failure, a warning will be logged. + * + * @return boolean true on success, false on failure + */ + public function finish() + { + return true; + } + + /** + * Ping iomaster's queue status monitor with a stats update. + * Only valid during input loop! + * + * @param string $counter keyword for counter to increment + */ + public function stats($counter, $owners=array()) + { + $this->master->stats($counter, $owners); + } +} + diff --git a/lib/iomaster.php b/lib/iomaster.php new file mode 100644 index 000000000..aff5b145c --- /dev/null +++ b/lib/iomaster.php @@ -0,0 +1,361 @@ +<?php +/** + * StatusNet, the distributed open-source microblogging tool + * + * I/O manager to wrap around socket-reading and polling queue & connection managers. + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package StatusNet + * @author Brion Vibber <brion@status.net> + * @copyright 2009 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +class IoMaster +{ + public $id; + + protected $multiSite = false; + protected $managers = array(); + protected $singletons = array(); + + protected $pollTimeouts = array(); + protected $lastPoll = array(); + + /** + * @param string $id process ID to use in logging/monitoring + */ + public function __construct($id) + { + $this->id = $id; + $this->monitor = new QueueMonitor(); + } + + public function init($multiSite=null) + { + if ($multiSite !== null) { + $this->multiSite = $multiSite; + } + if ($this->multiSite) { + $this->sites = $this->findAllSites(); + } else { + $this->sites = array(common_config('site', 'server')); + } + + if (empty($this->sites)) { + throw new Exception("Empty status_network table, cannot init"); + } + + foreach ($this->sites as $site) { + if ($site != common_config('site', 'server')) { + StatusNet::init($site); + } + + $classes = array(); + if (Event::handle('StartIoManagerClasses', array(&$classes))) { + $classes[] = 'QueueManager'; + if (common_config('xmpp', 'enabled')) { + $classes[] = 'XmppManager'; // handles pings/reconnects + $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations + } + } + Event::handle('EndIoManagerClasses', array(&$classes)); + + foreach ($classes as $class) { + $this->instantiate($class); + } + } + } + + /** + * Pull all local sites from status_network table. + * @return array of hostnames + */ + protected function findAllSites() + { + $hosts = array(); + $sn = new Status_network(); + $sn->find(); + while ($sn->fetch()) { + $hosts[] = $sn->hostname; + } + return $hosts; + } + + /** + * Instantiate an i/o manager class for the current site. + * If a multi-site capable handler is already present, + * we don't need to build a new one. + * + * @param string $class + */ + protected function instantiate($class) + { + if (isset($this->singletons[$class])) { + // Already instantiated a multi-site-capable handler. + // Just let it know it should listen to this site too! + $this->singletons[$class]->addSite(common_config('site', 'server')); + return; + } + + $manager = $this->getManager($class); + + if ($this->multiSite) { + $caps = $manager->multiSite(); + if ($caps == IoManager::SINGLE_ONLY) { + throw new Exception("$class can't run with --all; aborting."); + } + if ($caps == IoManager::INSTANCE_PER_PROCESS) { + // Save this guy for later! + // We'll only need the one to cover multiple sites. + $this->singletons[$class] = $manager; + $manager->addSite(common_config('site', 'server')); + } + } + + $this->managers[] = $manager; + } + + protected function getManager($class) + { + return call_user_func(array($class, 'get')); + } + + /** + * Basic run loop... + * + * Initialize all io managers, then sit around waiting for input. + * Between events or timeouts, pass control back to idle() method + * to allow for any additional background processing. + */ + function service() + { + $this->logState('init'); + $this->start(); + + while (true) { + $timeouts = array_values($this->pollTimeouts); + $timeouts[] = 60; // default max timeout + + // Wait for something on one of our sockets + $sockets = array(); + $managers = array(); + foreach ($this->managers as $manager) { + foreach ($manager->getSockets() as $socket) { + $sockets[] = $socket; + $managers[] = $manager; + } + $timeouts[] = intval($manager->timeout()); + } + + $timeout = min($timeouts); + if ($sockets) { + $read = $sockets; + $write = array(); + $except = array(); + $this->logState('listening'); + common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data..."); + $ready = stream_select($read, $write, $except, $timeout, 0); + + if ($ready === false) { + common_log(LOG_ERR, "Error selecting on sockets"); + } else if ($ready > 0) { + foreach ($read as $socket) { + $index = array_search($socket, $sockets, true); + if ($index !== false) { + $this->logState('queue'); + $managers[$index]->handleInput($socket); + } else { + common_log(LOG_ERR, "Saw input on a socket we didn't listen to"); + } + } + } + } + + if ($timeout > 0 && empty($sockets)) { + // If we had no listeners, sleep until the pollers' next requested wakeup. + common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle..."); + $this->logState('sleep'); + sleep($timeout); + } + + $this->logState('poll'); + $this->poll(); + + $this->logState('idle'); + $this->idle(); + + $memoryLimit = $this->softMemoryLimit(); + if ($memoryLimit > 0) { + $usage = memory_get_usage(); + if ($usage > $memoryLimit) { + common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting."); + break; + } + } + } + + $this->logState('shutdown'); + $this->finish(); + } + + /** + * Return fully-parsed soft memory limit in bytes. + * @return intval 0 or -1 if not set + */ + function softMemoryLimit() + { + $softLimit = trim(common_config('queue', 'softlimit')); + if (substr($softLimit, -1) == '%') { + $limit = trim(ini_get('memory_limit')); + $limit = $this->parseMemoryLimit($limit); + if ($limit > 0) { + return intval(substr($softLimit, 0, -1) * $limit / 100); + } else { + return -1; + } + } else { + return $this->parseMemoryLimit($limit); + } + return $softLimit; + } + + /** + * Interpret PHP shorthand for memory_limit and friends. + * Why don't they just expose the actual numeric value? :P + * @param string $mem + * @return int + */ + protected function parseMemoryLimit($mem) + { + // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes + $size = array('k' => 1024, + 'm' => 1024*1024, + 'g' => 1024*1024*1024); + if (empty($mem)) { + return 0; + } else if (is_numeric($mem)) { + return intval($mem); + } else { + $mult = strtolower(substr($mem, -1)); + if (isset($size[$mult])) { + return substr($mem, 0, -1) * $size[$mult]; + } else { + return intval($mem); + } + } + } + + function start() + { + foreach ($this->managers as $index => $manager) { + $manager->start($this); + // @fixme error check + if ($manager->pollInterval()) { + // We'll want to check for input on the first pass + $this->pollTimeouts[$index] = 0; + $this->lastPoll[$index] = 0; + } + } + } + + function finish() + { + foreach ($this->managers as $manager) { + $manager->finish(); + // @fixme error check + } + } + + /** + * Called during the idle portion of the runloop to see which handlers + */ + function poll() + { + foreach ($this->managers as $index => $manager) { + $interval = $manager->pollInterval(); + if ($interval <= 0) { + // Not a polling manager. + continue; + } + + if (isset($this->pollTimeouts[$index])) { + $timeout = $this->pollTimeouts[$index]; + if (time() - $this->lastPoll[$index] < $timeout) { + // Not time to poll yet. + continue; + } + } else { + $timeout = 0; + } + $hit = $manager->poll(); + + $this->lastPoll[$index] = time(); + if ($hit) { + // Do the next poll quickly, there may be more input! + $this->pollTimeouts[$index] = 0; + } else { + // Empty queue. Exponential backoff up to the maximum poll interval. + if ($timeout > 0) { + $timeout = min($timeout * 2, $interval); + } else { + $timeout = 1; + } + $this->pollTimeouts[$index] = $timeout; + } + } + } + + /** + * Called after each handled item or empty polling cycle. + * This is a good time to e.g. service your XMPP connection. + */ + function idle() + { + foreach ($this->managers as $manager) { + $manager->idle(); + } + } + + /** + * Send thread state update to the monitoring server, if configured. + * + * @param string $state ('init', 'queue', 'shutdown' etc) + * @param string $substate (optional, eg queue name 'omb' 'sms' etc) + */ + protected function logState($state, $substate='') + { + $this->monitor->logState($this->id, $state, $substate); + } + + /** + * Send thread stats. + * Thread ID will be implicit; other owners can be listed as well + * for per-queue and per-site records. + * + * @param string $key counter name + * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01' + */ + public function stats($key, $owners=array()) + { + $owners[] = "thread:" . $this->id; + $this->monitor->stats($key, $owners); + } +} + diff --git a/lib/jabber.php b/lib/jabber.php index a821856a8..69f0d3570 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -86,7 +86,11 @@ class Sharing_XMPP extends XMPPHP_XMPP } /** - * connect the configured Jabber account to the configured server + * Lazy-connect the configured Jabber account to the configured server; + * if already opened, the same connection will be returned. + * + * In a multi-site background process, each site configuration + * will get its own connection. * * @param string $resource Resource to connect (defaults to configured resource) * @@ -95,16 +99,19 @@ class Sharing_XMPP extends XMPPHP_XMPP function jabber_connect($resource=null) { - static $conn = null; - if (!$conn) { + static $connections = array(); + $site = common_config('site', 'server'); + if (empty($connections[$site])) { + if (empty($resource)) { + $resource = common_config('xmpp', 'resource'); + } $conn = new Sharing_XMPP(common_config('xmpp', 'host') ? common_config('xmpp', 'host') : common_config('xmpp', 'server'), common_config('xmpp', 'port'), common_config('xmpp', 'user'), common_config('xmpp', 'password'), - ($resource) ? $resource : - common_config('xmpp', 'resource'), + $resource, common_config('xmpp', 'server'), common_config('xmpp', 'debug') ? true : false, @@ -115,12 +122,16 @@ function jabber_connect($resource=null) if (!$conn) { return false; } + $connections[$site] = $conn; $conn->autoSubscribe(); $conn->useEncryption(common_config('xmpp', 'encryption')); try { - $conn->connect(true); // true = persistent connection + common_log(LOG_INFO, __METHOD__ . ": connecting " . + common_config('xmpp', 'user') . '/' . $resource); + //$conn->connect(true); // true = persistent connection + $conn->connect(); // persistent connections break multisite } catch (XMPPHP_Exception $e) { common_log(LOG_ERR, $e->getMessage()); return false; @@ -128,7 +139,7 @@ function jabber_connect($resource=null) $conn->processUntil('session_start'); } - return $conn; + return $connections[$site]; } /** @@ -345,77 +356,42 @@ function jabber_broadcast_notice($notice) $conn = jabber_connect(); - // First, get users to whom this is a direct reply - $user = new User(); - $UT = common_config('db','type')=='pgsql'?'"user"':'user'; - $user->query("SELECT $UT.id, $UT.jabber " . - "FROM $UT JOIN reply ON $UT.id = reply.profile_id " . - 'WHERE reply.notice_id = ' . $notice->id . ' ' . - "AND $UT.jabber is not null " . - "AND $UT.jabbernotify = 1 " . - "AND $UT.jabberreplies = 1 "); - - while ($user->fetch()) { + $ni = $notice->whoGets(); + + foreach ($ni as $user_id => $reason) { + $user = User::staticGet('user_id', $user_id); + if (empty($user) || + empty($user->jabber) || + !$user->jabbernotify) { + // either not a local user, or just not found + continue; + } + switch ($reason) { + case NOTICE_INBOX_SOURCE_REPLY: + if (!$user->jabberreplies) { + continue; + } + break; + case NOTICE_INBOX_SOURCE_SUB: + $sub = Subscription::pkeyGet(array('subscriber' => $user->id, + 'subscribed' => $notice->profile_id)); + if (empty($sub) || !$sub->jabber) { + continue; + } + break; + case NOTICE_INBOX_SOURCE_GROUP: + break; + default: + throw new Exception(_("Unknown inbox source.")); + } + common_log(LOG_INFO, - 'Sending reply notice ' . $notice->id . ' to ' . $user->jabber, + 'Sending notice ' . $notice->id . ' to ' . $user->jabber, __FILE__); $conn->message($user->jabber, $msg, 'chat', null, $entry); $conn->processTime(0); - $sent_to[$user->id] = 1; } - $user->free(); - - // Now, get users subscribed to this profile - - $user = new User(); - $user->query("SELECT $UT.id, $UT.jabber " . - "FROM $UT JOIN subscription " . - "ON $UT.id = subscription.subscriber " . - 'WHERE subscription.subscribed = ' . $notice->profile_id . ' ' . - "AND $UT.jabber is not null " . - "AND $UT.jabbernotify = 1 " . - 'AND subscription.jabber = 1 '); - - while ($user->fetch()) { - if (!array_key_exists($user->id, $sent_to)) { - common_log(LOG_INFO, - 'Sending notice ' . $notice->id . ' to ' . $user->jabber, - __FILE__); - $conn->message($user->jabber, $msg, 'chat', null, $entry); - // To keep the incoming queue from filling up, - // we service it after each send. - $conn->processTime(0); - $sent_to[$user->id] = 1; - } - } - - // Now, get users who have it in their inbox because of groups - - $user = new User(); - $user->query("SELECT $UT.id, $UT.jabber " . - "FROM $UT JOIN notice_inbox " . - "ON $UT.id = notice_inbox.user_id " . - 'WHERE notice_inbox.notice_id = ' . $notice->id . ' ' . - 'AND notice_inbox.source = 2 ' . - "AND $UT.jabber is not null " . - "AND $UT.jabbernotify = 1 "); - - while ($user->fetch()) { - if (!array_key_exists($user->id, $sent_to)) { - common_log(LOG_INFO, - 'Sending notice ' . $notice->id . ' to ' . $user->jabber, - __FILE__); - $conn->message($user->jabber, $msg, 'chat', null, $entry); - // To keep the incoming queue from filling up, - // we service it after each send. - $conn->processTime(0); - $sent_to[$user->id] = 1; - } - } - - $user->free(); - return true; } diff --git a/lib/jabberqueuehandler.php b/lib/jabberqueuehandler.php new file mode 100644 index 000000000..b1518866d --- /dev/null +++ b/lib/jabberqueuehandler.php @@ -0,0 +1,47 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2008, 2009, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Queue handler for pushing new notices to Jabber users. + * @fixme this exception handling doesn't look very good. + */ +class JabberQueueHandler extends QueueHandler +{ + var $conn = null; + + function transport() + { + return 'jabber'; + } + + function handle_notice($notice) + { + require_once(INSTALLDIR.'/lib/jabber.php'); + try { + return jabber_broadcast_notice($notice); + } catch (XMPPHP_Exception $e) { + $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); + exit(1); + } + } +} diff --git a/lib/liberalstomp.php b/lib/liberalstomp.php new file mode 100644 index 000000000..c9233843a --- /dev/null +++ b/lib/liberalstomp.php @@ -0,0 +1,133 @@ +<?php + +/** + * Based on code from Stomp PHP library, working around bugs in the base class. + * + * Original code is copyright 2005-2006 The Apache Software Foundation + * Modifications copyright 2009 StatusNet Inc by Brion Vibber <brion@status.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +class LiberalStomp extends Stomp +{ + /** + * We need to be able to get the socket so advanced daemons can + * do a select() waiting for input both from the queue and from + * other sources such as an XMPP connection. + * + * @return resource + */ + function getSocket() + { + return $this->_socket; + } + + /** + * Make socket connection to the server + * We also set the stream to non-blocking mode, since we'll be + * select'ing to wait for updates. In blocking mode it seems + * to get confused sometimes. + * + * @throws StompException + */ + protected function _makeConnection () + { + parent::_makeConnection(); + stream_set_blocking($this->_socket, 0); + } + + /** + * Version 1.0.0 of the Stomp library gets confused if messages + * come in too fast over the connection. This version will read + * out as many frames as are ready to be read from the socket. + * + * Modified from Stomp::readFrame() + * + * @return StompFrame False when no frame to read + */ + public function readFrames () + { + if (!$this->hasFrameToRead()) { + return false; + } + + $rb = 1024; + $data = ''; + $end = false; + $frames = array(); + + do { + // @fixme this sometimes hangs in blocking mode... + // shouldn't we have been idle until we found there's more data? + $read = fread($this->_socket, $rb); + if ($read === false) { + $this->_reconnect(); + // @fixme this will lose prior items + return $this->readFrames(); + } + $data .= $read; + if (strpos($data, "\x00") !== false) { + // Frames are null-delimited, but some servers + // may append an extra \n according to old bug reports. + $data = str_replace("\x00\n", "\x00", $data); + $chunks = explode("\x00", $data); + + $data = array_pop($chunks); + $frames = array_merge($frames, $chunks); + if ($data == '') { + // We're at the end of a frame; stop reading. + break; + } else { + // In the middle of a frame; keep going. + } + } + // @fixme find out why this len < 2 check was there + //$len = strlen($data); + } while (true);//$len < 2 || $end == false); + + return array_map(array($this, 'parseFrame'), $frames); + } + + /** + * Parse a raw Stomp frame into an object. + * Extracted from Stomp::readFrame() + * + * @param string $data + * @return StompFrame + */ + function parseFrame($data) + { + list ($header, $body) = explode("\n\n", $data, 2); + $header = explode("\n", $header); + $headers = array(); + $command = null; + foreach ($header as $v) { + if (isset($command)) { + list ($name, $value) = explode(':', $v, 2); + $headers[$name] = $value; + } else { + $command = $v; + } + } + $frame = new StompFrame($command, $headers, trim($body)); + if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') { + require_once 'Stomp/Message/Map.php'; + return new StompMessageMap($frame); + } else { + return $frame; + } + return $frame; + } +} + diff --git a/lib/ombqueuehandler.php b/lib/ombqueuehandler.php new file mode 100644 index 000000000..3ffc1313b --- /dev/null +++ b/lib/ombqueuehandler.php @@ -0,0 +1,56 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2008, 2009, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Queue handler for pushing new notices to OpenMicroBlogging subscribers. + */ +class OmbQueueHandler extends QueueHandler +{ + + function transport() + { + return 'omb'; + } + + /** + * @fixme doesn't currently report failure back to the queue manager + * because omb_broadcast_notice() doesn't report it to us + */ + function handle_notice($notice) + { + if ($this->is_remote($notice)) { + $this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id); + return true; + } else { + require_once(INSTALLDIR.'/lib/omb.php'); + omb_broadcast_notice($notice); + return true; + } + } + + function is_remote($notice) + { + $user = User::staticGet($notice->profile_id); + return is_null($user); + } +} diff --git a/lib/pingqueuehandler.php b/lib/pingqueuehandler.php new file mode 100644 index 000000000..8bb218078 --- /dev/null +++ b/lib/pingqueuehandler.php @@ -0,0 +1,37 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2008, 2009, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Queue handler for pushing new notices to ping servers. + */ +class PingQueueHandler extends QueueHandler { + + function transport() { + return 'ping'; + } + + function handle_notice($notice) { + require_once INSTALLDIR . '/lib/ping.php'; + return ping_broadcast_notice($notice); + } +} diff --git a/lib/pluginqueuehandler.php b/lib/pluginqueuehandler.php new file mode 100644 index 000000000..24d504699 --- /dev/null +++ b/lib/pluginqueuehandler.php @@ -0,0 +1,50 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2008, 2009, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Queue handler for letting plugins handle stuff. + * + * The plugin queue handler accepts notices over the "plugin" queue + * and simply passes them through the "HandleQueuedNotice" event. + * + * This gives plugins a chance to do background processing without + * actually registering their own queue and ensuring that things + * are queued into it. + * + * Fancier plugins may wish to instead hook the 'GetQueueHandlerClass' + * event with their own class, in which case they must ensure that + * their notices get enqueued when they need them. + */ +class PluginQueueHandler extends QueueHandler +{ + function transport() + { + return 'plugin'; + } + + function handle_notice($notice) + { + Event::handle('HandleQueuedNotice', array(&$notice)); + return true; + } +} diff --git a/lib/publicqueuehandler.php b/lib/publicqueuehandler.php new file mode 100644 index 000000000..9ea9ee73a --- /dev/null +++ b/lib/publicqueuehandler.php @@ -0,0 +1,47 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2008, 2009, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Queue handler for pushing new notices to public XMPP subscribers. + * @fixme correct this exception handling + */ +class PublicQueueHandler extends QueueHandler +{ + + function transport() + { + return 'public'; + } + + function handle_notice($notice) + { + require_once(INSTALLDIR.'/lib/jabber.php'); + try { + return jabber_public_notice($notice); + } catch (XMPPHP_Exception $e) { + $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); + die($e->getMessage()); + } + return true; + } +} diff --git a/lib/queuehandler.php b/lib/queuehandler.php index cd43b1e09..613be6e33 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -19,14 +19,6 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } -require_once(INSTALLDIR.'/lib/daemon.php'); -require_once(INSTALLDIR.'/classes/Queue_item.php'); -require_once(INSTALLDIR.'/classes/Notice.php'); - -define('CLAIM_TIMEOUT', 1200); -define('QUEUE_HANDLER_MISS_IDLE', 10); -define('QUEUE_HANDLER_HIT_IDLE', 0); - /** * Base class for queue handlers. * @@ -36,24 +28,20 @@ define('QUEUE_HANDLER_HIT_IDLE', 0); * * Subclasses must override at least the following methods: * - transport - * - start - * - finish * - handle_notice - * - * Some subclasses will also want to override the idle handler: - * - idle */ -class QueueHandler extends Daemon +#class QueueHandler extends Daemon +class QueueHandler { - function __construct($id=null, $daemonize=true) - { - parent::__construct($daemonize); - - if ($id) { - $this->set_id($id); - } - } +# function __construct($id=null, $daemonize=true) +# { +# parent::__construct($daemonize); +# +# if ($id) { +# $this->set_id($id); +# } +# } /** * How many seconds a polling-based queue manager should wait between @@ -61,22 +49,23 @@ class QueueHandler extends Daemon * * Defaults to 60 seconds; override to speed up or slow down. * + * @fixme not really compatible with global queue manager * @return int timeout in seconds */ - function timeout() - { - return 60; - } +# function timeout() +# { +# return 60; +# } - function class_name() - { - return ucfirst($this->transport()) . 'Handler'; - } +# function class_name() +# { +# return ucfirst($this->transport()) . 'Handler'; +# } - function name() - { - return strtolower($this->class_name().'.'.$this->get_id()); - } +# function name() +# { +# return strtolower($this->class_name().'.'.$this->get_id()); +# } /** * Return transport keyword which identifies items this queue handler @@ -93,30 +82,6 @@ class QueueHandler extends Daemon } /** - * Initialization, run when the queue handler starts. - * If this function indicates failure, the handler run will be aborted. - * - * @fixme run() will abort if this doesn't return true, - * but some subclasses don't bother. - * @return boolean true on success, false on failure - */ - function start() - { - } - - /** - * Cleanup, run when the queue handler ends. - * If this function indicates failure, a warning will be logged. - * - * @fixme run() will throw warnings if this doesn't return true, - * but many subclasses don't bother. - * @return boolean true on success, false on failure - */ - function finish() - { - } - - /** * Here's the meat of your queue handler -- you're handed a Notice * object, which you may do as you will with. * @@ -169,29 +134,10 @@ class QueueHandler extends Daemon return true; } - /** - * Called by QueueHandler after each handled item or empty polling cycle. - * This is a good time to e.g. service your XMPP connection. - * - * Doesn't need to be overridden if there's no maintenance to do. - * - * @param int $timeout seconds to sleep if there's nothing to do - */ - function idle($timeout=0) - { - if ($timeout > 0) { - sleep($timeout); - } - } function log($level, $msg) { common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); } - - function getSockets() - { - return array(); - } } diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 43105b7a8..a98c0efff 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -2,7 +2,7 @@ /** * StatusNet, the distributed open-source microblogging tool * - * Abstract class for queue managers + * Abstract class for i/o managers * * PHP version 5 * @@ -23,16 +23,32 @@ * @package StatusNet * @author Evan Prodromou <evan@status.net> * @author Sarven Capadisli <csarven@status.net> - * @copyright 2009 StatusNet, Inc. + * @author Brion Vibber <brion@status.net> + * @copyright 2009-2010 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 * @link http://status.net/ */ -class QueueManager +/** + * Completed child classes must implement the enqueue() method. + * + * For background processing, classes should implement either socket-based + * input (handleInput(), getSockets()) or idle-loop polling (idle()). + */ +abstract class QueueManager extends IoManager { static $qm = null; - static function get() + /** + * Factory function to pull the appropriate QueueManager object + * for this site's configuration. It can then be used to queue + * events for later processing or to spawn a processing loop. + * + * Plugins can add to the built-in types by hooking StartNewQueueManager. + * + * @return QueueManager + */ + public static function get() { if (empty(self::$qm)) { @@ -62,13 +78,130 @@ class QueueManager return self::$qm; } - function enqueue($object, $queue) + /** + * @fixme wouldn't necessarily work with other class types. + * Better to change the interface...? + */ + public static function multiSite() + { + if (common_config('queue', 'subsystem') == 'stomp') { + return IoManager::INSTANCE_PER_PROCESS; + } else { + return IoManager::SINGLE_ONLY; + } + } + + function __construct() { - throw ServerException("Unimplemented function 'enqueue' called"); + $this->initialize(); } - function service($queue, $handler) + /** + * Store an object (usually/always a Notice) into the given queue + * for later processing. No guarantee is made on when it will be + * processed; it could be immediately or at some unspecified point + * in the future. + * + * Must be implemented by any queue manager. + * + * @param Notice $object + * @param string $queue + */ + abstract function enqueue($object, $queue); + + /** + * Instantiate the appropriate QueueHandler class for the given queue. + * + * @param string $queue + * @return mixed QueueHandler or null + */ + function getHandler($queue) { - throw ServerException("Unimplemented function 'service' called"); + if (isset($this->handlers[$queue])) { + $class = $this->handlers[$queue]; + if (class_exists($class)) { + return new $class(); + } else { + common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); + } + } else { + common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); + } + return null; + } + + /** + * Get a list of all registered queue transport names. + * + * @return array of strings + */ + function getQueues() + { + return array_keys($this->handlers); + } + + /** + * Initialize the list of queue handlers + * + * @event StartInitializeQueueManager + * @event EndInitializeQueueManager + */ + function initialize() + { + if (Event::handle('StartInitializeQueueManager', array($this))) { + $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('omb', 'OmbQueueHandler'); + $this->connect('ping', 'PingQueueHandler'); + if (common_config('sms', 'enabled')) { + $this->connect('sms', 'SmsQueueHandler'); + } + + // XMPP output handlers... + if (common_config('xmpp', 'enabled')) { + $this->connect('jabber', 'JabberQueueHandler'); + $this->connect('public', 'PublicQueueHandler'); + + // @fixme this should move up a level or should get an actual queue + $this->connect('confirm', 'XmppConfirmHandler'); + } + + // For compat with old plugins not registering their own handlers. + $this->connect('plugin', 'PluginQueueHandler'); + } + Event::handle('EndInitializeQueueManager', array($this)); + } + + /** + * Register a queue transport name and handler class for your plugin. + * Only registered transports will be reliably picked up! + * + * @param string $transport + * @param string $class + */ + public function connect($transport, $class) + { + $this->handlers[$transport] = $class; + } + + /** + * Send a statistic ping to the queue monitoring system, + * optionally with a per-queue id. + * + * @param string $key + * @param string $queue + */ + function stats($key, $queue=false) + { + $owners = array(); + if ($queue) { + $owners[] = "queue:$queue"; + $owners[] = "site:" . common_config('site', 'server'); + } + if (isset($this->master)) { + $this->master->stats($key, $owners); + } else { + $monitor = new QueueMonitor(); + $monitor->stats($key, $owners); + } } } diff --git a/lib/queuemonitor.php b/lib/queuemonitor.php new file mode 100644 index 000000000..1c306a629 --- /dev/null +++ b/lib/queuemonitor.php @@ -0,0 +1,116 @@ +<?php +/** + * StatusNet, the distributed open-source microblogging tool + * + * Monitoring output helper for IoMaster and IoManager/QueueManager + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category QueueManager + * @package StatusNet + * @author Brion Vibber <brion@status.net> + * @copyright 2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +class QueueMonitor +{ + protected $monSocket = null; + + /** + * Increment monitoring statistics for a given counter, if configured. + * Only explicitly listed thread/site/queue owners will be incremented. + * + * @param string $key counter name + * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01' + */ + public function stats($key, $owners=array()) + { + $this->ping(array('counter' => $key, + 'owners' => $owners)); + } + + /** + * Send thread state update to the monitoring server, if configured. + * + * @param string $thread ID (eg 'generic.1') + * @param string $state ('init', 'queue', 'shutdown' etc) + * @param string $substate (optional, eg queue name 'omb' 'sms' etc) + */ + public function logState($threadId, $state, $substate='') + { + $this->ping(array('thread_id' => $threadId, + 'state' => $state, + 'substate' => $substate, + 'ts' => microtime(true))); + } + + /** + * General call to the monitoring server + */ + protected function ping($data) + { + $target = common_config('queue', 'monitor'); + if (empty($target)) { + return; + } + + $data = $this->prepMonitorData($data); + + if (substr($target, 0, 4) == 'udp:') { + $this->pingUdp($target, $data); + } else if (substr($target, 0, 5) == 'http:') { + $this->pingHttp($target, $data); + } else { + common_log(LOG_ERR, __METHOD__ . ' unknown monitor target type ' . $target); + } + } + + protected function pingUdp($target, $data) + { + if (!$this->monSocket) { + $this->monSocket = stream_socket_client($target, $errno, $errstr); + } + if ($this->monSocket) { + $post = http_build_query($data, '', '&'); + stream_socket_sendto($this->monSocket, $post); + } else { + common_log(LOG_ERR, __METHOD__ . " UDP logging fail: $errstr"); + } + } + + protected function pingHttp($target, $data) + { + $client = new HTTPClient(); + $result = $client->post($target, array(), $data); + + if (!$result->isOk()) { + common_log(LOG_ERR, __METHOD__ . ' HTTP ' . $result->getStatus() . + ': ' . $result->getBody()); + } + } + + protected function prepMonitorData($data) + { + #asort($data); + #$macdata = http_build_query($data, '', '&'); + #$key = 'This is a nice old key'; + #$data['hmac'] = hash_hmac('sha256', $macdata, $key); + return $data; + } + +} diff --git a/lib/smsqueuehandler.php b/lib/smsqueuehandler.php new file mode 100644 index 000000000..48a96409d --- /dev/null +++ b/lib/smsqueuehandler.php @@ -0,0 +1,39 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2008, 2009, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Queue handler for pushing new notices to local subscribers using SMS. + */ +class SmsQueueHandler extends QueueHandler +{ + function transport() + { + return 'sms'; + } + + function handle_notice($notice) + { + require_once(INSTALLDIR.'/lib/mail.php'); + return mail_broadcast_notice_sms($notice); + } +} diff --git a/lib/statusnet.php b/lib/statusnet.php new file mode 100644 index 000000000..29e903026 --- /dev/null +++ b/lib/statusnet.php @@ -0,0 +1,298 @@ +<?php +/** + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2009-2010 StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +global $config, $_server, $_path; + +/** + * Global configuration setup and management. + */ +class StatusNet +{ + protected static $have_config; + + /** + * Configure and instantiate a plugin into the current configuration. + * Class definitions will be loaded from standard paths if necessary. + * Note that initialization events won't be fired until later. + * + * @param string $name class name & plugin file/subdir name + * @param array $attrs key/value pairs of public attributes to set on plugin instance + * + * @throws ServerException if plugin can't be found + */ + public static function addPlugin($name, $attrs = null) + { + $name = ucfirst($name); + $pluginclass = "{$name}Plugin"; + + if (!class_exists($pluginclass)) { + + $files = array("local/plugins/{$pluginclass}.php", + "local/plugins/{$name}/{$pluginclass}.php", + "local/{$pluginclass}.php", + "local/{$name}/{$pluginclass}.php", + "plugins/{$pluginclass}.php", + "plugins/{$name}/{$pluginclass}.php"); + + foreach ($files as $file) { + $fullpath = INSTALLDIR.'/'.$file; + if (@file_exists($fullpath)) { + include_once($fullpath); + break; + } + } + if (!class_exists($pluginclass)) { + throw new ServerException(500, "Plugin $name not found."); + } + } + + $inst = new $pluginclass(); + if (!empty($attrs)) { + foreach ($attrs as $aname => $avalue) { + $inst->$aname = $avalue; + } + } + return true; + } + + /** + * Initialize, or re-initialize, StatusNet global configuration + * and plugins. + * + * If switching site configurations during script execution, be + * careful when working with leftover objects -- global settings + * affect many things and they may not behave as you expected. + * + * @param $server optional web server hostname for picking config + * @param $path optional URL path for picking config + * @param $conffile optional configuration file path + * + * @throws NoConfigException if config file can't be found + */ + public static function init($server=null, $path=null, $conffile=null) + { + StatusNet::initDefaults($server, $path); + StatusNet::loadConfigFile($conffile); + + // Load settings from database; note we need autoload for this + Config::loadSettings(); + + self::initPlugins(); + } + + /** + * Fire initialization events for all instantiated plugins. + */ + protected static function initPlugins() + { + // Load default plugins + foreach (common_config('plugins', 'default') as $name => $params) { + if (is_null($params)) { + addPlugin($name); + } else if (is_array($params)) { + if (count($params) == 0) { + addPlugin($name); + } else { + $keys = array_keys($params); + if (is_string($keys[0])) { + addPlugin($name, $params); + } else { + foreach ($params as $paramset) { + addPlugin($name, $paramset); + } + } + } + } + } + + // XXX: if plugins should check the schema at runtime, do that here. + if (common_config('db', 'schemacheck') == 'runtime') { + Event::handle('CheckSchema'); + } + + // Give plugins a chance to initialize in a fully-prepared environment + Event::handle('InitializePlugin'); + } + + /** + * Quick-check if configuration has been established. + * Useful for functions which may get used partway through + * initialization to back off from fancier things. + * + * @return bool + */ + public function haveConfig() + { + return self::$have_config; + } + + /** + * Build default configuration array + * @return array + */ + protected static function defaultConfig() + { + global $_server, $_path; + require(INSTALLDIR.'/lib/default.php'); + return $default; + } + + /** + * Establish default configuration based on given or default server and path + * Sets global $_server, $_path, and $config + */ + protected static function initDefaults($server, $path) + { + global $_server, $_path, $config; + + Event::clearHandlers(); + + // try to figure out where we are. $server and $path + // can be set by including module, else we guess based + // on HTTP info. + + if (isset($server)) { + $_server = $server; + } else { + $_server = array_key_exists('SERVER_NAME', $_SERVER) ? + strtolower($_SERVER['SERVER_NAME']) : + null; + } + + if (isset($path)) { + $_path = $path; + } else { + $_path = (array_key_exists('SERVER_NAME', $_SERVER) && array_key_exists('SCRIPT_NAME', $_SERVER)) ? + self::_sn_to_path($_SERVER['SCRIPT_NAME']) : + null; + } + + // Set config values initially to default values + $default = self::defaultConfig(); + $config = $default; + + // default configuration, overwritten in config.php + // Keep DB_DataObject's db config synced to ours... + + $config['db'] = &PEAR::getStaticProperty('DB_DataObject','options'); + + $config['db'] = $default['db']; + + // Backward compatibility + + $config['site']['design'] =& $config['design']; + + if (function_exists('date_default_timezone_set')) { + /* Work internally in UTC */ + date_default_timezone_set('UTC'); + } + } + + protected function _sn_to_path($sn) + { + $past_root = substr($sn, 1); + $last_slash = strrpos($past_root, '/'); + if ($last_slash > 0) { + $p = substr($past_root, 0, $last_slash); + } else { + $p = ''; + } + return $p; + } + + /** + * Load the default or specified configuration file. + * Modifies global $config and may establish plugins. + * + * @throws NoConfigException + */ + protected function loadConfigFile($conffile=null) + { + global $_server, $_path, $config; + + // From most general to most specific: + // server-wide, then vhost-wide, then for a path, + // finally for a dir (usually only need one of the last two). + + if (isset($conffile)) { + $config_files = array($conffile); + } else { + $config_files = array('/etc/statusnet/statusnet.php', + '/etc/statusnet/laconica.php', + '/etc/laconica/laconica.php', + '/etc/statusnet/'.$_server.'.php', + '/etc/laconica/'.$_server.'.php'); + + if (strlen($_path) > 0) { + $config_files[] = '/etc/statusnet/'.$_server.'_'.$_path.'.php'; + $config_files[] = '/etc/laconica/'.$_server.'_'.$_path.'.php'; + } + + $config_files[] = INSTALLDIR.'/config.php'; + } + + self::$have_config = false; + + foreach ($config_files as $_config_file) { + if (@file_exists($_config_file)) { + include($_config_file); + self::$have_config = true; + } + } + + if (!self::$have_config) { + throw new NoConfigException("No configuration file found.", + $config_files); + } + + // Fixup for statusnet.ini + $_db_name = substr($config['db']['database'], strrpos($config['db']['database'], '/') + 1); + + if ($_db_name != 'statusnet' && !array_key_exists('ini_'.$_db_name, $config['db'])) { + $config['db']['ini_'.$_db_name] = INSTALLDIR.'/classes/statusnet.ini'; + } + + // Backwards compatibility + + if (array_key_exists('memcached', $config)) { + if ($config['memcached']['enabled']) { + addPlugin('Memcache', array('servers' => $config['memcached']['server'])); + } + + if (!empty($config['memcached']['base'])) { + $config['cache']['base'] = $config['memcached']['base']; + } + } + } +} + +class NoConfigException extends Exception +{ + public $config_files; + + function __construct($msg, $config_files) { + parent::__construct($msg); + $this->config_files = $config_files; + } +} diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index f059b42f0..3090e0bfb 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -30,46 +30,53 @@ require_once 'Stomp.php'; -class LiberalStomp extends Stomp -{ - function getSocket() - { - return $this->_socket; - } -} -class StompQueueManager +class StompQueueManager extends QueueManager { var $server = null; var $username = null; var $password = null; var $base = null; var $con = null; + + protected $master = null; + protected $sites = array(); function __construct() { + parent::__construct(); $this->server = common_config('queue', 'stomp_server'); $this->username = common_config('queue', 'stomp_username'); $this->password = common_config('queue', 'stomp_password'); $this->base = common_config('queue', 'queue_basename'); } - function _connect() + /** + * Tell the i/o master we only need a single instance to cover + * all sites running in this process. + */ + public static function multiSite() { - if (empty($this->con)) { - $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); - $this->con = new LiberalStomp($this->server); + return IoManager::INSTANCE_PER_PROCESS; + } - if ($this->con->connect($this->username, $this->password)) { - $this->_log(LOG_INFO, "Connected."); - } else { - $this->_log(LOG_ERR, 'Failed to connect to queue server'); - throw new ServerException('Failed to connect to queue server'); - } - } + /** + * Record each site we'll be handling input for in this process, + * so we can listen to the necessary queues for it. + * + * @fixme possibly actually do subscription here to save another + * loop over all sites later? + */ + public function addSite($server) + { + $this->sites[] = $server; } - function enqueue($object, $queue) + /** + * Saves a notice object reference into the queue item table. + * @return boolean true on success + */ + public function enqueue($object, $queue) { $notice = $object; @@ -77,7 +84,7 @@ class StompQueueManager // XXX: serialize and send entire notice - $result = $this->con->send($this->_queueName($queue), + $result = $this->con->send($this->queueName($queue), $notice->id, // BODY of the message array ('created' => $notice->created)); @@ -88,78 +95,212 @@ class StompQueueManager common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $queue); + $this->stats('enqueued', $queue); } - function service($queue, $handler) + /** + * Send any sockets we're listening on to the IO manager + * to wait for input. + * + * @return array of resources + */ + public function getSockets() { - $result = null; - - $this->_connect(); + return array($this->con->getSocket()); + } - $this->con->setReadTimeout($handler->timeout()); + /** + * We've got input to handle on our socket! + * Read any waiting Stomp frame(s) and process them. + * + * @param resource $socket + * @return boolean ok on success + */ + public function handleInput($socket) + { + assert($socket === $this->con->getSocket()); + $ok = true; + $frames = $this->con->readFrames(); + foreach ($frames as $frame) { + $ok = $ok && $this->_handleNotice($frame); + } + return $ok; + } - $this->con->subscribe($this->_queueName($queue)); + /** + * Initialize our connection and subscribe to all the queues + * we're going to need to handle... + * + * Side effects: in multi-site mode, may reset site configuration. + * + * @param IoMaster $master process/event controller + * @return bool return false on failure + */ + public function start($master) + { + parent::start($master); + if ($this->sites) { + foreach ($this->sites as $server) { + StatusNet::init($server); + $this->doSubscribe(); + } + } else { + $this->doSubscribe(); + } + return true; + } + + /** + * Subscribe to all the queues we're going to need to handle... + * + * Side effects: in multi-site mode, may reset site configuration. + * + * @return bool return false on failure + */ + public function finish() + { + if ($this->sites) { + foreach ($this->sites as $server) { + StatusNet::init($server); + $this->doUnsubscribe(); + } + } else { + $this->doUnsubscribe(); + } + return true; + } + + /** + * Lazy open connection to Stomp queue server. + */ + protected function _connect() + { + if (empty($this->con)) { + $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); + $this->con = new LiberalStomp($this->server); - while (true) { + if ($this->con->connect($this->username, $this->password)) { + $this->_log(LOG_INFO, "Connected."); + } else { + $this->_log(LOG_ERR, 'Failed to connect to queue server'); + throw new ServerException('Failed to connect to queue server'); + } + } + } - // Wait for something on one of our sockets + /** + * Subscribe to all enabled notice queues for the current site. + */ + protected function doSubscribe() + { + $this->_connect(); + foreach ($this->getQueues() as $queue) { + $rawqueue = $this->queueName($queue); + $this->_log(LOG_INFO, "Subscribing to $rawqueue"); + $this->con->subscribe($rawqueue); + } + } + + /** + * Subscribe from all enabled notice queues for the current site. + */ + protected function doUnsubscribe() + { + $this->_connect(); + foreach ($this->getQueues() as $queue) { + $this->con->unsubscribe($this->queueName($queue)); + } + } - $stompsock = $this->con->getSocket(); + /** + * Handle and acknowledge a notice event that's come in through a queue. + * + * If the queue handler reports failure, the message is requeued for later. + * Missing notices or handler classes will drop the message. + * + * Side effects: in multi-site mode, may reset site configuration to + * match the site that queued the event. + * + * @param StompFrame $frame + * @return bool + */ + protected function _handleNotice($frame) + { + list($site, $queue) = $this->parseDestination($frame->headers['destination']); + if ($site != common_config('site', 'server')) { + $this->stats('switch'); + StatusNet::init($site); + } - $handsocks = $handler->getSockets(); + $id = intval($frame->body); + $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; - $socks = array_merge(array($stompsock), $handsocks); + $notice = Notice::staticGet('id', $id); + if (empty($notice)) { + $this->_log(LOG_WARNING, "Skipping missing $info"); + $this->con->ack($frame); + $this->stats('badnotice', $queue); + return false; + } - $read = $socks; - $write = array(); - $except = array(); + $handler = $this->getHandler($queue); + if (!$handler) { + $this->_log(LOG_ERROR, "Missing handler class; skipping $info"); + $this->con->ack($frame); + $this->stats('badhandler', $queue); + return false; + } - $ready = stream_select($read, $write, $except, $handler->timeout(), 0); + $ok = $handler->handle_notice($notice); - if ($ready === false) { - $this->_log(LOG_ERR, "Error selecting on sockets"); - } else if ($ready > 0) { - if (in_array($stompsock, $read)) { - $this->_handleNotice($queue, $handler); - } - $handler->idle(QUEUE_HANDLER_HIT_IDLE); - } + if (!$ok) { + $this->_log(LOG_WARNING, "Failed handling $info"); + // FIXME we probably shouldn't have to do + // this kind of queue management ourselves; + // if we don't ack, it should resend... + $this->con->ack($frame); + $this->enqueue($notice, $queue); + $this->stats('requeued', $queue); + return false; } - $this->con->unsubscribe($this->_queueName($queue)); + $this->_log(LOG_INFO, "Successfully handled $info"); + $this->con->ack($frame); + $this->stats('handled', $queue); + return true; } - function _handleNotice($queue, $handler) + /** + * Combines the queue_basename from configuration with the + * site server name and queue name to give eg: + * + * /queue/statusnet/identi.ca/sms + * + * @param string $queue + * @return string + */ + protected function queueName($queue) { - $frame = $this->con->readFrame(); - - if (!empty($frame)) { - $notice = Notice::staticGet('id', $frame->body); - - if (empty($notice)) { - $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); - $this->con->ack($frame); - } else { - if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); - $this->con->ack($frame); - } else { - $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); - // FIXME we probably shouldn't have to do - // this kind of queue management ourselves - $this->con->ack($frame); - $this->enqueue($notice, $queue); - } - unset($notice); - } - - unset($frame); - } + return common_config('queue', 'queue_basename') . + common_config('site', 'server') . '/' . $queue; } - function _queueName($queue) + /** + * Returns the site and queue name from the server-side queue. + * + * @param string queue destination (eg '/queue/statusnet/identi.ca/sms') + * @return array of site and queue: ('identi.ca','sms') or false if unrecognized + */ + protected function parseDestination($dest) { - return common_config('queue', 'queue_basename') . $queue; + $prefix = common_config('queue', 'queue_basename'); + if (substr($dest, 0, strlen($prefix)) == $prefix) { + $rest = substr($dest, strlen($prefix)); + return explode("/", $rest, 2); + } else { + common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest"); + return array(false, false); + } } function _log($level, $msg) @@ -167,3 +308,4 @@ class StompQueueManager common_log($level, 'StompQueueManager: '.$msg); } } + diff --git a/lib/subs.php b/lib/subs.php index 4b6b03967..5ac1a75a5 100644 --- a/lib/subs.php +++ b/lib/subs.php @@ -56,35 +56,44 @@ function subs_subscribe_to($user, $other) return _('User has blocked you.'); } - if (!$user->subscribeTo($other)) { - return _('Could not subscribe.'); - return; - } + try { + if (Event::handle('StartSubscribe', array($user, $other))) { - subs_notify($other, $user); + if (!$user->subscribeTo($other)) { + return _('Could not subscribe.'); + return; + } - $cache = common_memcache(); + subs_notify($other, $user); - if ($cache) { - $cache->delete(common_cache_key('user:notices_with_friends:' . $user->id)); - } + $cache = common_memcache(); - $profile = $user->getProfile(); + if ($cache) { + $cache->delete(common_cache_key('user:notices_with_friends:' . $user->id)); + } - $profile->blowSubscriptionsCount(); - $other->blowSubscribersCount(); + $profile = $user->getProfile(); - if ($other->autosubscribe && !$other->isSubscribed($user) && !$user->hasBlocked($other)) { - if (!$other->subscribeTo($user)) { - return _('Could not subscribe other to you.'); - } - $cache = common_memcache(); + $profile->blowSubscriptionsCount(); + $other->blowSubscribersCount(); + + if ($other->autosubscribe && !$other->isSubscribed($user) && !$user->hasBlocked($other)) { + if (!$other->subscribeTo($user)) { + return _('Could not subscribe other to you.'); + } + $cache = common_memcache(); - if ($cache) { - $cache->delete(common_cache_key('user:notices_with_friends:' . $other->id)); - } + if ($cache) { + $cache->delete(common_cache_key('user:notices_with_friends:' . $other->id)); + } - subs_notify($user, $other); + subs_notify($user, $other); + } + + Event::handle('EndSubscribe', array($user, $other)); + } + } catch (Exception $e) { + return $e->getMessage(); } return true; @@ -133,28 +142,37 @@ function subs_unsubscribe_to($user, $other) return _('Couldn\'t delete self-subscription.'); } - $sub = DB_DataObject::factory('subscription'); + try { + if (Event::handle('StartUnsubscribe', array($user, $other))) { - $sub->subscriber = $user->id; - $sub->subscribed = $other->id; + $sub = DB_DataObject::factory('subscription'); - $sub->find(true); + $sub->subscriber = $user->id; + $sub->subscribed = $other->id; - // note we checked for existence above + $sub->find(true); - if (!$sub->delete()) - return _('Couldn\'t delete subscription.'); + // note we checked for existence above - $cache = common_memcache(); + if (!$sub->delete()) + return _('Couldn\'t delete subscription.'); - if ($cache) { - $cache->delete(common_cache_key('user:notices_with_friends:' . $user->id)); - } + $cache = common_memcache(); - $profile = $user->getProfile(); + if ($cache) { + $cache->delete(common_cache_key('user:notices_with_friends:' . $user->id)); + } - $profile->blowSubscriptionsCount(); - $other->blowSubscribersCount(); + $profile = $user->getProfile(); + + $profile->blowSubscriptionsCount(); + $other->blowSubscribersCount(); + + Event::handle('EndUnsubscribe', array($user, $other)); + } + } catch (Exception $e) { + return $e->getMessage(); + } return true; } diff --git a/lib/unqueuemanager.php b/lib/unqueuemanager.php index 72dbc4eed..5595eac05 100644 --- a/lib/unqueuemanager.php +++ b/lib/unqueuemanager.php @@ -23,57 +23,35 @@ * @package StatusNet * @author Evan Prodromou <evan@status.net> * @author Sarven Capadisli <csarven@status.net> + * @author Brion Vibber <brion@status.net> * @copyright 2009 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 * @link http://status.net/ */ -class UnQueueManager +class UnQueueManager extends QueueManager { + + /** + * Dummy queue storage manager: instead of saving events for later, + * we just process them immediately. This is only suitable for events + * that can be processed quickly and don't need polling or long-running + * connections to another server such as XMPP. + * + * @param Notice $object + * @param string $queue + */ function enqueue($object, $queue) { $notice = $object; - - switch ($queue) - { - case 'omb': - if ($this->_isLocal($notice)) { - require_once(INSTALLDIR.'/lib/omb.php'); - omb_broadcast_notice($notice); - } - break; - case 'public': - if ($this->_isLocal($notice)) { - require_once(INSTALLDIR.'/lib/jabber.php'); - jabber_public_notice($notice); - } - break; - case 'ping': - if ($this->_isLocal($notice)) { - require_once INSTALLDIR . '/lib/ping.php'; - return ping_broadcast_notice($notice); - } - case 'sms': - require_once(INSTALLDIR.'/lib/mail.php'); - mail_broadcast_notice_sms($notice); - break; - case 'jabber': - require_once(INSTALLDIR.'/lib/jabber.php'); - jabber_broadcast_notice($notice); - break; - case 'plugin': - Event::handle('HandleQueuedNotice', array(&$notice)); - break; - default: + + $handler = $this->getHandler($queue); + if ($handler) { + $handler->handle_notice($notice); + } else { if (Event::handle('UnqueueHandleNotice', array(&$notice, $queue))) { throw new ServerException("UnQueueManager: Unknown queue: $queue"); } } } - - function _isLocal($notice) - { - return ($notice->is_local == Notice::LOCAL_PUBLIC || - $notice->is_local == Notice::LOCAL_NONPUBLIC); - } -}
\ No newline at end of file +} diff --git a/lib/util.php b/lib/util.php index 1237d718b..9255b9b37 100644 --- a/lib/util.php +++ b/lib/util.php @@ -191,7 +191,6 @@ function common_ensure_session() } } } - common_debug("Session ID = " . session_id()); } // Three kinds of arguments: @@ -258,7 +257,6 @@ function common_rememberme($user=null) if (!$user) { $user = common_current_user(); if (!$user) { - common_debug('No current user to remember', __FILE__); return false; } } @@ -276,14 +274,11 @@ function common_rememberme($user=null) if (!$result) { common_log_db_error($rm, 'INSERT', __FILE__); - common_debug('Error adding rememberme record for ' . $user->nickname, __FILE__); return false; } $rm->query('COMMIT'); - common_debug('Inserted rememberme record (' . $rm->code . ', ' . $rm->user_id . '); result = ' . $result . '.', __FILE__); - $cookieval = $rm->user_id . ':' . $rm->code; common_log(LOG_INFO, 'adding rememberme cookie "' . $cookieval . '" for ' . $user->nickname); @@ -391,8 +386,6 @@ function common_current_user() $_cur = common_remembered_user(); if ($_cur) { - common_debug("Got User " . $_cur->nickname); - common_debug("Faking session on remembered user"); // XXX: Is this necessary? $_SESSION['userid'] = $_cur->id; } @@ -838,7 +831,7 @@ function common_path($relative, $ssl=false) } $relative = common_inject_session($relative, $serverpart); - + return $proto.'://'.$serverpart.'/'.$pathpart.$relative; } @@ -849,7 +842,7 @@ function common_inject_session($url, $serverpart = null) if (empty($serverpart)) { $serverpart = parse_url($url, PHP_URL_HOST); } - + $currentServer = $_SERVER['HTTP_HOST']; // Are we pointing to another server (like an SSL server?) @@ -866,7 +859,7 @@ function common_inject_session($url, $serverpart = null) } } } - + return $url; } @@ -1057,7 +1050,12 @@ function common_profile_url($nickname) function common_root_url($ssl=false) { - return common_path('', $ssl); + $url = common_path('', $ssl); + $i = strpos($url, '?'); + if ($i !== false) { + $url = substr($url, 0, $i); + } + return $url; } // returns $bytes bytes of random data as a hexadecimal string @@ -1132,8 +1130,9 @@ function common_log_line($priority, $msg) function common_request_id() { $pid = getmypid(); + $server = common_config('site', 'server'); if (php_sapi_name() == 'cli') { - return $pid; + return "$server:$pid"; } else { static $req_id = null; if (!isset($req_id)) { @@ -1143,7 +1142,7 @@ function common_request_id() $url = $_SERVER['REQUEST_URI']; } $method = $_SERVER['REQUEST_METHOD']; - return "$pid.$req_id $method $url"; + return "$server:$pid.$req_id $method $url"; } } diff --git a/lib/xmppconfirmmanager.php b/lib/xmppconfirmmanager.php new file mode 100644 index 000000000..ee4e294fd --- /dev/null +++ b/lib/xmppconfirmmanager.php @@ -0,0 +1,168 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2008-2010 StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +/** + * Event handler for pushing new confirmations to Jabber users. + * @fixme recommend redoing this on a queue-trigger model + * @fixme expiration of old items got dropped in the past, put it back? + */ +class XmppConfirmManager extends IoManager +{ + + /** + * @return mixed XmppConfirmManager, or false if unneeded + */ + public static function get() + { + if (common_config('xmpp', 'enabled')) { + $site = common_config('site', 'server'); + return new XmppConfirmManager(); + } else { + return false; + } + } + + /** + * Tell the i/o master we need one instance for each supporting site + * being handled in this process. + */ + public static function multiSite() + { + return IoManager::INSTANCE_PER_SITE; + } + + function __construct() + { + $this->site = common_config('site', 'server'); + } + + /** + * 10 seconds? Really? That seems a bit frequent. + */ + function pollInterval() + { + return 10; + } + + /** + * Ping! + * @return boolean true if we found something + */ + function poll() + { + $this->switchSite(); + $confirm = $this->next_confirm(); + if ($confirm) { + $this->handle_confirm($confirm); + return true; + } else { + return false; + } + } + + protected function handle_confirm($confirm) + { + require_once INSTALLDIR . '/lib/jabber.php'; + + common_log(LOG_INFO, 'Sending confirmation for ' . $confirm->address); + $user = User::staticGet($confirm->user_id); + if (!$user) { + common_log(LOG_WARNING, 'Confirmation for unknown user ' . $confirm->user_id); + return; + } + $success = jabber_confirm_address($confirm->code, + $user->nickname, + $confirm->address); + if (!$success) { + common_log(LOG_ERR, 'Confirmation failed for ' . $confirm->address); + # Just let the claim age out; hopefully things work then + return; + } else { + common_log(LOG_INFO, 'Confirmation sent for ' . $confirm->address); + # Mark confirmation sent; need a dupe so we don't have the WHERE clause + $dupe = Confirm_address::staticGet('code', $confirm->code); + if (!$dupe) { + common_log(LOG_WARNING, 'Could not refetch confirm', __FILE__); + return; + } + $orig = clone($dupe); + $dupe->sent = $dupe->claimed; + $result = $dupe->update($orig); + if (!$result) { + common_log_db_error($dupe, 'UPDATE', __FILE__); + # Just let the claim age out; hopefully things work then + return; + } + } + return true; + } + + protected function next_confirm() + { + $confirm = new Confirm_address(); + $confirm->whereAdd('claimed IS null'); + $confirm->whereAdd('sent IS null'); + # XXX: eventually we could do other confirmations in the queue, too + $confirm->address_type = 'jabber'; + $confirm->orderBy('modified DESC'); + $confirm->limit(1); + if ($confirm->find(true)) { + common_log(LOG_INFO, 'Claiming confirmation for ' . $confirm->address); + # working around some weird DB_DataObject behaviour + $confirm->whereAdd(''); # clears where stuff + $original = clone($confirm); + $confirm->claimed = common_sql_now(); + $result = $confirm->update($original); + if ($result) { + common_log(LOG_INFO, 'Succeeded in claim! '. $result); + return $confirm; + } else { + common_log(LOG_INFO, 'Failed in claim!'); + return false; + } + } + return null; + } + + protected function clear_old_confirm_claims() + { + $confirm = new Confirm(); + $confirm->claimed = null; + $confirm->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); + $confirm->update(DB_DATAOBJECT_WHEREADD_ONLY); + $confirm->free(); + unset($confirm); + } + + /** + * Make sure we're on the right site configuration + */ + protected function switchSite() + { + if ($this->site != common_config('site', 'server')) { + common_log(LOG_DEBUG, __METHOD__ . ": switching to site $this->site"); + $this->stats('switch'); + StatusNet::init($this->site); + } + } +} diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php new file mode 100644 index 000000000..9662e97d1 --- /dev/null +++ b/lib/xmppmanager.php @@ -0,0 +1,273 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2008, 2009, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } + +/** + * XMPP background connection manager for XMPP-using queue handlers, + * allowing them to send outgoing messages on the right connection. + * + * Input is handled during socket select loop, keepalive pings during idle. + * Any incoming messages will be forwarded to the main XmppDaemon process, + * which handles direct user interaction. + * + * In a multi-site queuedaemon.php run, one connection will be instantiated + * for each site being handled by the current process that has XMPP enabled. + */ + +class XmppManager extends IoManager +{ + protected $site = null; + protected $pingid = 0; + protected $lastping = null; + + static protected $singletons = array(); + + const PING_INTERVAL = 120; + + /** + * Fetch the singleton XmppManager for the current site. + * @return mixed XmppManager, or false if unneeded + */ + public static function get() + { + if (common_config('xmpp', 'enabled')) { + $site = common_config('site', 'server'); + if (empty(self::$singletons[$site])) { + self::$singletons[$site] = new XmppManager(); + } + return self::$singletons[$site]; + } else { + return false; + } + } + + /** + * Tell the i/o master we need one instance for each supporting site + * being handled in this process. + */ + public static function multiSite() + { + return IoManager::INSTANCE_PER_SITE; + } + + function __construct() + { + $this->site = common_config('site', 'server'); + } + + /** + * Initialize connection to server. + * @return boolean true on success + */ + public function start($master) + { + parent::start($master); + $this->switchSite(); + + require_once "lib/jabber.php"; + + # Low priority; we don't want to receive messages + + common_log(LOG_INFO, "INITIALIZE"); + $this->conn = jabber_connect($this->resource()); + + if (empty($this->conn)) { + common_log(LOG_ERR, "Couldn't connect to server."); + return false; + } + + $this->conn->addEventHandler('message', 'forward_message', $this); + $this->conn->addEventHandler('reconnect', 'handle_reconnect', $this); + $this->conn->setReconnectTimeout(600); + jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1); + + return !is_null($this->conn); + } + + /** + * Message pump is triggered on socket input, so we only need an idle() + * call often enough to trigger our outgoing pings. + */ + function timeout() + { + return self::PING_INTERVAL; + } + + /** + * Lists the XMPP connection socket to allow i/o master to wake + * when input comes in here as well as from the queue source. + * + * @return array of resources + */ + public function getSockets() + { + return array($this->conn->getSocket()); + } + + /** + * Process XMPP events that have come in over the wire. + * Side effects: may switch site configuration + * @fixme may kill process on XMPP error + * @param resource $socket + */ + public function handleInput($socket) + { + $this->switchSite(); + + # Process the queue for as long as needed + try { + if ($this->conn) { + assert($socket === $this->conn->getSocket()); + + common_log(LOG_DEBUG, "Servicing the XMPP queue."); + $this->stats('xmpp_process'); + $this->conn->processTime(0); + } + } catch (XMPPHP_Exception $e) { + common_log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); + die($e->getMessage()); + } + } + + /** + * Idle processing for io manager's execution loop. + * Send keepalive pings to server. + * + * Side effect: kills process on exception from XMPP library. + * + * @fixme non-dying error handling + */ + public function idle($timeout=0) + { + if ($this->conn) { + $now = time(); + if (empty($this->lastping) || $now - $this->lastping > self::PING_INTERVAL) { + $this->switchSite(); + try { + $this->sendPing(); + $this->lastping = $now; + } catch (XMPPHP_Exception $e) { + common_log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); + die($e->getMessage()); + } + } + } + } + + /** + * Send a keepalive ping to the XMPP server. + */ + protected function sendPing() + { + $jid = jabber_daemon_address().'/'.$this->resource(); + $server = common_config('xmpp', 'server'); + + if (!isset($this->pingid)) { + $this->pingid = 0; + } else { + $this->pingid++; + } + + common_log(LOG_DEBUG, "Sending ping #{$this->pingid}"); + + $this->conn->send("<iq from='{$jid}' to='{$server}' id='ping_{$this->pingid}' type='get'><ping xmlns='urn:xmpp:ping'/></iq>"); + } + + /** + * Callback for Jabber reconnect event + * @param $pl + */ + function handle_reconnect(&$pl) + { + common_log(LOG_NOTICE, 'XMPP reconnected'); + + $this->conn->processUntil('session_start'); + $this->conn->presence(null, 'available', null, 'available', -1); + } + + /** + * Callback for Jabber message event. + * + * This connection handles output; if we get a message straight to us, + * forward it on to our XmppDaemon listener for processing. + * + * @param $pl + */ + function forward_message(&$pl) + { + if ($pl['type'] != 'chat') { + common_log(LOG_DEBUG, 'Ignoring message of type ' . $pl['type'] . ' from ' . $pl['from']); + return; + } + $listener = $this->listener(); + if (strtolower($listener) == strtolower($pl['from'])) { + common_log(LOG_WARNING, 'Ignoring loop message.'); + return; + } + common_log(LOG_INFO, 'Forwarding message from ' . $pl['from'] . ' to ' . $listener); + $this->conn->message($this->listener(), $pl['body'], 'chat', null, $this->ofrom($pl['from'])); + } + + /** + * Build an <addresses> block with an ofrom entry for forwarded messages + * + * @param string $from Jabber ID of original sender + * @return string XML fragment + */ + protected function ofrom($from) + { + $address = "<addresses xmlns='http://jabber.org/protocol/address'>\n"; + $address .= "<address type='ofrom' jid='$from' />\n"; + $address .= "</addresses>\n"; + return $address; + } + + /** + * Build the complete JID of the XmppDaemon process which + * handles primary XMPP input for this site. + * + * @return string Jabber ID + */ + protected function listener() + { + if (common_config('xmpp', 'listener')) { + return common_config('xmpp', 'listener'); + } else { + return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; + } + } + + protected function resource() + { + return 'queue' . posix_getpid(); // @fixme PIDs won't be host-unique + } + + /** + * Make sure we're on the right site configuration + */ + protected function switchSite() + { + if ($this->site != common_config('site', 'server')) { + common_log(LOG_DEBUG, __METHOD__ . ": switching to site $this->site"); + $this->stats('switch'); + StatusNet::init($this->site); + } + } +} diff --git a/lib/xmppqueuehandler.php b/lib/xmppqueuehandler.php deleted file mode 100644 index f28fc9088..000000000 --- a/lib/xmppqueuehandler.php +++ /dev/null @@ -1,142 +0,0 @@ -<?php -/* - * StatusNet - the distributed open-source microblogging tool - * Copyright (C) 2008, 2009, StatusNet, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } - -require_once(INSTALLDIR.'/lib/queuehandler.php'); - -define('PING_INTERVAL', 120); - -/** - * Common superclass for all XMPP-using queue handlers. They all need to - * service their message queues on idle, and forward any incoming messages - * to the XMPP listener connection. So, we abstract out common code to a - * superclass. - */ - -class XmppQueueHandler extends QueueHandler -{ - var $pingid = 0; - var $lastping = null; - - function start() - { - # Low priority; we don't want to receive messages - - $this->log(LOG_INFO, "INITIALIZE"); - $this->conn = jabber_connect($this->_id.$this->transport()); - - if (empty($this->conn)) { - $this->log(LOG_ERR, "Couldn't connect to server."); - return false; - } - - $this->conn->addEventHandler('message', 'forward_message', $this); - $this->conn->addEventHandler('reconnect', 'handle_reconnect', $this); - $this->conn->setReconnectTimeout(600); - jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1); - - return !is_null($this->conn); - } - - function timeout() - { - return 10; - } - - function handle_reconnect(&$pl) - { - $this->log(LOG_NOTICE, 'reconnected'); - - $this->conn->processUntil('session_start'); - $this->conn->presence(null, 'available', null, 'available', -1); - } - - function idle($timeout=0) - { - # Process the queue for as long as needed - try { - if ($this->conn) { - $this->log(LOG_DEBUG, "Servicing the XMPP queue."); - $this->conn->processTime($timeout); - $now = time(); - if (empty($this->lastping) || $now - $this->lastping > PING_INTERVAL) { - $this->sendPing(); - $this->lastping = $now; - } - } - } catch (XMPPHP_Exception $e) { - $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); - die($e->getMessage()); - } - } - - function sendPing() - { - $jid = jabber_daemon_address().'/'.$this->_id.$this->transport(); - $server = common_config('xmpp', 'server'); - - if (!isset($this->pingid)) { - $this->pingid = 0; - } else { - $this->pingid++; - } - - $this->log(LOG_DEBUG, "Sending ping #{$this->pingid}"); - - $this->conn->send("<iq from='{$jid}' to='{$server}' id='ping_{$this->pingid}' type='get'><ping xmlns='urn:xmpp:ping'/></iq>"); - } - - function forward_message(&$pl) - { - if ($pl['type'] != 'chat') { - $this->log(LOG_DEBUG, 'Ignoring message of type ' . $pl['type'] . ' from ' . $pl['from']); - return; - } - $listener = $this->listener(); - if (strtolower($listener) == strtolower($pl['from'])) { - $this->log(LOG_WARNING, 'Ignoring loop message.'); - return; - } - $this->log(LOG_INFO, 'Forwarding message from ' . $pl['from'] . ' to ' . $listener); - $this->conn->message($this->listener(), $pl['body'], 'chat', null, $this->ofrom($pl['from'])); - } - - function ofrom($from) - { - $address = "<addresses xmlns='http://jabber.org/protocol/address'>\n"; - $address .= "<address type='ofrom' jid='$from' />\n"; - $address .= "</addresses>\n"; - return $address; - } - - function listener() - { - if (common_config('xmpp', 'listener')) { - return common_config('xmpp', 'listener'); - } else { - return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; - } - } - - function getSockets() - { - return array($this->conn->getSocket()); - } -} |