diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/action.php | 66 | ||||
-rw-r--r-- | lib/api.php | 9 | ||||
-rw-r--r-- | lib/apiauth.php | 24 | ||||
-rw-r--r-- | lib/dbqueuemanager.php | 140 | ||||
-rw-r--r-- | lib/default.php | 11 | ||||
-rw-r--r-- | lib/designsettings.php | 2 | ||||
-rw-r--r-- | lib/iomaster.php | 49 | ||||
-rw-r--r-- | lib/jabber.php | 43 | ||||
-rw-r--r-- | lib/jabberqueuehandler.php | 4 | ||||
-rw-r--r-- | lib/ombqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/pingqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/pluginqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/publicqueuehandler.php | 6 | ||||
-rw-r--r-- | lib/queued_xmpp.php | 117 | ||||
-rw-r--r-- | lib/queuehandler.php | 95 | ||||
-rw-r--r-- | lib/queuemanager.php | 131 | ||||
-rw-r--r-- | lib/smsqueuehandler.php | 2 | ||||
-rw-r--r-- | lib/spawningdaemon.php | 159 | ||||
-rw-r--r-- | lib/statusnet.php | 2 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 107 | ||||
-rw-r--r-- | lib/unqueuemanager.php | 2 | ||||
-rw-r--r-- | lib/util.php | 20 | ||||
-rw-r--r-- | lib/xmppconfirmmanager.php | 168 | ||||
-rw-r--r-- | lib/xmppmanager.php | 286 | ||||
-rw-r--r-- | lib/xmppoutqueuehandler.php | 55 |
25 files changed, 960 insertions, 544 deletions
diff --git a/lib/action.php b/lib/action.php index a521bcb50..e24277558 100644 --- a/lib/action.php +++ b/lib/action.php @@ -199,10 +199,6 @@ class Action extends HTMLOutputter // lawsuit if (Event::handle('StartShowStatusNetStyles', array($this)) && Event::handle('StartShowLaconicaStyles', array($this))) { $this->cssLink('css/display.css',null,'screen, projection, tv'); - if (common_config('site', 'mobile')) { - // TODO: "handheld" CSS for other mobile devices - $this->cssLink('css/mobile.css','base','only screen and (max-device-width: 480px)'); // Mobile WebKit - } $this->cssLink('css/print.css','base','print'); Event::handle('EndShowStatusNetStyles', array($this)); Event::handle('EndShowLaconicaStyles', array($this)); @@ -373,7 +369,11 @@ class Action extends HTMLOutputter // lawsuit $this->elementStart('div', array('id' => 'header')); $this->showLogo(); $this->showPrimaryNav(); - $this->showSiteNotice(); + if (Event::handle('StartShowSiteNotice', array($this))) { + $this->showSiteNotice(); + + Event::handle('EndShowSiteNotice', array($this)); + } if (common_logged_in()) { $this->showNoticeForm(); } else { @@ -791,23 +791,45 @@ class Action extends HTMLOutputter // lawsuit */ function showContentLicense() { - $this->element('dt', array('id' => 'site_content_license'), _('Site content license')); - $this->elementStart('dd', array('id' => 'site_content_license_cc')); - $this->elementStart('p'); - $this->element('img', array('id' => 'license_cc', - 'src' => common_config('license', 'image'), - 'alt' => common_config('license', 'title'), - 'width' => '80', - 'height' => '15')); - //TODO: This is dirty: i18n - $this->text(_('All '.common_config('site', 'name').' content and data are available under the ')); - $this->element('a', array('class' => 'license', - 'rel' => 'external license', - 'href' => common_config('license', 'url')), - common_config('license', 'title')); - $this->text(_('license.')); - $this->elementEnd('p'); - $this->elementEnd('dd'); + if (Event::handle('StartShowContentLicense', array($this))) { + $this->element('dt', array('id' => 'site_content_license'), _('Site content license')); + $this->elementStart('dd', array('id' => 'site_content_license_cc')); + + switch (common_config('license', 'type')) { + case 'private': + $this->element('p', null, sprintf(_('Content and data of %1$s are private and confidential.'), + common_config('site', 'name'))); + // fall through + case 'allrightsreserved': + if (common_config('license', 'owner')) { + $this->element('p', null, sprintf(_('Content and data copyright by %1$s. All rights reserved.'), + common_config('license', 'owner'))); + } else { + $this->element('p', null, _('Content and data copyright by contributors. All rights reserved.')); + } + break; + case 'cc': // fall through + default: + $this->elementStart('p'); + $this->element('img', array('id' => 'license_cc', + 'src' => common_config('license', 'image'), + 'alt' => common_config('license', 'title'), + 'width' => '80', + 'height' => '15')); + //TODO: This is dirty: i18n + $this->text(_('All '.common_config('site', 'name').' content and data are available under the ')); + $this->element('a', array('class' => 'license', + 'rel' => 'external license', + 'href' => common_config('license', 'url')), + common_config('license', 'title')); + $this->text(_('license.')); + $this->elementEnd('p'); + break; + } + + $this->elementEnd('dd'); + Event::handle('EndShowContentLicense', array($this)); + } } /** diff --git a/lib/api.php b/lib/api.php index 794b14050..825262b4c 100644 --- a/lib/api.php +++ b/lib/api.php @@ -288,11 +288,12 @@ class ApiAction extends Action $twitter_status['attachments'] = array(); foreach ($attachments as $attachment) { - if ($attachment->isEnclosure()) { + $enclosure_o=$attachment->getEnclosure(); + if ($enclosure_o) { $enclosure = array(); - $enclosure['url'] = $attachment->url; - $enclosure['mimetype'] = $attachment->mimetype; - $enclosure['size'] = $attachment->size; + $enclosure['url'] = $enclosure_o->url; + $enclosure['mimetype'] = $enclosure_o->mimetype; + $enclosure['size'] = $enclosure_o->size; $twitter_status['attachments'][] = $enclosure; } } diff --git a/lib/apiauth.php b/lib/apiauth.php index 691db584b..927dcad6a 100644 --- a/lib/apiauth.php +++ b/lib/apiauth.php @@ -79,10 +79,13 @@ class ApiAuthAction extends ApiAction $this->checkOAuthRequest(); } else { $this->checkBasicAuthUser(); - // By default, all basic auth users have read and write access - - $this->access = self::READ_WRITE; } + } else { + + // Check to see if a basic auth user is there even + // if one's not required + + $this->checkBasicAuthUser(false); } return true; @@ -145,7 +148,10 @@ class ApiAuthAction extends ApiAction $this->access = ($appUser->access_type & Oauth_application::$writeAccess) ? self::READ_WRITE : self::READ_ONLY; - $this->auth_user = User::staticGet('id', $appUser->profile_id); + if (Event::handle('StartSetApiUser', array(&$user))) { + $this->auth_user = User::staticGet('id', $appUser->profile_id); + Event::handle('EndSetApiUser', array($user)); + } $msg = "API OAuth authentication for user '%s' (id: %d) on behalf of " . "application '%s' (id: %d)."; @@ -198,13 +204,13 @@ class ApiAuthAction extends ApiAction * @return boolean true or false */ - function checkBasicAuthUser() + function checkBasicAuthUser($required = true) { $this->basicAuthProcessHeader(); $realm = common_config('site', 'name') . ' API'; - if (!isset($this->auth_user)) { + if (!isset($this->auth_user) && $required) { header('WWW-Authenticate: Basic realm="' . $realm . '"'); // show error if the user clicks 'cancel' @@ -212,12 +218,16 @@ class ApiAuthAction extends ApiAction $this->showBasicAuthError(); exit; - } else { + } else if (isset($this->auth_user)) { $nickname = $this->auth_user; $password = $this->auth_pw; $user = common_check_user($nickname, $password); if (Event::handle('StartSetApiUser', array(&$user))) { $this->auth_user = $user; + + // By default, all basic auth users have read and write access + $this->access = self::READ_WRITE; + Event::handle('EndSetApiUser', array($user)); } diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index 889365b64..c6350fc66 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -31,19 +31,17 @@ class DBQueueManager extends QueueManager { /** - * Saves a notice object reference into the queue item table. + * Saves an object reference into the queue item table. * @return boolean true on success * @throws ServerException on failure */ public function enqueue($object, $queue) { - $notice = $object; - $qi = new Queue_item(); - $qi->notice_id = $notice->id; + $qi->frame = $this->encode($object); $qi->transport = $queue; - $qi->created = $notice->created; + $qi->created = common_sql_now(); $result = $qi->insert(); if (!$result) { @@ -57,146 +55,92 @@ class DBQueueManager extends QueueManager } /** - * Poll every minute for new events during idle periods. + * Poll every 10 seconds for new events during idle periods. * We'll look in more often when there's data available. * * @return int seconds */ public function pollInterval() { - return 60; + return 10; } /** * Run a polling cycle during idle processing in the input loop. - * @return boolean true if we had a hit + * @return boolean true if we should poll again for more data immediately */ public function poll() { $this->_log(LOG_DEBUG, 'Checking for notices...'); - $item = $this->_nextItem(); - if ($item === false) { + $qi = Queue_item::top($this->getQueues()); + if (empty($qi)) { $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); return false; } - if ($item === true) { - // We dequeued an entry for a deleted or invalid notice. - // Consider it a hit for poll rate purposes. - return true; - } - list($queue, $notice) = $item; - $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue); - - // Yay! Got one! - $handler = $this->getHandler($queue); - if ($handler) { - if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice"); - $this->_done($notice, $queue); + $queue = $qi->transport; + $item = $this->decode($qi->frame); + + if ($item) { + $rep = $this->logrep($item); + $this->_log(LOG_INFO, "Got $rep for transport $queue"); + + $handler = $this->getHandler($queue); + if ($handler) { + if ($handler->handle($item)) { + $this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item"); + $this->_done($qi); + } else { + $this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item"); + $this->_fail($qi); + } } else { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice"); - $this->_fail($notice, $queue); + $this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding."); + $this->_done($qi); } } else { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding."); - $this->_done($notice, $queue); + $this->_log(LOG_INFO, "[$queue] Got empty/deleted item, discarding"); + $this->_fail($qi); } return true; } /** - * Pop the oldest unclaimed item off the queue set and claim it. - * - * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice) - * giving the queue transport name. - */ - protected function _nextItem() - { - $start = time(); - $result = null; - - $qi = Queue_item::top(); - if (empty($qi)) { - return false; - } - - $queue = $qi->transport; - $notice = Notice::staticGet('id', $qi->notice_id); - if (empty($notice)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice"); - $qi->delete(); - return true; - } - - $result = $notice; - return array($queue, $notice); - } - - /** * Delete our claimed item from the queue after successful processing. * - * @param Notice $object - * @param string $queue + * @param QueueItem $qi */ - protected function _done($object, $queue) + protected function _done($qi) { - // XXX: right now, we only handle notices - - $notice = $object; - - $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, - 'transport' => $queue)); + $queue = $qi->transport; - if (empty($qi)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); - } else { - if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item"); - } - $qi->delete(); - $qi->free(); + if (empty($qi->claimed)) { + $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item $qi->id from $qi->queue"); } + $qi->delete(); - $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item"); $this->stats('handled', $queue); - - $notice->free(); } /** * Free our claimed queue item for later reprocessing in case of * temporary failure. * - * @param Notice $object - * @param string $queue + * @param QueueItem $qi */ - protected function _fail($object, $queue) + protected function _fail($qi) { - // XXX: right now, we only handle notices - - $notice = $object; - - $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, - 'transport' => $queue)); + $queue = $qi->transport; - if (empty($qi)) { - $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item"); + if (empty($qi->claimed)) { + $this->_log(LOG_WARNING, "[$queue:item $qi->id] Ignoring failure for unclaimed queue item"); } else { - if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item"); - } else { - $orig = clone($qi); - $qi->claimed = null; - $qi->update($orig); - $qi = null; - } + $orig = clone($qi); + $qi->claimed = null; + $qi->update($orig); } - $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item"); $this->stats('error', $queue); - - $notice->free(); } protected function _log($level, $msg) diff --git a/lib/default.php b/lib/default.php index 82578fea8..d258bbaf4 100644 --- a/lib/default.php +++ b/lib/default.php @@ -67,7 +67,9 @@ $default = 'db_driver' => 'DB', # XXX: JanRain libs only work with DB 'quote_identifiers' => false, 'type' => 'mysql', - 'schemacheck' => 'runtime'), // 'runtime' or 'script' + 'schemacheck' => 'runtime', // 'runtime' or 'script' + 'log_queries' => false, // true to log all DB queries + 'log_slow_queries' => 0), // if set, log queries taking over N seconds 'syslog' => array('appname' => 'statusnet', # for syslog 'priority' => 'debug', # XXX: currently ignored @@ -81,9 +83,12 @@ $default = 'stomp_password' => null, 'monitor' => null, // URL to monitor ping endpoint (work in progress) 'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully + 'debug_memory' => false, // true to spit memory usage to log ), 'license' => - array('url' => 'http://creativecommons.org/licenses/by/3.0/', + array('type' => 'cc', # can be 'cc', 'allrightsreserved', 'private' + 'owner' => null, # can be name of content owner e.g. for enterprise + 'url' => 'http://creativecommons.org/licenses/by/3.0/', 'title' => 'Creative Commons Attribution 3.0', 'image' => 'http://i.creativecommons.org/l/by/3.0/80x15.png'), 'mail' => @@ -249,6 +254,8 @@ $default = 'Mapstraction' => null, 'Linkback' => null, 'WikiHashtags' => null, + 'PubSubHubBub' => null, + 'RSSCloud' => null, 'OpenID' => null), ), 'admin' => diff --git a/lib/designsettings.php b/lib/designsettings.php index b70ba0dfc..8e44c03a9 100644 --- a/lib/designsettings.php +++ b/lib/designsettings.php @@ -314,7 +314,7 @@ class DesignSettingsAction extends AccountSettingsAction function showStylesheets() { parent::showStylesheets(); - $this->cssLink('css/farbtastic.css','base','screen, projection, tv'); + $this->cssLink('js/farbtastic/farbtastic.css',null,'screen, projection, tv'); } /** diff --git a/lib/iomaster.php b/lib/iomaster.php index ce77b53b2..29bd677bd 100644 --- a/lib/iomaster.php +++ b/lib/iomaster.php @@ -27,7 +27,7 @@ * @link http://status.net/ */ -class IoMaster +abstract class IoMaster { public $id; @@ -66,24 +66,19 @@ class IoMaster if ($site != common_config('site', 'server')) { StatusNet::init($site); } - - $classes = array(); - if (Event::handle('StartIoManagerClasses', array(&$classes))) { - $classes[] = 'QueueManager'; - if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) { - $classes[] = 'XmppManager'; // handles pings/reconnects - $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations - } - } - Event::handle('EndIoManagerClasses', array(&$classes)); - - foreach ($classes as $class) { - $this->instantiate($class); - } + $this->initManagers(); } } /** + * Initialize IoManagers for the currently configured site + * which are appropriate to this instance. + * + * Pass class names into $this->instantiate() + */ + abstract function initManagers(); + + /** * Pull all local sites from status_network table. * @return array of hostnames */ @@ -107,7 +102,7 @@ class IoMaster */ protected function instantiate($class) { - if (isset($this->singletons[$class])) { + if (is_string($class) && isset($this->singletons[$class])) { // Already instantiated a multi-site-capable handler. // Just let it know it should listen to this site too! $this->singletons[$class]->addSite(common_config('site', 'server')); @@ -134,7 +129,11 @@ class IoMaster protected function getManager($class) { - return call_user_func(array($class, 'get')); + if(is_object($class)){ + return $class; + } else { + return call_user_func(array($class, 'get')); + } } /** @@ -170,7 +169,7 @@ class IoMaster $write = array(); $except = array(); $this->logState('listening'); - common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data..."); + common_log(LOG_DEBUG, "Waiting up to $timeout seconds for socket data..."); $ready = stream_select($read, $write, $except, $timeout, 0); if ($ready === false) { @@ -190,7 +189,7 @@ class IoMaster if ($timeout > 0 && empty($sockets)) { // If we had no listeners, sleep until the pollers' next requested wakeup. - common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle..."); + common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle..."); $this->logState('sleep'); sleep($timeout); } @@ -207,6 +206,8 @@ class IoMaster if ($usage > $memoryLimit) { common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting."); break; + } else if (common_config('queue', 'debug_memory')) { + common_log(LOG_DEBUG, "Memory usage $usage"); } } } @@ -223,8 +224,7 @@ class IoMaster { $softLimit = trim(common_config('queue', 'softlimit')); if (substr($softLimit, -1) == '%') { - $limit = trim(ini_get('memory_limit')); - $limit = $this->parseMemoryLimit($limit); + $limit = $this->parseMemoryLimit(ini_get('memory_limit')); if ($limit > 0) { return intval(substr($softLimit, 0, -1) * $limit / 100); } else { @@ -242,9 +242,10 @@ class IoMaster * @param string $mem * @return int */ - protected function parseMemoryLimit($mem) + public function parseMemoryLimit($mem) { // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes + $mem = strtolower(trim($mem)); $size = array('k' => 1024, 'm' => 1024*1024, 'g' => 1024*1024*1024); @@ -253,7 +254,7 @@ class IoMaster } else if (is_numeric($mem)) { return intval($mem); } else { - $mult = strtolower(substr($mem, -1)); + $mult = substr($mem, -1); if (isset($size[$mult])) { return substr($mem, 0, -1) * $size[$mult]; } else { @@ -350,7 +351,7 @@ class IoMaster * for per-queue and per-site records. * * @param string $key counter name - * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01' + * @param array $owners list of owner keys like 'queue:xmpp' or 'site:stat01' */ public function stats($key, $owners=array()) { diff --git a/lib/jabber.php b/lib/jabber.php index 4cdfa6746..b6b23521b 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -86,6 +86,27 @@ class Sharing_XMPP extends XMPPHP_XMPP } /** + * Build an XMPP proxy connection that'll save outgoing messages + * to the 'xmppout' queue to be picked up by xmppdaemon later. + */ +function jabber_proxy() +{ + $proxy = new Queued_XMPP(common_config('xmpp', 'host') ? + common_config('xmpp', 'host') : + common_config('xmpp', 'server'), + common_config('xmpp', 'port'), + common_config('xmpp', 'user'), + common_config('xmpp', 'password'), + common_config('xmpp', 'resource') . 'daemon', + common_config('xmpp', 'server'), + common_config('xmpp', 'debug') ? + true : false, + common_config('xmpp', 'debug') ? + XMPPHP_Log::LEVEL_VERBOSE : null); + return $proxy; +} + +/** * Lazy-connect the configured Jabber account to the configured server; * if already opened, the same connection will be returned. * @@ -143,7 +164,7 @@ function jabber_connect($resource=null) } /** - * send a single notice to a given Jabber address + * Queue send for a single notice to a given Jabber address * * @param string $to JID to send the notice to * @param Notice $notice notice to send @@ -153,10 +174,7 @@ function jabber_connect($resource=null) function jabber_send_notice($to, $notice) { - $conn = jabber_connect(); - if (!$conn) { - return false; - } + $conn = jabber_proxy(); $profile = Profile::staticGet($notice->profile_id); if (!$profile) { common_log(LOG_WARNING, 'Refusing to send notice with ' . @@ -221,10 +239,7 @@ function jabber_format_entry($profile, $notice) function jabber_send_message($to, $body, $type='chat', $subject=null) { - $conn = jabber_connect(); - if (!$conn) { - return false; - } + $conn = jabber_proxy(); $conn->message($to, $body, $type, $subject); return true; } @@ -319,7 +334,7 @@ function jabber_special_presence($type, $to=null, $show=null, $status=null) } /** - * broadcast a notice to all subscribers and reply recipients + * Queue broadcast of a notice to all subscribers and reply recipients * * This function will send a notice to all subscribers on the local server * who have Jabber addresses, and have Jabber notification enabled, and @@ -354,7 +369,7 @@ function jabber_broadcast_notice($notice) $sent_to = array(); - $conn = jabber_connect(); + $conn = jabber_proxy(); $ni = $notice->whoGets(); @@ -389,14 +404,13 @@ function jabber_broadcast_notice($notice) 'Sending notice ' . $notice->id . ' to ' . $user->jabber, __FILE__); $conn->message($user->jabber, $msg, 'chat', null, $entry); - $conn->processTime(0); } return true; } /** - * send a notice to all public listeners + * Queue send of a notice to all public listeners * * For notices that are generated on the local system (by users), we can optionally * forward them to remote listeners by XMPP. @@ -429,7 +443,7 @@ function jabber_public_notice($notice) $msg = jabber_format_notice($profile, $notice); $entry = jabber_format_entry($profile, $notice); - $conn = jabber_connect(); + $conn = jabber_proxy(); foreach ($public as $address) { common_log(LOG_INFO, @@ -437,7 +451,6 @@ function jabber_public_notice($notice) ' to public listener ' . $address, __FILE__); $conn->message($address, $msg, 'chat', null, $entry); - $conn->processTime(0); } $profile->free(); } diff --git a/lib/jabberqueuehandler.php b/lib/jabberqueuehandler.php index b1518866d..83471f2df 100644 --- a/lib/jabberqueuehandler.php +++ b/lib/jabberqueuehandler.php @@ -34,14 +34,14 @@ class JabberQueueHandler extends QueueHandler return 'jabber'; } - function handle_notice($notice) + function handle($notice) { require_once(INSTALLDIR.'/lib/jabber.php'); try { return jabber_broadcast_notice($notice); } catch (XMPPHP_Exception $e) { $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); - exit(1); + return false; } } } diff --git a/lib/ombqueuehandler.php b/lib/ombqueuehandler.php index 3ffc1313b..24896c784 100644 --- a/lib/ombqueuehandler.php +++ b/lib/ombqueuehandler.php @@ -36,7 +36,7 @@ class OmbQueueHandler extends QueueHandler * @fixme doesn't currently report failure back to the queue manager * because omb_broadcast_notice() doesn't report it to us */ - function handle_notice($notice) + function handle($notice) { if ($this->is_remote($notice)) { $this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id); diff --git a/lib/pingqueuehandler.php b/lib/pingqueuehandler.php index 8bb218078..4e4d74cb1 100644 --- a/lib/pingqueuehandler.php +++ b/lib/pingqueuehandler.php @@ -30,7 +30,7 @@ class PingQueueHandler extends QueueHandler { return 'ping'; } - function handle_notice($notice) { + function handle($notice) { require_once INSTALLDIR . '/lib/ping.php'; return ping_broadcast_notice($notice); } diff --git a/lib/pluginqueuehandler.php b/lib/pluginqueuehandler.php index 24d504699..9653ccad4 100644 --- a/lib/pluginqueuehandler.php +++ b/lib/pluginqueuehandler.php @@ -42,7 +42,7 @@ class PluginQueueHandler extends QueueHandler return 'plugin'; } - function handle_notice($notice) + function handle($notice) { Event::handle('HandleQueuedNotice', array(&$notice)); return true; diff --git a/lib/publicqueuehandler.php b/lib/publicqueuehandler.php index 9ea9ee73a..c9edb8d5d 100644 --- a/lib/publicqueuehandler.php +++ b/lib/publicqueuehandler.php @@ -23,7 +23,6 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { /** * Queue handler for pushing new notices to public XMPP subscribers. - * @fixme correct this exception handling */ class PublicQueueHandler extends QueueHandler { @@ -33,15 +32,14 @@ class PublicQueueHandler extends QueueHandler return 'public'; } - function handle_notice($notice) + function handle($notice) { require_once(INSTALLDIR.'/lib/jabber.php'); try { return jabber_public_notice($notice); } catch (XMPPHP_Exception $e) { $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage()); - die($e->getMessage()); + return false; } - return true; } } diff --git a/lib/queued_xmpp.php b/lib/queued_xmpp.php new file mode 100644 index 000000000..4b890c4ca --- /dev/null +++ b/lib/queued_xmpp.php @@ -0,0 +1,117 @@ +<?php +/** + * StatusNet, the distributed open-source microblogging tool + * + * Queue-mediated proxy class for outgoing XMPP messages. + * + * PHP version 5 + * + * LICENCE: This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * @category Network + * @package StatusNet + * @author Brion Vibber <brion@status.net> + * @copyright 2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { + exit(1); +} + +require_once INSTALLDIR . '/lib/jabber.php'; + +class Queued_XMPP extends XMPPHP_XMPP +{ + /** + * Constructor + * + * @param string $host + * @param integer $port + * @param string $user + * @param string $password + * @param string $resource + * @param string $server + * @param boolean $printlog + * @param string $loglevel + */ + public function __construct($host, $port, $user, $password, $resource, $server = null, $printlog = false, $loglevel = null) + { + parent::__construct($host, $port, $user, $password, $resource, $server, $printlog, $loglevel); + // Normally the fulljid isn't filled out until resource binding time; + // we need to save it here since we're not talking to a real server. + $this->fulljid = "{$this->basejid}/{$this->resource}"; + } + + /** + * Send a formatted message to the outgoing queue for later forwarding + * to a real XMPP connection. + * + * @param string $msg + */ + public function send($msg, $timeout=NULL) + { + $qm = QueueManager::get(); + $qm->enqueue(strval($msg), 'xmppout'); + } + + /** + * Since we'll be getting input through a queue system's run loop, + * we'll process one standalone message at a time rather than our + * own XMPP message pump. + * + * @param string $message + */ + public function processMessage($message) { + $frame = array_shift($this->frames); + xml_parse($this->parser, $frame->body, false); + } + + //@{ + /** + * Stream i/o functions disabled; push input through processMessage() + */ + public function connect($timeout = 30, $persistent = false, $sendinit = true) + { + throw new Exception("Can't connect to server from XMPP queue proxy."); + } + + public function disconnect() + { + throw new Exception("Can't connect to server from XMPP queue proxy."); + } + + public function process() + { + throw new Exception("Can't read stream from XMPP queue proxy."); + } + + public function processUntil($event, $timeout=-1) + { + throw new Exception("Can't read stream from XMPP queue proxy."); + } + + public function read() + { + throw new Exception("Can't read stream from XMPP queue proxy."); + } + + public function readyToProcess() + { + throw new Exception("Can't read stream from XMPP queue proxy."); + } + //@} +} + diff --git a/lib/queuehandler.php b/lib/queuehandler.php index 613be6e33..2909cd83b 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -22,51 +22,20 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } /** * Base class for queue handlers. * - * As extensions of the Daemon class, each queue handler has the ability - * to launch itself in the background, at which point it'll pass control - * to the configured QueueManager class to poll for updates. + * As of 0.9, queue handlers are short-lived for items as they are + * dequeued by a QueueManager running in an IoMaster in a daemon + * such as queuedaemon.php. + * + * Extensions requiring long-running maintenance or polling should + * register an IoManager. * * Subclasses must override at least the following methods: * - transport - * - handle_notice + * - handle */ -#class QueueHandler extends Daemon class QueueHandler { -# function __construct($id=null, $daemonize=true) -# { -# parent::__construct($daemonize); -# -# if ($id) { -# $this->set_id($id); -# } -# } - - /** - * How many seconds a polling-based queue manager should wait between - * checks for new items to handle. - * - * Defaults to 60 seconds; override to speed up or slow down. - * - * @fixme not really compatible with global queue manager - * @return int timeout in seconds - */ -# function timeout() -# { -# return 60; -# } - -# function class_name() -# { -# return ucfirst($this->transport()) . 'Handler'; -# } - -# function name() -# { -# return strtolower($this->class_name().'.'.$this->get_id()); -# } - /** * Return transport keyword which identifies items this queue handler * services; must be defined for all subclasses. @@ -83,61 +52,17 @@ class QueueHandler /** * Here's the meat of your queue handler -- you're handed a Notice - * object, which you may do as you will with. + * or other object, which you may do as you will with. * * If this function indicates failure, a warning will be logged * and the item is placed back in the queue to be re-run. * - * @param Notice $notice - * @return boolean true on success, false on failure - */ - function handle_notice($notice) - { - return true; - } - - /** - * Setup and start of run loop for this queue handler as a daemon. - * Most of the heavy lifting is passed on to the QueueManager's service() - * method, which passes control back to our handle_notice() method for - * each notice that comes in on the queue. - * - * Most of the time this won't need to be overridden in a subclass. - * + * @param mixed $object * @return boolean true on success, false on failure */ - function run() + function handle($object) { - if (!$this->start()) { - $this->log(LOG_WARNING, 'failed to start'); - return false; - } - - $this->log(LOG_INFO, 'checking for queued notices'); - - $queue = $this->transport(); - $timeout = $this->timeout(); - - $qm = QueueManager::get(); - - $qm->service($queue, $this); - - $this->log(LOG_INFO, 'finished servicing the queue'); - - if (!$this->finish()) { - $this->log(LOG_WARNING, 'failed to clean up'); - return false; - } - - $this->log(LOG_INFO, 'terminating normally'); - return true; } - - - function log($level, $msg) - { - common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); - } } diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 291174d3c..b2e86b127 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,6 +39,10 @@ abstract class QueueManager extends IoManager { static $qm = null; + public $master = null; + public $handlers = array(); + public $groups = array(); + /** * Factory function to pull the appropriate QueueManager object * for this site's configuration. It can then be used to queue @@ -110,6 +114,64 @@ abstract class QueueManager extends IoManager abstract function enqueue($object, $queue); /** + * Build a representation for an object for logging + * @param mixed + * @return string + */ + function logrep($object) { + if (is_object($object)) { + $class = get_class($object); + if (isset($object->id)) { + return "$class $object->id"; + } + return $class; + } + if (is_string($object)) { + $len = strlen($object); + $fragment = mb_substr($object, 0, 32); + if (mb_strlen($object) > 32) { + $fragment .= '...'; + } + return "string '$fragment' ($len bytes)"; + } + return strval($object); + } + + /** + * Encode an object for queued storage. + * Next gen may use serialization. + * + * @param mixed $object + * @return string + */ + protected function encode($object) + { + if ($object instanceof Notice) { + return $object->id; + } else if (is_string($object)) { + return $object; + } else { + throw new ServerException("Can't queue this type", 500); + } + } + + /** + * Decode an object from queued storage. + * Accepts back-compat notice reference entries and strings for now. + * + * @param string + * @return mixed + */ + protected function decode($frame) + { + if (is_numeric($frame)) { + return Notice::staticGet(intval($frame)); + } else { + return $frame; + } + } + + /** * Instantiate the appropriate QueueHandler class for the given queue. * * @param string $queue @@ -119,7 +181,9 @@ abstract class QueueManager extends IoManager { if (isset($this->handlers[$queue])) { $class = $this->handlers[$queue]; - if (class_exists($class)) { + if(is_object($class)) { + return $class; + } else if (class_exists($class)) { return new $class(); } else { common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); @@ -131,13 +195,15 @@ abstract class QueueManager extends IoManager } /** - * Get a list of all registered queue transport names. + * Get a list of registered queue transport names to be used + * for this daemon. * * @return array of strings */ function getQueues() { - return array_keys($this->handlers); + $group = $this->activeGroup(); + return array_keys($this->groups[$group]); } /** @@ -148,33 +214,29 @@ abstract class QueueManager extends IoManager */ function initialize() { + // @fixme we'll want to be able to listen to particular queues... if (Event::handle('StartInitializeQueueManager', array($this))) { - if (!defined('XMPP_ONLY_FLAG')) { // hack! - $this->connect('plugin', 'PluginQueueHandler'); - $this->connect('omb', 'OmbQueueHandler'); - $this->connect('ping', 'PingQueueHandler'); - if (common_config('sms', 'enabled')) { - $this->connect('sms', 'SmsQueueHandler'); - } + $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('omb', 'OmbQueueHandler'); + $this->connect('ping', 'PingQueueHandler'); + if (common_config('sms', 'enabled')) { + $this->connect('sms', 'SmsQueueHandler'); } // XMPP output handlers... - if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) { - $this->connect('jabber', 'JabberQueueHandler'); - $this->connect('public', 'PublicQueueHandler'); - - // @fixme this should move up a level or should get an actual queue - $this->connect('confirm', 'XmppConfirmHandler'); - } + $this->connect('jabber', 'JabberQueueHandler'); + $this->connect('public', 'PublicQueueHandler'); + + // @fixme this should get an actual queue + //$this->connect('confirm', 'XmppConfirmHandler'); + + // For compat with old plugins not registering their own handlers. + $this->connect('plugin', 'PluginQueueHandler'); + + $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon'); - if (!defined('XMPP_ONLY_FLAG')) { // hack! - // For compat with old plugins not registering their own handlers. - $this->connect('plugin', 'PluginQueueHandler'); - } - } - if (!defined('XMPP_ONLY_FLAG')) { // hack! - Event::handle('EndInitializeQueueManager', array($this)); } + Event::handle('EndInitializeQueueManager', array($this)); } /** @@ -182,11 +244,28 @@ abstract class QueueManager extends IoManager * Only registered transports will be reliably picked up! * * @param string $transport - * @param string $class + * @param string $class class name or object instance + * @param string $group */ - public function connect($transport, $class) + public function connect($transport, $class, $group='queuedaemon') { $this->handlers[$transport] = $class; + $this->groups[$group][$transport] = $class; + } + + /** + * @return string queue group to use for this request + */ + function activeGroup() + { + $group = 'queuedaemon'; + if ($this->master) { + // hack hack + if ($this->master instanceof XmppMaster) { + return 'xmppdaemon'; + } + } + return $group; } /** diff --git a/lib/smsqueuehandler.php b/lib/smsqueuehandler.php index 48a96409d..6085d2b4a 100644 --- a/lib/smsqueuehandler.php +++ b/lib/smsqueuehandler.php @@ -31,7 +31,7 @@ class SmsQueueHandler extends QueueHandler return 'sms'; } - function handle_notice($notice) + function handle($notice) { require_once(INSTALLDIR.'/lib/mail.php'); return mail_broadcast_notice_sms($notice); diff --git a/lib/spawningdaemon.php b/lib/spawningdaemon.php new file mode 100644 index 000000000..8baefe88e --- /dev/null +++ b/lib/spawningdaemon.php @@ -0,0 +1,159 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2010, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** + * Base class for daemon that can launch one or more processing threads, + * respawning them if they exit. + * + * This is mainly intended for indefinite workloads such as monitoring + * a queue or maintaining an IM channel. + * + * Child classes should implement the + * + * We can then pass individual items through the QueueHandler subclasses + * they belong to. We additionally can handle queues for multiple sites. + * + * @package QueueHandler + * @author Brion Vibber <brion@status.net> + */ +abstract class SpawningDaemon extends Daemon +{ + protected $threads=1; + + function __construct($id=null, $daemonize=true, $threads=1) + { + parent::__construct($daemonize); + + if ($id) { + $this->set_id($id); + } + $this->threads = $threads; + } + + /** + * Perform some actual work! + * + * @return boolean true on success, false on failure + */ + public abstract function runThread(); + + /** + * Spawn one or more background processes and let them start running. + * Each individual process will execute whatever's in the runThread() + * method, which should be overridden. + * + * Child processes will be automatically respawned when they exit. + * + * @todo possibly allow for not respawning on "normal" exits... + * though ParallelizingDaemon is probably better for workloads + * that have forseeable endpoints. + */ + function run() + { + $children = array(); + for ($i = 1; $i <= $this->threads; $i++) { + $pid = pcntl_fork(); + if ($pid < 0) { + $this->log(LOG_ERROR, "Couldn't fork for thread $i; aborting\n"); + exit(1); + } else if ($pid == 0) { + $this->initAndRunChild($i); + } else { + $this->log(LOG_INFO, "Spawned thread $i as pid $pid"); + $children[$i] = $pid; + } + } + + $this->log(LOG_INFO, "Waiting for children to complete."); + while (count($children) > 0) { + $status = null; + $pid = pcntl_wait($status); + if ($pid > 0) { + $i = array_search($pid, $children); + if ($i === false) { + $this->log(LOG_ERR, "Unrecognized child pid $pid exited!"); + continue; + } + unset($children[$i]); + $this->log(LOG_INFO, "Thread $i pid $pid exited."); + + $pid = pcntl_fork(); + if ($pid < 0) { + $this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n"); + } else if ($pid == 0) { + $this->initAndRunChild($i); + } else { + $this->log(LOG_INFO, "Respawned thread $i as pid $pid"); + $children[$i] = $pid; + } + } + } + $this->log(LOG_INFO, "All child processes complete."); + return true; + } + + /** + * Initialize things for a fresh thread, call runThread(), and + * exit at completion with appropriate return value. + */ + protected function initAndRunChild($thread) + { + $this->set_id($this->get_id() . "." . $thread); + $this->resetDb(); + $ok = $this->runThread(); + exit($ok ? 0 : 1); + } + + /** + * Reconnect to the database for each child process, + * or they'll get very confused trying to use the + * same socket. + */ + protected function resetDb() + { + // @fixme do we need to explicitly open the db too + // or is this implied? + global $_DB_DATAOBJECT; + unset($_DB_DATAOBJECT['CONNECTIONS']); + + // Reconnect main memcached, or threads will stomp on + // each other and corrupt their requests. + $cache = common_memcache(); + if ($cache) { + $cache->reconnect(); + } + + // Also reconnect memcached for status_network table. + if (!empty(Status_network::$cache)) { + Status_network::$cache->close(); + Status_network::$cache = null; + } + } + + function log($level, $msg) + { + common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg); + } + + function name() + { + return strtolower(get_class($this).'.'.$this->get_id()); + } +} + diff --git a/lib/statusnet.php b/lib/statusnet.php index 29e903026..beeb26ccc 100644 --- a/lib/statusnet.php +++ b/lib/statusnet.php @@ -63,7 +63,7 @@ class StatusNet } } if (!class_exists($pluginclass)) { - throw new ServerException(500, "Plugin $name not found."); + throw new ServerException("Plugin $name not found.", 500); } } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 00590fdb6..4bbdeedc2 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -39,9 +39,12 @@ class StompQueueManager extends QueueManager var $base = null; var $con = null; - protected $master = null; protected $sites = array(); + protected $useTransactions = true; + protected $transaction = null; + protected $transactionCount = 0; + function __construct() { parent::__construct(); @@ -104,11 +107,12 @@ class StompQueueManager extends QueueManager */ function getQueues() { + $group = $this->activeGroup(); $site = common_config('site', 'server'); - if (empty($this->handlers[$site])) { + if (empty($this->groups[$site][$group])) { return array(); } else { - return array_keys($this->handlers[$site]); + return array_keys($this->groups[$site][$group]); } } @@ -118,10 +122,12 @@ class StompQueueManager extends QueueManager * * @param string $transport * @param string $class + * @param string $group */ - public function connect($transport, $class) + public function connect($transport, $class, $group='queuedaemon') { $this->handlers[common_config('site', 'server')][$transport] = $class; + $this->groups[common_config('site', 'server')][$group][$transport] = $class; } /** @@ -130,23 +136,23 @@ class StompQueueManager extends QueueManager */ public function enqueue($object, $queue) { - $notice = $object; + $msg = $this->encode($object); + $rep = $this->logrep($object); $this->_connect(); // XXX: serialize and send entire notice $result = $this->con->send($this->queueName($queue), - $notice->id, // BODY of the message - array ('created' => $notice->created)); + $msg, // BODY of the message + array ('created' => common_sql_now())); if (!$result) { - common_log(LOG_ERR, 'Error sending to '.$queue.' queue'); + common_log(LOG_ERR, "Error sending $rep to $queue queue"); return false; } - common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' - . $notice->id . ' for ' . $queue); + common_log(LOG_DEBUG, "complete remote queueing $rep for $queue"); $this->stats('enqueued', $queue); } @@ -174,7 +180,7 @@ class StompQueueManager extends QueueManager $ok = true; $frames = $this->con->readFrames(); foreach ($frames as $frame) { - $ok = $ok && $this->_handleNotice($frame); + $ok = $ok && $this->_handleItem($frame); } return $ok; } @@ -199,6 +205,7 @@ class StompQueueManager extends QueueManager } else { $this->doSubscribe(); } + $this->begin(); return true; } @@ -211,6 +218,9 @@ class StompQueueManager extends QueueManager */ public function finish() { + // If there are any outstanding delivered messages we haven't processed, + // free them for another thread to take. + $this->rollback(); if ($this->sites) { foreach ($this->sites as $server) { StatusNet::init($server); @@ -265,7 +275,7 @@ class StompQueueManager extends QueueManager } /** - * Handle and acknowledge a notice event that's come in through a queue. + * Handle and acknowledge an event that's come in through a queue. * * If the queue handler reports failure, the message is requeued for later. * Missing notices or handler classes will drop the message. @@ -276,7 +286,7 @@ class StompQueueManager extends QueueManager * @param StompFrame $frame * @return bool */ - protected function _handleNotice($frame) + protected function _handleItem($frame) { list($site, $queue) = $this->parseDestination($frame->headers['destination']); if ($site != common_config('site', 'server')) { @@ -284,40 +294,37 @@ class StompQueueManager extends QueueManager StatusNet::init($site); } - $id = intval($frame->body); - $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; - - $notice = Notice::staticGet('id', $id); - if (empty($notice)) { - $this->_log(LOG_WARNING, "Skipping missing $info"); - $this->con->ack($frame); - $this->stats('badnotice', $queue); - return false; - } + $item = $this->decode($frame->body); $handler = $this->getHandler($queue); if (!$handler) { $this->_log(LOG_ERROR, "Missing handler class; skipping $info"); - $this->con->ack($frame); + $this->ack($frame); + $this->commit(); + $this->begin(); $this->stats('badhandler', $queue); return false; } - $ok = $handler->handle_notice($notice); + $ok = $handler->handle($item); if (!$ok) { $this->_log(LOG_WARNING, "Failed handling $info"); // FIXME we probably shouldn't have to do // this kind of queue management ourselves; // if we don't ack, it should resend... - $this->con->ack($frame); - $this->enqueue($notice, $queue); + $this->ack($frame); + $this->enqueue($item, $queue); + $this->commit(); + $this->begin(); $this->stats('requeued', $queue); return false; } $this->_log(LOG_INFO, "Successfully handled $info"); - $this->con->ack($frame); + $this->ack($frame); + $this->commit(); + $this->begin(); $this->stats('handled', $queue); return true; } @@ -359,5 +366,49 @@ class StompQueueManager extends QueueManager { common_log($level, 'StompQueueManager: '.$msg); } + + protected function begin() + { + if ($this->useTransactions) { + if ($this->transaction) { + throw new Exception("Tried to start transaction in the middle of a transaction"); + } + $this->transactionCount++; + $this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time(); + $this->con->begin($this->transaction); + } + } + + protected function ack($frame) + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to ack but not in a transaction"); + } + } + $this->con->ack($frame, $this->transaction); + } + + protected function commit() + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to commit but not in a transaction"); + } + $this->con->commit($this->transaction); + $this->transaction = null; + } + } + + protected function rollback() + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to rollback but not in a transaction"); + } + $this->con->commit($this->transaction); + $this->transaction = null; + } + } } diff --git a/lib/unqueuemanager.php b/lib/unqueuemanager.php index 5595eac05..785de7c8c 100644 --- a/lib/unqueuemanager.php +++ b/lib/unqueuemanager.php @@ -47,7 +47,7 @@ class UnQueueManager extends QueueManager $handler = $this->getHandler($queue); if ($handler) { - $handler->handle_notice($notice); + $handler->handle($notice); } else { if (Event::handle('UnqueueHandleNotice', array(&$notice, $queue))) { throw new ServerException("UnQueueManager: Unknown queue: $queue"); diff --git a/lib/util.php b/lib/util.php index ef8a5d1f0..01b159ac1 100644 --- a/lib/util.php +++ b/lib/util.php @@ -596,20 +596,13 @@ function common_linkify($url) { } if (!empty($f)) { - if ($f->isEnclosure()) { + if ($f->getEnclosure()) { $is_attachment = true; $attachment_id = $f->id; - } else { - $foe = File_oembed::staticGet('file_id', $f->id); - if (!empty($foe)) { - // if it has OEmbed info, it's an attachment, too - $is_attachment = true; - $attachment_id = $f->id; - - $thumb = File_thumbnail::staticGet('file_id', $f->id); - if (!empty($thumb)) { - $has_thumb = true; - } + + $thumb = File_thumbnail::staticGet('file_id', $f->id); + if (!empty($thumb)) { + $has_thumb = true; } } } @@ -1130,7 +1123,8 @@ function common_request_id() $pid = getmypid(); $server = common_config('site', 'server'); if (php_sapi_name() == 'cli') { - return "$server:$pid"; + $script = basename($_SERVER['PHP_SELF']); + return "$server:$script:$pid"; } else { static $req_id = null; if (!isset($req_id)) { diff --git a/lib/xmppconfirmmanager.php b/lib/xmppconfirmmanager.php deleted file mode 100644 index ee4e294fd..000000000 --- a/lib/xmppconfirmmanager.php +++ /dev/null @@ -1,168 +0,0 @@ -<?php -/* - * StatusNet - the distributed open-source microblogging tool - * Copyright (C) 2008-2010 StatusNet, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -if (!defined('STATUSNET') && !defined('LACONICA')) { - exit(1); -} - -/** - * Event handler for pushing new confirmations to Jabber users. - * @fixme recommend redoing this on a queue-trigger model - * @fixme expiration of old items got dropped in the past, put it back? - */ -class XmppConfirmManager extends IoManager -{ - - /** - * @return mixed XmppConfirmManager, or false if unneeded - */ - public static function get() - { - if (common_config('xmpp', 'enabled')) { - $site = common_config('site', 'server'); - return new XmppConfirmManager(); - } else { - return false; - } - } - - /** - * Tell the i/o master we need one instance for each supporting site - * being handled in this process. - */ - public static function multiSite() - { - return IoManager::INSTANCE_PER_SITE; - } - - function __construct() - { - $this->site = common_config('site', 'server'); - } - - /** - * 10 seconds? Really? That seems a bit frequent. - */ - function pollInterval() - { - return 10; - } - - /** - * Ping! - * @return boolean true if we found something - */ - function poll() - { - $this->switchSite(); - $confirm = $this->next_confirm(); - if ($confirm) { - $this->handle_confirm($confirm); - return true; - } else { - return false; - } - } - - protected function handle_confirm($confirm) - { - require_once INSTALLDIR . '/lib/jabber.php'; - - common_log(LOG_INFO, 'Sending confirmation for ' . $confirm->address); - $user = User::staticGet($confirm->user_id); - if (!$user) { - common_log(LOG_WARNING, 'Confirmation for unknown user ' . $confirm->user_id); - return; - } - $success = jabber_confirm_address($confirm->code, - $user->nickname, - $confirm->address); - if (!$success) { - common_log(LOG_ERR, 'Confirmation failed for ' . $confirm->address); - # Just let the claim age out; hopefully things work then - return; - } else { - common_log(LOG_INFO, 'Confirmation sent for ' . $confirm->address); - # Mark confirmation sent; need a dupe so we don't have the WHERE clause - $dupe = Confirm_address::staticGet('code', $confirm->code); - if (!$dupe) { - common_log(LOG_WARNING, 'Could not refetch confirm', __FILE__); - return; - } - $orig = clone($dupe); - $dupe->sent = $dupe->claimed; - $result = $dupe->update($orig); - if (!$result) { - common_log_db_error($dupe, 'UPDATE', __FILE__); - # Just let the claim age out; hopefully things work then - return; - } - } - return true; - } - - protected function next_confirm() - { - $confirm = new Confirm_address(); - $confirm->whereAdd('claimed IS null'); - $confirm->whereAdd('sent IS null'); - # XXX: eventually we could do other confirmations in the queue, too - $confirm->address_type = 'jabber'; - $confirm->orderBy('modified DESC'); - $confirm->limit(1); - if ($confirm->find(true)) { - common_log(LOG_INFO, 'Claiming confirmation for ' . $confirm->address); - # working around some weird DB_DataObject behaviour - $confirm->whereAdd(''); # clears where stuff - $original = clone($confirm); - $confirm->claimed = common_sql_now(); - $result = $confirm->update($original); - if ($result) { - common_log(LOG_INFO, 'Succeeded in claim! '. $result); - return $confirm; - } else { - common_log(LOG_INFO, 'Failed in claim!'); - return false; - } - } - return null; - } - - protected function clear_old_confirm_claims() - { - $confirm = new Confirm(); - $confirm->claimed = null; - $confirm->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); - $confirm->update(DB_DATAOBJECT_WHEREADD_ONLY); - $confirm->free(); - unset($confirm); - } - - /** - * Make sure we're on the right site configuration - */ - protected function switchSite() - { - if ($this->site != common_config('site', 'server')) { - common_log(LOG_DEBUG, __METHOD__ . ": switching to site $this->site"); - $this->stats('switch'); - StatusNet::init($this->site); - } - } -} diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php index dfff63a30..299175dd7 100644 --- a/lib/xmppmanager.php +++ b/lib/xmppmanager.php @@ -70,6 +70,7 @@ class XmppManager extends IoManager function __construct() { $this->site = common_config('site', 'server'); + $this->resource = common_config('xmpp', 'resource') . 'daemon'; } /** @@ -86,15 +87,19 @@ class XmppManager extends IoManager # Low priority; we don't want to receive messages common_log(LOG_INFO, "INITIALIZE"); - $this->conn = jabber_connect($this->resource()); + $this->conn = jabber_connect($this->resource); if (empty($this->conn)) { common_log(LOG_ERR, "Couldn't connect to server."); return false; } - $this->conn->addEventHandler('message', 'forward_message', $this); + $this->log(LOG_DEBUG, "Initializing stanza handlers."); + + $this->conn->addEventHandler('message', 'handle_message', $this); + $this->conn->addEventHandler('presence', 'handle_presence', $this); $this->conn->addEventHandler('reconnect', 'handle_reconnect', $this); + $this->conn->setReconnectTimeout(600); jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1); @@ -176,11 +181,36 @@ class XmppManager extends IoManager } /** + * For queue handlers to pass us a message to push out, + * if we're active. + * + * @fixme should this be blocking etc? + * + * @param string $msg XML stanza to send + * @return boolean success + */ + public function send($msg) + { + if ($this->conn && !$this->conn->isDisconnected()) { + $bytes = $this->conn->send($msg); + if ($bytes > 0) { + $this->conn->processTime(0); + return true; + } else { + return false; + } + } else { + // Can't send right now... + return false; + } + } + + /** * Send a keepalive ping to the XMPP server. */ protected function sendPing() { - $jid = jabber_daemon_address().'/'.$this->resource(); + $jid = jabber_daemon_address().'/'.$this->resource; $server = common_config('xmpp', 'server'); if (!isset($this->pingid)) { @@ -206,61 +236,239 @@ class XmppManager extends IoManager $this->conn->presence(null, 'available', null, 'available', -1); } + + function get_user($from) + { + $user = User::staticGet('jabber', jabber_normalize_jid($from)); + return $user; + } + /** - * Callback for Jabber message event. - * - * This connection handles output; if we get a message straight to us, - * forward it on to our XmppDaemon listener for processing. - * - * @param $pl + * XMPP callback for handling message input... + * @param array $pl XMPP payload */ - function forward_message(&$pl) + function handle_message(&$pl) { + $from = jabber_normalize_jid($pl['from']); + if ($pl['type'] != 'chat') { - common_log(LOG_DEBUG, 'Ignoring message of type ' . $pl['type'] . ' from ' . $pl['from']); + $this->log(LOG_WARNING, "Ignoring message of type ".$pl['type']." from $from."); return; } - $listener = $this->listener(); - if (strtolower($listener) == strtolower($pl['from'])) { - common_log(LOG_WARNING, 'Ignoring loop message.'); + + if (mb_strlen($pl['body']) == 0) { + $this->log(LOG_WARNING, "Ignoring message with empty body from $from."); return; } - common_log(LOG_INFO, 'Forwarding message from ' . $pl['from'] . ' to ' . $listener); - $this->conn->message($this->listener(), $pl['body'], 'chat', null, $this->ofrom($pl['from'])); + + // Forwarded from another daemon for us to handle; this shouldn't + // happen any more but we might get some legacy items. + if ($this->is_self($from)) { + $this->log(LOG_INFO, "Got forwarded notice from self ($from)."); + $from = $this->get_ofrom($pl); + $this->log(LOG_INFO, "Originally sent by $from."); + if (is_null($from) || $this->is_self($from)) { + $this->log(LOG_INFO, "Ignoring notice originally sent by $from."); + return; + } + } + + $user = $this->get_user($from); + + // For common_current_user to work + global $_cur; + $_cur = $user; + + if (!$user) { + $this->from_site($from, 'Unknown user; go to ' . + common_local_url('imsettings') . + ' to add your address to your account'); + $this->log(LOG_WARNING, 'Message from unknown user ' . $from); + return; + } + if ($this->handle_command($user, $pl['body'])) { + $this->log(LOG_INFO, "Command message by $from handled."); + return; + } else if ($this->is_autoreply($pl['body'])) { + $this->log(LOG_INFO, 'Ignoring auto reply from ' . $from); + return; + } else if ($this->is_otr($pl['body'])) { + $this->log(LOG_INFO, 'Ignoring OTR from ' . $from); + return; + } else { + + $this->log(LOG_INFO, 'Posting a notice from ' . $user->nickname); + + $this->add_notice($user, $pl); + } + + $user->free(); + unset($user); + unset($_cur); + + unset($pl['xml']); + $pl['xml'] = null; + + $pl = null; + unset($pl); } - /** - * Build an <addresses> block with an ofrom entry for forwarded messages - * - * @param string $from Jabber ID of original sender - * @return string XML fragment - */ - protected function ofrom($from) + + function is_self($from) { - $address = "<addresses xmlns='http://jabber.org/protocol/address'>\n"; - $address .= "<address type='ofrom' jid='$from' />\n"; - $address .= "</addresses>\n"; - return $address; + return preg_match('/^'.strtolower(jabber_daemon_address()).'/', strtolower($from)); } - /** - * Build the complete JID of the XmppDaemon process which - * handles primary XMPP input for this site. - * - * @return string Jabber ID - */ - protected function listener() + function get_ofrom($pl) + { + $xml = $pl['xml']; + $addresses = $xml->sub('addresses'); + if (!$addresses) { + $this->log(LOG_WARNING, 'Forwarded message without addresses'); + return null; + } + $address = $addresses->sub('address'); + if (!$address) { + $this->log(LOG_WARNING, 'Forwarded message without address'); + return null; + } + if (!array_key_exists('type', $address->attrs)) { + $this->log(LOG_WARNING, 'No type for forwarded message'); + return null; + } + $type = $address->attrs['type']; + if ($type != 'ofrom') { + $this->log(LOG_WARNING, 'Type of forwarded message is not ofrom'); + return null; + } + if (!array_key_exists('jid', $address->attrs)) { + $this->log(LOG_WARNING, 'No jid for forwarded message'); + return null; + } + $jid = $address->attrs['jid']; + if (!$jid) { + $this->log(LOG_WARNING, 'Could not get jid from address'); + return null; + } + $this->log(LOG_DEBUG, 'Got message forwarded from jid ' . $jid); + return $jid; + } + + function is_autoreply($txt) + { + if (preg_match('/[\[\(]?[Aa]uto[-\s]?[Rr]e(ply|sponse)[\]\)]/', $txt)) { + return true; + } else if (preg_match('/^System: Message wasn\'t delivered. Offline storage size was exceeded.$/', $txt)) { + return true; + } else { + return false; + } + } + + function is_otr($txt) + { + if (preg_match('/^\?OTR/', $txt)) { + return true; + } else { + return false; + } + } + + function from_site($address, $msg) + { + $text = '['.common_config('site', 'name') . '] ' . $msg; + jabber_send_message($address, $text); + } + + function handle_command($user, $body) { - if (common_config('xmpp', 'listener')) { - return common_config('xmpp', 'listener'); + $inter = new CommandInterpreter(); + $cmd = $inter->handle_command($user, $body); + if ($cmd) { + $chan = new XMPPChannel($this->conn); + $cmd->execute($chan); + return true; } else { - return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; + return false; + } + } + + function add_notice(&$user, &$pl) + { + $body = trim($pl['body']); + $content_shortened = common_shorten_links($body); + if (Notice::contentTooLong($content_shortened)) { + $from = jabber_normalize_jid($pl['from']); + $this->from_site($from, sprintf(_('Message too long - maximum is %1$d characters, you sent %2$d.'), + Notice::maxContent(), + mb_strlen($content_shortened))); + return; + } + + try { + $notice = Notice::saveNew($user->id, $content_shortened, 'xmpp'); + } catch (Exception $e) { + $this->log(LOG_ERR, $e->getMessage()); + $this->from_site($user->jabber, $e->getMessage()); + return; } + + common_broadcast_notice($notice); + $this->log(LOG_INFO, + 'Added notice ' . $notice->id . ' from user ' . $user->nickname); + $notice->free(); + unset($notice); + } + + function handle_presence(&$pl) + { + $from = jabber_normalize_jid($pl['from']); + switch ($pl['type']) { + case 'subscribe': + # We let anyone subscribe + $this->subscribed($from); + $this->log(LOG_INFO, + 'Accepted subscription from ' . $from); + break; + case 'subscribed': + case 'unsubscribed': + case 'unsubscribe': + $this->log(LOG_INFO, + 'Ignoring "' . $pl['type'] . '" from ' . $from); + break; + default: + if (!$pl['type']) { + $user = User::staticGet('jabber', $from); + if (!$user) { + $this->log(LOG_WARNING, 'Presence from unknown user ' . $from); + return; + } + if ($user->updatefrompresence) { + $this->log(LOG_INFO, 'Updating ' . $user->nickname . + ' status from presence.'); + $this->add_notice($user, $pl); + } + $user->free(); + unset($user); + } + break; + } + unset($pl['xml']); + $pl['xml'] = null; + + $pl = null; + unset($pl); + } + + function log($level, $msg) + { + $text = 'XMPPDaemon('.$this->resource.'): '.$msg; + common_log($level, $text); } - protected function resource() + function subscribed($to) { - return 'queue' . posix_getpid(); // @fixme PIDs won't be host-unique + jabber_special_presence('subscribed', $to); } /** diff --git a/lib/xmppoutqueuehandler.php b/lib/xmppoutqueuehandler.php new file mode 100644 index 000000000..2afa260f1 --- /dev/null +++ b/lib/xmppoutqueuehandler.php @@ -0,0 +1,55 @@ +<?php +/* + * StatusNet - the distributed open-source microblogging tool + * Copyright (C) 2010, StatusNet, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** + * Queue handler for pre-processed outgoing XMPP messages. + * Formatted XML stanzas will have been pushed into the queue + * via the Queued_XMPP connection proxy, probably from some + * other queue processor. + * + * Here, the XML stanzas are simply pulled out of the queue and + * pushed out over the wire; an XmppManager is needed to set up + * and maintain the actual server connection. + * + * This queue will be run via XmppDaemon rather than QueueDaemon. + * + * @author Brion Vibber <brion@status.net> + */ +class XmppOutQueueHandler extends QueueHandler +{ + function transport() { + return 'xmppout'; + } + + /** + * Take a previously-queued XMPP stanza and send it out ot the server. + * @param string $msg + * @return boolean true on success + */ + function handle($msg) + { + assert(is_string($msg)); + + $xmpp = XmppManager::get(); + $ok = $xmpp->send($msg); + + return $ok; + } +} + |