diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/cache.php | 26 | ||||
-rw-r--r-- | lib/dbqueuemanager.php | 7 | ||||
-rw-r--r-- | lib/default.php | 8 | ||||
-rw-r--r-- | lib/iomanager.php | 5 | ||||
-rw-r--r-- | lib/iomaster.php | 74 | ||||
-rw-r--r-- | lib/queued_xmpp.php | 2 | ||||
-rw-r--r-- | lib/queuemanager.php | 97 | ||||
-rw-r--r-- | lib/spawningdaemon.php | 16 | ||||
-rw-r--r-- | lib/statusnet.php | 54 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 369 | ||||
-rw-r--r-- | lib/xmppmanager.php | 8 |
11 files changed, 373 insertions, 293 deletions
diff --git a/lib/cache.php b/lib/cache.php index 635c96ad4..b3ec7534f 100644 --- a/lib/cache.php +++ b/lib/cache.php @@ -158,6 +158,32 @@ class Cache } /** + * Atomically increment an existing numeric value. + * Existing expiration time should remain unchanged, if any. + * + * @param string $key The key to use for lookups + * @param int $step Amount to increment (default 1) + * + * @return mixed incremented value, or false if not set. + */ + function increment($key, $step=1) + { + $value = false; + if (Event::handle('StartCacheIncrement', array(&$key, &$step, &$value))) { + // Fallback is not guaranteed to be atomic, + // and may original expiry value. + $value = $this->get($key); + if ($value !== false) { + $value += $step; + $ok = $this->set($key, $value); + $got = $this->get($key); + } + Event::handle('EndCacheIncrement', array($key, $step, $value)); + } + return $value; + } + + /** * Delete the value associated with a key * * @param string $key Key to delete diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c6350fc66..3032e4ec7 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -72,7 +72,7 @@ class DBQueueManager extends QueueManager public function poll() { $this->_log(LOG_DEBUG, 'Checking for notices...'); - $qi = Queue_item::top($this->getQueues()); + $qi = Queue_item::top($this->activeQueues()); if (empty($qi)) { $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); return false; @@ -142,9 +142,4 @@ class DBQueueManager extends QueueManager $this->stats('error', $queue); } - - protected function _log($level, $msg) - { - common_log($level, 'DBQueueManager: '.$msg); - } } diff --git a/lib/default.php b/lib/default.php index bf4b83718..a74cccae1 100644 --- a/lib/default.php +++ b/lib/default.php @@ -81,7 +81,7 @@ $default = 'subsystem' => 'db', # default to database, or 'stomp' 'stomp_server' => null, 'queue_basename' => '/queue/statusnet/', - 'control_channel' => '/topic/statusnet-control', // broadcasts to all queue daemons + 'control_channel' => '/topic/statusnet/control', // broadcasts to all queue daemons 'stomp_username' => null, 'stomp_password' => null, 'stomp_persistent' => true, // keep items across queue server restart, if persistence is enabled @@ -91,6 +91,12 @@ $default = 'spawndelay' => 1, // Wait at least N seconds between (re)spawns of child processes to avoid slamming the queue server with subscription startup 'debug_memory' => false, // true to spit memory usage to log 'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue + 'breakout' => array('*' => 'shared'), // set global or per-handler queue breakout + // 'shared': use a shared queue for all sites + // 'handler': share each/this handler over multiple sites + // 'site': break out for each/this handler on this site + 'max_retries' => 10, // drop messages after N failed attempts to process (Stomp) + 'dead_letter_dir' => false, // set to directory to save dropped messages into (Stomp) ), 'license' => array('type' => 'cc', # can be 'cc', 'allrightsreserved', 'private' diff --git a/lib/iomanager.php b/lib/iomanager.php index ee2ff958b..217599a6d 100644 --- a/lib/iomanager.php +++ b/lib/iomanager.php @@ -59,9 +59,10 @@ abstract class IoManager * your manager about each site you'll have to handle so you * can do any necessary per-site setup. * - * @param string $site target site server name + * The new site will be the currently live configuration during + * this call. */ - public function addSite($site) + public function addSite() { /* no-op */ } diff --git a/lib/iomaster.php b/lib/iomaster.php index bcab3542b..54e2dfe84 100644 --- a/lib/iomaster.php +++ b/lib/iomaster.php @@ -56,9 +56,9 @@ abstract class IoMaster $this->multiSite = $multiSite; } if ($this->multiSite) { - $this->sites = $this->findAllSites(); + $this->sites = StatusNet::findAllSites(); } else { - $this->sites = array(common_config('site', 'server')); + $this->sites = array(StatusNet::currentSite()); } if (empty($this->sites)) { @@ -66,9 +66,7 @@ abstract class IoMaster } foreach ($this->sites as $site) { - if ($site != common_config('site', 'server')) { - StatusNet::init($site); - } + StatusNet::switchSite($site); $this->initManagers(); } } @@ -82,57 +80,31 @@ abstract class IoMaster abstract function initManagers(); /** - * Pull all local sites from status_network table. - * @return array of hostnames - */ - protected function findAllSites() - { - $hosts = array(); - $sn = new Status_network(); - $sn->find(); - while ($sn->fetch()) { - $hosts[] = $sn->getServerName(); - } - return $hosts; - } - - /** * Instantiate an i/o manager class for the current site. * If a multi-site capable handler is already present, * we don't need to build a new one. * - * @param string $class + * @param mixed $manager class name (to run $class::get()) or object */ - protected function instantiate($class) + protected function instantiate($manager) { - if (isset($this->singletons[$class])) { - // Already instantiated a multi-site-capable handler. - // Just let it know it should listen to this site too! - $this->singletons[$class]->addSite(common_config('site', 'server')); - return; + if (is_string($manager)) { + $manager = call_user_func(array($class, 'get')); } - $manager = $this->getManager($class); - - if ($this->multiSite) { - $caps = $manager->multiSite(); - if ($caps == IoManager::SINGLE_ONLY) { + $caps = $manager->multiSite(); + if ($caps == IoManager::SINGLE_ONLY) { + if ($this->multiSite) { throw new Exception("$class can't run with --all; aborting."); } - if ($caps == IoManager::INSTANCE_PER_PROCESS) { - // Save this guy for later! - // We'll only need the one to cover multiple sites. - $this->singletons[$class] = $manager; - $manager->addSite(common_config('site', 'server')); - } + } else if ($caps == IoManager::INSTANCE_PER_PROCESS) { + $manager->addSite(); } - $this->managers[] = $manager; - } - - protected function getManager($class) - { - return call_user_func(array($class, 'get')); + if (!in_array($manager, $this->managers, true)) { + // Only need to save singletons once + $this->managers[] = $manager; + } } /** @@ -146,6 +118,7 @@ abstract class IoMaster { $this->logState('init'); $this->start(); + $this->checkMemory(false); while (!$this->shutdown) { $timeouts = array_values($this->pollTimeouts); @@ -209,17 +182,24 @@ abstract class IoMaster /** * Check runtime memory usage, possibly triggering a graceful shutdown * and thread respawn if we've crossed the soft limit. + * + * @param boolean $respawn if false we'll shut down instead of respawning */ - protected function checkMemory() + protected function checkMemory($respawn=true) { $memoryLimit = $this->softMemoryLimit(); if ($memoryLimit > 0) { $usage = memory_get_usage(); if ($usage > $memoryLimit) { common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting."); - $this->requestRestart(); + if ($respawn) { + $this->requestRestart(); + } else { + $this->requestShutdown(); + } } else if (common_config('queue', 'debug_memory')) { - common_log(LOG_DEBUG, "Memory usage $usage"); + $fmt = number_format($usage); + common_log(LOG_DEBUG, "Memory usage $fmt"); } } } diff --git a/lib/queued_xmpp.php b/lib/queued_xmpp.php index 4b890c4ca..fdd074db2 100644 --- a/lib/queued_xmpp.php +++ b/lib/queued_xmpp.php @@ -63,7 +63,7 @@ class Queued_XMPP extends XMPPHP_XMPP */ public function send($msg, $timeout=NULL) { - $qm = QueueManager::get(); + $qm = QueueManager::get('xmppout'); $qm->enqueue(strval($msg), 'xmppout'); } diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 149617eb5..8f8c8f133 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,9 +39,10 @@ abstract class QueueManager extends IoManager { static $qm = null; - public $master = null; - public $handlers = array(); - public $groups = array(); + protected $master = null; + protected $handlers = array(); + protected $groups = array(); + protected $activeGroups = array(); /** * Factory function to pull the appropriate QueueManager object @@ -215,55 +216,64 @@ abstract class QueueManager extends IoManager if (class_exists($class)) { return new $class(); } else { - common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); + $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); } } else { - common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); + $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); } return null; } /** * Get a list of registered queue transport names to be used - * for this daemon. + * for listening in this daemon. * * @return array of strings */ - function getQueues() + function activeQueues() { - $group = $this->activeGroup(); - return array_keys($this->groups[$group]); + $queues = array(); + foreach ($this->activeGroups as $group) { + if (isset($this->groups[$group])) { + $queues = array_merge($queues, $this->groups[$group]); + } + } + + return array_keys($queues); } /** - * Initialize the list of queue handlers + * Initialize the list of queue handlers for the current site. * * @event StartInitializeQueueManager * @event EndInitializeQueueManager */ function initialize() { - // @fixme we'll want to be able to listen to particular queues... + $this->handlers = array(); + $this->groups = array(); + $this->groupsByTransport = array(); + if (Event::handle('StartInitializeQueueManager', array($this))) { - $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('distrib', 'DistribQueueHandler'); $this->connect('omb', 'OmbQueueHandler'); $this->connect('ping', 'PingQueueHandler'); - $this->connect('distrib', 'DistribQueueHandler'); if (common_config('sms', 'enabled')) { $this->connect('sms', 'SmsQueueHandler'); } // XMPP output handlers... - $this->connect('jabber', 'JabberQueueHandler'); - $this->connect('public', 'PublicQueueHandler'); - // @fixme this should get an actual queue - //$this->connect('confirm', 'XmppConfirmHandler'); + if (common_config('xmpp', 'enabled')) { + // Delivery prep, read by queuedaemon.php: + $this->connect('jabber', 'JabberQueueHandler'); + $this->connect('public', 'PublicQueueHandler'); + + // Raw output, read by xmppdaemon.php: + $this->connect('xmppout', 'XmppOutQueueHandler', 'xmpp'); + } // For compat with old plugins not registering their own handlers. $this->connect('plugin', 'PluginQueueHandler'); - - $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon'); - } Event::handle('EndInitializeQueueManager', array($this)); } @@ -276,25 +286,41 @@ abstract class QueueManager extends IoManager * @param string $class * @param string $group */ - public function connect($transport, $class, $group='queuedaemon') + public function connect($transport, $class, $group='main') { $this->handlers[$transport] = $class; $this->groups[$group][$transport] = $class; + $this->groupsByTransport[$transport] = $group; } /** - * @return string queue group to use for this request + * Set the active group which will be used for listening. + * @param string $group */ - function activeGroup() + function setActiveGroup($group) { - $group = 'queuedaemon'; - if ($this->master) { - // hack hack - if ($this->master instanceof XmppMaster) { - return 'xmppdaemon'; - } + $this->activeGroups = array($group); + } + + /** + * Set the active group(s) which will be used for listening. + * @param array $groups + */ + function setActiveGroups($groups) + { + $this->activeGroups = $groups; + } + + /** + * @return string queue group for this queue + */ + function queueGroup($queue) + { + if (isset($this->groupsByTransport[$queue])) { + return $this->groupsByTransport[$queue]; + } else { + throw new Exception("Requested group for unregistered transport $queue"); } - return $group; } /** @@ -318,4 +344,15 @@ abstract class QueueManager extends IoManager $monitor->stats($key, $owners); } } + + protected function _log($level, $msg) + { + $class = get_class($this); + if ($this->activeGroups) { + $groups = ' (' . implode(',', $this->activeGroups) . ')'; + } else { + $groups = ''; + } + common_log($level, "$class$groups: $msg"); + } } diff --git a/lib/spawningdaemon.php b/lib/spawningdaemon.php index 862cbb4fa..fd9ae4355 100644 --- a/lib/spawningdaemon.php +++ b/lib/spawningdaemon.php @@ -90,18 +90,24 @@ abstract class SpawningDaemon extends Daemon while (count($children) > 0) { $status = null; $pid = pcntl_wait($status); - if ($pid > 0 && pcntl_wifexited($status)) { - $exitCode = pcntl_wexitstatus($status); - + if ($pid > 0) { $i = array_search($pid, $children); if ($i === false) { - $this->log(LOG_ERR, "Unrecognized child pid $pid exited with status $exitCode"); + $this->log(LOG_ERR, "Ignoring exit of unrecognized child pid $pid"); continue; } + if (pcntl_wifexited($status)) { + $exitCode = pcntl_wexitstatus($status); + $info = "status $exitCode"; + } else if (pcntl_wifsignaled($status)) { + $exitCode = self::EXIT_ERR; + $signal = pcntl_wtermsig($status); + $info = "signal $signal"; + } unset($children[$i]); if ($this->shouldRespawn($exitCode)) { - $this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; respawing."); + $this->log(LOG_INFO, "Thread $i pid $pid exited with $info; respawing."); $pid = pcntl_fork(); if ($pid < 0) { diff --git a/lib/statusnet.php b/lib/statusnet.php index 29e903026..9c7ede5a5 100644 --- a/lib/statusnet.php +++ b/lib/statusnet.php @@ -102,6 +102,60 @@ class StatusNet } /** + * Get identifier of the currently active site configuration + * @return string + */ + public static function currentSite() + { + return common_config('site', 'nickname'); + } + + /** + * Change site configuration to site specified by nickname, + * if set up via Status_network. If not, sites other than + * the current will fail horribly. + * + * May throw exception or trigger a fatal error if the given + * site is missing or configured incorrectly. + * + * @param string $nickname + */ + public static function switchSite($nickname) + { + if ($nickname == StatusNet::currentSite()) { + return true; + } + + $sn = Status_network::staticGet($nickname); + if (empty($sn)) { + return false; + throw new Exception("No such site nickname '$nickname'"); + } + + $server = $sn->getServerName(); + StatusNet::init($server); + } + + /** + * Pull all local sites from status_network table. + * + * Behavior undefined if site is not configured via Status_network. + * + * @return array of nicknames + */ + public static function findAllSites() + { + $sites = array(); + $sn = new Status_network(); + $sn->find(); + while ($sn->fetch()) { + $sites[] = $sn->nickname; + } + return $sites; + } + + + /** * Fire initialization events for all instantiated plugins. */ protected static function initPlugins() diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index cd62c25bd..bfeeb23b7 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -63,6 +63,7 @@ class StompQueueManager extends QueueManager $this->password = common_config('queue', 'stomp_password'); $this->base = common_config('queue', 'queue_basename'); $this->control = common_config('queue', 'control_channel'); + $this->subscriptions = array($this->control => $this->control); } /** @@ -75,17 +76,25 @@ class StompQueueManager extends QueueManager } /** - * Record each site we'll be handling input for in this process, - * so we can listen to the necessary queues for it. - * - * @fixme possibly actually do subscription here to save another - * loop over all sites later? - * @fixme possibly don't assume it's the current site + * Record queue subscriptions we'll need to handle the current site. */ - public function addSite($server) + public function addSite() { - $this->sites[] = $server; + $this->sites[] = StatusNet::currentSite(); + + // Set up handlers active for this site... $this->initialize(); + + foreach ($this->activeGroups as $group) { + if (isset($this->groups[$group])) { + // Actual queues may be broken out or consolidated... + // Subscribe to all the target queues we'll need. + foreach ($this->groups[$group] as $transport => $class) { + $target = $this->queueName($transport); + $this->subscriptions[$target] = $target; + } + } + } } /** @@ -121,59 +130,11 @@ class StompQueueManager extends QueueManager } /** - * Instantiate the appropriate QueueHandler class for the given queue. + * Saves an object into the queue item table. * + * @param mixed $object * @param string $queue - * @return mixed QueueHandler or null - */ - function getHandler($queue) - { - $handlers = $this->handlers[$this->currentSite()]; - if (isset($handlers[$queue])) { - $class = $handlers[$queue]; - if (class_exists($class)) { - return new $class(); - } else { - common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); - } - } else { - common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); - } - return null; - } - - /** - * Get a list of all registered queue transport names. - * - * @return array of strings - */ - function getQueues() - { - $group = $this->activeGroup(); - $site = $this->currentSite(); - if (empty($this->groups[$site][$group])) { - return array(); - } else { - return array_keys($this->groups[$site][$group]); - } - } - - /** - * Register a queue transport name and handler class for your plugin. - * Only registered transports will be reliably picked up! * - * @param string $transport - * @param string $class - * @param string $group - */ - public function connect($transport, $class, $group='queuedaemon') - { - $this->handlers[$this->currentSite()][$transport] = $class; - $this->groups[$this->currentSite()][$group][$transport] = $class; - } - - /** - * Saves a notice object reference into the queue item table. * @return boolean true on success * @throws StompException on connection or send error */ @@ -192,8 +153,11 @@ class StompQueueManager extends QueueManager */ protected function _doEnqueue($object, $queue, $idx) { - $msg = $this->encode($object); $rep = $this->logrep($object); + $envelope = array('site' => common_config('site', 'nickname'), + 'handler' => $queue, + 'payload' => $this->encode($object)); + $msg = serialize($envelope); $props = array('created' => common_sql_now()); if ($this->isPersistent($queue)) { @@ -205,11 +169,11 @@ class StompQueueManager extends QueueManager $result = $con->send($this->queueName($queue), $msg, $props); if (!$result) { - common_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); + $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); return false; } - common_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); + $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); $this->stats('enqueued', $queue); return true; } @@ -275,12 +239,14 @@ class StompQueueManager extends QueueManager $idx = $this->connectionFromSocket($socket); $con = $this->cons[$idx]; $host = $con->getServer(); + $this->defaultIdx = $idx; $ok = true; try { $frames = $con->readFrames(); } catch (StompException $e) { - common_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage()); + $this->_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage()); + fclose($socket); // ??? $this->cons[$idx] = null; $this->transaction[$idx] = null; $this->disconnect[$idx] = time(); @@ -289,14 +255,17 @@ class StompQueueManager extends QueueManager foreach ($frames as $frame) { $dest = $frame->headers['destination']; if ($dest == $this->control) { - if (!$this->handleControlSignal($idx, $frame)) { + if (!$this->handleControlSignal($frame)) { // We got a control event that requests a shutdown; // close out and stop handling anything else! break; } } else { - $ok = $ok && $this->handleItem($idx, $frame); + $ok = $this->handleItem($frame) && $ok; } + $this->ack($idx, $frame); + $this->commit($idx); + $this->begin($idx); } return $ok; } @@ -333,22 +302,9 @@ class StompQueueManager extends QueueManager parent::start($master); $this->_connectAll(); - common_log(LOG_INFO, "Subscribing to $this->control"); - foreach ($this->cons as $con) { - if ($con) { - $con->subscribe($this->control); - } - } - if ($this->sites) { - foreach ($this->sites as $server) { - StatusNet::init($server); - $this->doSubscribe(); - } - } else { - $this->doSubscribe(); - } foreach ($this->cons as $i => $con) { if ($con) { + $this->doSubscribe($con); $this->begin($i); } } @@ -356,9 +312,7 @@ class StompQueueManager extends QueueManager } /** - * Subscribe to all the queues we're going to need to handle... - * - * Side effects: in multi-site mode, may reset site configuration. + * Close out any active connections. * * @return bool return false on failure */ @@ -377,15 +331,6 @@ class StompQueueManager extends QueueManager } /** - * Get identifier of the currently active site configuration - * @return string - */ - protected function currentSite() - { - return common_config('site', 'server'); // @fixme switch to nickname - } - - /** * Lazy open a single connection to Stomp queue server. * If multiple servers are configured, we let the Stomp client library * worry about finding a working connection among them. @@ -441,6 +386,10 @@ class StompQueueManager extends QueueManager } } + /** + * Attempt to manually reconnect to the Stomp server for the given + * slot. If successful, set up our subscriptions on it. + */ protected function _reconnect($idx) { try { @@ -453,17 +402,7 @@ class StompQueueManager extends QueueManager $this->cons[$idx] = $con; $this->disconnect[$idx] = null; - // now we have to listen to everything... - // @fixme refactor this nicer. :P - $host = $con->getServer(); - $this->_log(LOG_INFO, "Resubscribing to $this->control on $host"); - $con->subscribe($this->control); - foreach ($this->subscriptions as $site => $queues) { - foreach ($queues as $queue) { - $this->_log(LOG_INFO, "Resubscribing to $queue on $host"); - $con->subscribe($queue); - } - } + $this->doSubscribe($con); $this->begin($idx); } else { // Try again later... @@ -487,41 +426,15 @@ class StompQueueManager extends QueueManager } /** - * Subscribe to all enabled notice queues for the current site. - */ - protected function doSubscribe() - { - $site = $this->currentSite(); - $this->_connect(); - foreach ($this->getQueues() as $queue) { - $rawqueue = $this->queueName($queue); - $this->subscriptions[$site][$queue] = $rawqueue; - $this->_log(LOG_INFO, "Subscribing to $rawqueue"); - foreach ($this->cons as $con) { - if ($con) { - $con->subscribe($rawqueue); - } - } - } - } - - /** - * Subscribe from all enabled notice queues for the current site. + * Set up all our raw queue subscriptions on the given connection + * @param LiberalStomp $con */ - protected function doUnsubscribe() + protected function doSubscribe(LiberalStomp $con) { - $site = $this->currentSite(); - $this->_connect(); - if (!empty($this->subscriptions[$site])) { - foreach ($this->subscriptions[$site] as $queue => $rawqueue) { - $this->_log(LOG_INFO, "Unsubscribing from $rawqueue"); - foreach ($this->cons as $con) { - if ($con) { - $con->unsubscribe($rawqueue); - } - } - unset($this->subscriptions[$site][$queue]); - } + $host = $con->getServer(); + foreach ($this->subscriptions as $queue) { + $this->_log(LOG_INFO, "Subscribing to $queue on $host"); + $con->subscribe($queue); } } @@ -534,25 +447,29 @@ class StompQueueManager extends QueueManager * Side effects: in multi-site mode, may reset site configuration to * match the site that queued the event. * - * @param int $idx connection index * @param StompFrame $frame - * @return bool + * @return bool success */ - protected function handleItem($idx, $frame) + protected function handleItem($frame) { - $this->defaultIdx = $idx; + $host = $this->cons[$this->defaultIdx]->getServer(); + $message = unserialize($frame->body); + $site = $message['site']; + $queue = $message['handler']; - list($site, $queue) = $this->parseDestination($frame->headers['destination']); - if ($site != $this->currentSite()) { - $this->stats('switch'); - StatusNet::init($site); + if ($this->isDeadletter($frame, $message)) { + $this->stats('deadletter', $queue); + return false; } - $host = $this->cons[$idx]->getServer(); - $item = $this->decode($frame->body); + // @fixme detect failing site switches + $this->switchSite($site); + + $item = $this->decode($message['payload']); if (empty($item)) { $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host"); - return true; + $this->stats('baditem', $queue); + return false; } $info = $this->logrep($item) . " posted at " . $frame->headers['created'] . " in queue $queue from $host"; @@ -561,16 +478,10 @@ class StompQueueManager extends QueueManager $handler = $this->getHandler($queue); if (!$handler) { $this->_log(LOG_ERR, "Missing handler class; skipping $info"); - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); $this->stats('badhandler', $queue); return false; } - // If there's an exception when handling, - // log the error and let it get requeued. - try { $ok = $handler->handle($item); } catch (Exception $e) { @@ -578,25 +489,80 @@ class StompQueueManager extends QueueManager $ok = false; } - if (!$ok) { + if ($ok) { + $this->_log(LOG_INFO, "Successfully handled $info"); + $this->stats('handled', $queue); + } else { $this->_log(LOG_WARNING, "Failed handling $info"); - // FIXME we probably shouldn't have to do - // this kind of queue management ourselves; - // if we don't ack, it should resend... - $this->ack($idx, $frame); + // Requeing moves the item to the end of the line for its next try. + // @fixme add a manual retry count $this->enqueue($item, $queue); - $this->commit($idx); - $this->begin($idx); $this->stats('requeued', $queue); - return false; } - $this->_log(LOG_INFO, "Successfully handled $info"); - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); - $this->stats('handled', $queue); - return true; + return $ok; + } + + /** + * Check if a redelivered message has been run through enough + * that we're going to give up on it. + * + * @param StompFrame $frame + * @param array $message unserialized message body + * @return boolean true if we should discard + */ + protected function isDeadLetter($frame, $message) + { + if (isset($frame->headers['redelivered']) && $frame->headers['redelivered'] == 'true') { + // Message was redelivered, possibly indicating a previous failure. + $msgId = $frame->headers['message-id']; + $site = $message['site']; + $queue = $message['handler']; + $msgInfo = "message $msgId for $site in queue $queue"; + + $deliveries = $this->incDeliveryCount($msgId); + if ($deliveries > common_config('queue', 'max_retries')) { + $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo"; + + $outdir = common_config('queue', 'dead_letter_dir'); + if ($outdir) { + $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId); + $info .= ": dumping to $filename"; + file_put_contents($filename, $message['payload']); + } + + common_log(LOG_ERR, $info); + return true; + } else { + common_log(LOG_INFO, "retry $deliveries on $msgInfo"); + } + } + return false; + } + + /** + * Update count of times we've re-encountered this message recently, + * triggered when we get a message marked as 'redelivered'. + * + * Requires a CLI-friendly cache configuration. + * + * @param string $msgId message-id header from message + * @return int number of retries recorded + */ + function incDeliveryCount($msgId) + { + $count = 0; + $cache = common_memcache(); + if ($cache) { + $key = 'statusnet:stomp:message-retries:' . $msgId; + $count = $cache->increment($key); + if (!$count) { + $count = 1; + $cache->set($key, $count, null, 3600); + $got = $cache->get($key); + } + } + return $count; } /** @@ -629,14 +595,23 @@ class StompQueueManager extends QueueManager } else { $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message"); } - - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); return $shutdown; } /** + * Switch site, if necessary, and reset current handler assignments + * @param string $site + */ + function switchSite($site) + { + if ($site != StatusNet::currentSite()) { + $this->stats('switch'); + StatusNet::switchSite($site); + $this->initialize(); + } + } + + /** * Set us up with queue subscriptions for a new site added at runtime, * triggered by a broadcast to the 'statusnet-control' topic. * @@ -648,22 +623,17 @@ class StompQueueManager extends QueueManager if (empty($this->sites)) { if ($nickname == common_config('site', 'nickname')) { StatusNet::init(common_config('site', 'server')); - $this->doUnsubscribe(); - $this->doSubscribe(); } else { $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname"); } } else { $sn = Status_network::staticGet($nickname); if ($sn) { - $server = $sn->getServerName(); // @fixme do config-by-nick - StatusNet::init($server); - if (empty($this->sites[$server])) { - $this->addSite($server); + $this->switchSite($nickname); + if (!in_array($nickname, $this->sites)) { + $this->addSite(); } - $this->_log(LOG_INFO, "(Re)subscribing to queues for site $nickname / $server"); - $this->doUnsubscribe(); - $this->doSubscribe(); + // @fixme update subscriptions, if applicable $this->stats('siteupdate'); } else { $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); @@ -673,42 +643,47 @@ class StompQueueManager extends QueueManager /** * Combines the queue_basename from configuration with the - * site server name and queue name to give eg: + * group name for this queue to give eg: * - * /queue/statusnet/identi.ca/sms + * /queue/statusnet/main * * @param string $queue * @return string */ protected function queueName($queue) { - return common_config('queue', 'queue_basename') . - $this->currentSite() . '/' . $queue; + $base = common_config('queue', 'queue_basename'); + $group = $this->queueGroup($queue); + $breakout = $this->breakoutMode($queue); + if ($breakout == 'shared') { + return $base . "$group"; + } else if ($breakout == 'handler') { + return $base . "$group/$queue"; + } else if ($breakout == 'site') { + $site = StatusNet::currentSite(); + return $base . "$group/$queue/$site"; + } + throw Exception("Unrecognized queue breakout mode '$breakout' for '$queue'"); } /** - * Returns the site and queue name from the server-side queue. + * Get the breakout mode for the given queue on the current site. * - * @param string queue destination (eg '/queue/statusnet/identi.ca/sms') - * @return array of site and queue: ('identi.ca','sms') or false if unrecognized + * @param string $queue + * @return string one of 'shared', 'handler', 'site' */ - protected function parseDestination($dest) + protected function breakoutMode($queue) { - $prefix = common_config('queue', 'queue_basename'); - if (substr($dest, 0, strlen($prefix)) == $prefix) { - $rest = substr($dest, strlen($prefix)); - return explode("/", $rest, 2); + $breakout = common_config('queue', 'breakout'); + if (isset($breakout[$queue])) { + return $breakout[$queue]; + } else if (isset($breakout['*'])) { + return $breakout['*']; } else { - common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest"); - return array(false, false); + return 'shared'; } } - function _log($level, $msg) - { - common_log($level, 'StompQueueManager: '.$msg); - } - protected function begin($idx) { if ($this->useTransactions) { diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php index 985e7c32e..f37635855 100644 --- a/lib/xmppmanager.php +++ b/lib/xmppmanager.php @@ -48,7 +48,7 @@ class XmppManager extends IoManager public static function get() { if (common_config('xmpp', 'enabled')) { - $site = common_config('site', 'server'); + $site = StatusNet::currentSite(); if (empty(self::$singletons[$site])) { self::$singletons[$site] = new XmppManager(); } @@ -69,7 +69,7 @@ class XmppManager extends IoManager function __construct() { - $this->site = common_config('site', 'server'); + $this->site = StatusNet::currentSite(); $this->resource = common_config('xmpp', 'resource') . 'daemon'; } @@ -476,10 +476,10 @@ class XmppManager extends IoManager */ protected function switchSite() { - if ($this->site != common_config('site', 'server')) { + if ($this->site != StatusNet::currentSite()) { common_log(LOG_DEBUG, __METHOD__ . ": switching to site $this->site"); $this->stats('switch'); - StatusNet::init($this->site); + StatusNet::switchSite($this->site); } } } |