diff options
author | Brion Vibber <brion@pobox.com> | 2010-02-17 16:49:00 -0800 |
---|---|---|
committer | Brion Vibber <brion@pobox.com> | 2010-02-17 16:49:00 -0800 |
commit | ce6be4f83624d8c39a93d2b54567cc2f33580812 (patch) | |
tree | c121a3d5666c98649351fa04cc48e4c03a16db76 /lib | |
parent | 73ba26efe3d9d97c478a507d351ac92d28d82655 (diff) |
Queues: redid the breakout control model so we can start up and subscribe to queues without running through the complete site list, which is ok at 1k sites but too slow at 10k.
All breakout queues that we're going to need to listen to now need to be explicitly listed in $config['queue']['breakout'].
Until XMPP is moved to component model, this setting will let the individual processes work with their own queues:
$config['queue']['breakout'][] = 'xmpp/xmppout/' . $config['site']['nickname'];
Diffstat (limited to 'lib')
-rw-r--r-- | lib/default.php | 11 | ||||
-rw-r--r-- | lib/iomaster.php | 23 | ||||
-rw-r--r-- | lib/stompqueuemanager.php | 119 |
3 files changed, 76 insertions, 77 deletions
diff --git a/lib/default.php b/lib/default.php index a74cccae1..c969c3b33 100644 --- a/lib/default.php +++ b/lib/default.php @@ -91,10 +91,13 @@ $default = 'spawndelay' => 1, // Wait at least N seconds between (re)spawns of child processes to avoid slamming the queue server with subscription startup 'debug_memory' => false, // true to spit memory usage to log 'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue - 'breakout' => array('*' => 'shared'), // set global or per-handler queue breakout - // 'shared': use a shared queue for all sites - // 'handler': share each/this handler over multiple sites - // 'site': break out for each/this handler on this site + 'breakout' => array(), // List queue specifiers to break out when using Stomp queue. + // Default will share all queues for all sites within each group. + // Specify as <group>/<queue> or <group>/<queue>/<site>, + // using nickname identifier as site. + // + // 'main/distrib' separate "distrib" queue covering all sites + // 'xmpp/xmppout/mysite' separate "xmppout" queue covering just 'mysite' 'max_retries' => 10, // drop messages after N failed attempts to process (Stomp) 'dead_letter_dir' => false, // set to directory to save dropped messages into (Stomp) ), diff --git a/lib/iomaster.php b/lib/iomaster.php index 54e2dfe84..d20837ba5 100644 --- a/lib/iomaster.php +++ b/lib/iomaster.php @@ -55,27 +55,18 @@ abstract class IoMaster if ($multiSite !== null) { $this->multiSite = $multiSite; } - if ($this->multiSite) { - $this->sites = StatusNet::findAllSites(); - } else { - $this->sites = array(StatusNet::currentSite()); - } - - if (empty($this->sites)) { - throw new Exception("Empty status_network table, cannot init"); - } - foreach ($this->sites as $site) { - StatusNet::switchSite($site); - $this->initManagers(); - } + $this->initManagers(); } /** - * Initialize IoManagers for the currently configured site - * which are appropriate to this instance. + * Initialize IoManagers which are appropriate to this instance; + * pass class names or instances into $this->instantiate(). + * + * If setup and configuration may vary between sites in multi-site + * mode, it's the subclass's responsibility to set them up here. * - * Pass class names into $this->instantiate() + * Switching site configurations is an acceptable side effect. */ abstract function initManagers(); diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index bfeeb23b7..9af8b2f48 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -63,7 +63,7 @@ class StompQueueManager extends QueueManager $this->password = common_config('queue', 'stomp_password'); $this->base = common_config('queue', 'queue_basename'); $this->control = common_config('queue', 'control_channel'); - $this->subscriptions = array($this->control => $this->control); + $this->breakout = common_config('queue', 'breakout'); } /** @@ -76,28 +76,6 @@ class StompQueueManager extends QueueManager } /** - * Record queue subscriptions we'll need to handle the current site. - */ - public function addSite() - { - $this->sites[] = StatusNet::currentSite(); - - // Set up handlers active for this site... - $this->initialize(); - - foreach ($this->activeGroups as $group) { - if (isset($this->groups[$group])) { - // Actual queues may be broken out or consolidated... - // Subscribe to all the target queues we'll need. - foreach ($this->groups[$group] as $transport => $class) { - $target = $this->queueName($transport); - $this->subscriptions[$target] = $target; - } - } - } - } - - /** * Optional; ping any running queue handler daemons with a notification * such as announcing a new site to handle or requesting clean shutdown. * This avoids having to restart all the daemons manually to update configs @@ -166,14 +144,15 @@ class StompQueueManager extends QueueManager $con = $this->cons[$idx]; $host = $con->getServer(); - $result = $con->send($this->queueName($queue), $msg, $props); + $target = $this->queueName($queue); + $result = $con->send($target, $msg, $props); if (!$result) { - $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); + $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target"); return false; } - $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); + $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target"); $this->stats('enqueued', $queue); return true; } @@ -432,11 +411,42 @@ class StompQueueManager extends QueueManager protected function doSubscribe(LiberalStomp $con) { $host = $con->getServer(); - foreach ($this->subscriptions as $queue) { - $this->_log(LOG_INFO, "Subscribing to $queue on $host"); - $con->subscribe($queue); + foreach ($this->subscriptions() as $sub) { + $this->_log(LOG_INFO, "Subscribing to $sub on $host"); + $con->subscribe($sub); } } + + /** + * Grab a full list of stomp-side queue subscriptions. + * Will include: + * - control broadcast channel + * - shared group queues for active groups + * - per-handler and per-site breakouts from $config['queue']['breakout'] + * that are rooted in the active groups. + * + * @return array of strings + */ + protected function subscriptions() + { + $subs = array(); + $subs[] = $this->control; + + foreach ($this->activeGroups as $group) { + $subs[] = $this->base . $group; + } + + foreach ($this->breakout as $spec) { + $parts = explode('/', $spec); + if (count($parts) < 2 || count($parts) > 3) { + common_log(LOG_ERR, "Bad queue breakout specifier $spec"); + } + if (in_array($parts[0], $this->activeGroups)) { + $subs[] = $this->base . $spec; + } + } + return array_unique($subs); + } /** * Handle and acknowledge an event that's come in through a queue. @@ -612,32 +622,26 @@ class StompQueueManager extends QueueManager } /** - * Set us up with queue subscriptions for a new site added at runtime, + * (Re)load runtime configuration for a given site by nickname, * triggered by a broadcast to the 'statusnet-control' topic. * + * Configuration changes in database should update, but config + * files might not. + * * @param array $frame Stomp frame * @return bool true to continue; false to stop further processing. */ protected function updateSiteConfig($nickname) { - if (empty($this->sites)) { - if ($nickname == common_config('site', 'nickname')) { - StatusNet::init(common_config('site', 'server')); - } else { - $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname"); + $sn = Status_network::staticGet($nickname); + if ($sn) { + $this->switchSite($nickname); + if (!in_array($nickname, $this->sites)) { + $this->addSite(); } + $this->stats('siteupdate'); } else { - $sn = Status_network::staticGet($nickname); - if ($sn) { - $this->switchSite($nickname); - if (!in_array($nickname, $this->sites)) { - $this->addSite(); - } - // @fixme update subscriptions, if applicable - $this->stats('siteupdate'); - } else { - $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); - } + $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); } } @@ -646,24 +650,25 @@ class StompQueueManager extends QueueManager * group name for this queue to give eg: * * /queue/statusnet/main + * /queue/statusnet/main/distrib + * /queue/statusnet/xmpp/xmppout/site01 * * @param string $queue * @return string */ protected function queueName($queue) { - $base = common_config('queue', 'queue_basename'); $group = $this->queueGroup($queue); - $breakout = $this->breakoutMode($queue); - if ($breakout == 'shared') { - return $base . "$group"; - } else if ($breakout == 'handler') { - return $base . "$group/$queue"; - } else if ($breakout == 'site') { - $site = StatusNet::currentSite(); - return $base . "$group/$queue/$site"; - } - throw Exception("Unrecognized queue breakout mode '$breakout' for '$queue'"); + $site = StatusNet::currentSite(); + + $specs = array("$group/$queue/$site", + "$group/$queue"); + foreach ($specs as $spec) { + if (in_array($spec, $this->breakout)) { + return $this->base . $spec; + } + } + return $this->base . $group; } /** |