summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--classes/Status_network.php11
-rw-r--r--lib/default.php1
-rw-r--r--lib/iomaster.php53
-rw-r--r--lib/queuemanager.php17
-rw-r--r--lib/spawningdaemon.php58
-rw-r--r--lib/stompqueuemanager.php159
-rwxr-xr-xscripts/queuectl.php85
-rwxr-xr-xscripts/queuedaemon.php2
-rwxr-xr-xscripts/xmppdaemon.php2
9 files changed, 347 insertions, 41 deletions
diff --git a/classes/Status_network.php b/classes/Status_network.php
index f1314d615..4bda24b6a 100644
--- a/classes/Status_network.php
+++ b/classes/Status_network.php
@@ -42,7 +42,16 @@ class Status_network extends DB_DataObject
public $tags; // text
/* Static get */
- function staticGet($k,$v=NULL) { return DB_DataObject::staticGet('Status_network',$k,$v); }
+ function staticGet($k,$v=NULL) {
+ $i = DB_DataObject::staticGet('Status_network',$k,$v);
+
+ // Don't use local process cache; if we're fetching multiple
+ // times it's because we're reloading it in a long-running
+ // process; we need a fresh copy!
+ global $_DB_DATAOBJECT;
+ unset($_DB_DATAOBJECT['CACHE']['status_network']);
+ return $i;
+ }
/* the code above is auto generated do not remove the tag below */
###END_AUTOCODE
diff --git a/lib/default.php b/lib/default.php
index 35115542f..d2dd8ab33 100644
--- a/lib/default.php
+++ b/lib/default.php
@@ -81,6 +81,7 @@ $default =
'subsystem' => 'db', # default to database, or 'stomp'
'stomp_server' => null,
'queue_basename' => '/queue/statusnet/',
+ 'control_channel' => '/topic/statusnet-control', // broadcasts to all queue daemons
'stomp_username' => null,
'stomp_password' => null,
'monitor' => null, // URL to monitor ping endpoint (work in progress)
diff --git a/lib/iomaster.php b/lib/iomaster.php
index 3bf82bc6b..bcab3542b 100644
--- a/lib/iomaster.php
+++ b/lib/iomaster.php
@@ -38,6 +38,9 @@ abstract class IoMaster
protected $pollTimeouts = array();
protected $lastPoll = array();
+ public $shutdown = false; // Did we do a graceful shutdown?
+ public $respawn = true; // Should we respawn after shutdown?
+
/**
* @param string $id process ID to use in logging/monitoring
*/
@@ -144,7 +147,7 @@ abstract class IoMaster
$this->logState('init');
$this->start();
- while (true) {
+ while (!$this->shutdown) {
$timeouts = array_values($this->pollTimeouts);
$timeouts[] = 60; // default max timeout
@@ -196,16 +199,7 @@ abstract class IoMaster
$this->logState('idle');
$this->idle();
- $memoryLimit = $this->softMemoryLimit();
- if ($memoryLimit > 0) {
- $usage = memory_get_usage();
- if ($usage > $memoryLimit) {
- common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
- break;
- } else if (common_config('queue', 'debug_memory')) {
- common_log(LOG_DEBUG, "Memory usage $usage");
- }
- }
+ $this->checkMemory();
}
$this->logState('shutdown');
@@ -213,6 +207,24 @@ abstract class IoMaster
}
/**
+ * Check runtime memory usage, possibly triggering a graceful shutdown
+ * and thread respawn if we've crossed the soft limit.
+ */
+ protected function checkMemory()
+ {
+ $memoryLimit = $this->softMemoryLimit();
+ if ($memoryLimit > 0) {
+ $usage = memory_get_usage();
+ if ($usage > $memoryLimit) {
+ common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
+ $this->requestRestart();
+ } else if (common_config('queue', 'debug_memory')) {
+ common_log(LOG_DEBUG, "Memory usage $usage");
+ }
+ }
+ }
+
+ /**
* Return fully-parsed soft memory limit in bytes.
* @return intval 0 or -1 if not set
*/
@@ -354,5 +366,24 @@ abstract class IoMaster
$owners[] = "thread:" . $this->id;
$this->monitor->stats($key, $owners);
}
+
+ /**
+ * For IoManagers to request a graceful shutdown at end of event loop.
+ */
+ public function requestShutdown()
+ {
+ $this->shutdown = true;
+ $this->respawn = false;
+ }
+
+ /**
+ * For IoManagers to request a graceful restart at end of event loop.
+ */
+ public function requestRestart()
+ {
+ $this->shutdown = true;
+ $this->respawn = true;
+ }
+
}
diff --git a/lib/queuemanager.php b/lib/queuemanager.php
index 1bc8de1f7..afe710e88 100644
--- a/lib/queuemanager.php
+++ b/lib/queuemanager.php
@@ -101,6 +101,23 @@ abstract class QueueManager extends IoManager
}
/**
+ * Optional; ping any running queue handler daemons with a notification
+ * such as announcing a new site to handle or requesting clean shutdown.
+ * This avoids having to restart all the daemons manually to update configs
+ * and such.
+ *
+ * Called from scripts/queuectl.php controller utility.
+ *
+ * @param string $event event key
+ * @param string $param optional parameter to append to key
+ * @return boolean success
+ */
+ public function sendControlSignal($event, $param='')
+ {
+ throw new Exception(get_class($this) . " does not support control signals.");
+ }
+
+ /**
* Store an object (usually/always a Notice) into the given queue
* for later processing. No guarantee is made on when it will be
* processed; it could be immediately or at some unspecified point
diff --git a/lib/spawningdaemon.php b/lib/spawningdaemon.php
index 8baefe88e..b1961d688 100644
--- a/lib/spawningdaemon.php
+++ b/lib/spawningdaemon.php
@@ -36,6 +36,11 @@ abstract class SpawningDaemon extends Daemon
{
protected $threads=1;
+ const EXIT_OK = 0;
+ const EXIT_ERR = 1;
+ const EXIT_SHUTDOWN = 100;
+ const EXIT_RESTART = 101;
+
function __construct($id=null, $daemonize=true, $threads=1)
{
parent::__construct($daemonize);
@@ -49,7 +54,7 @@ abstract class SpawningDaemon extends Daemon
/**
* Perform some actual work!
*
- * @return boolean true on success, false on failure
+ * @return int exit code; use self::EXIT_SHUTDOWN to request not to respawn.
*/
public abstract function runThread();
@@ -84,23 +89,30 @@ abstract class SpawningDaemon extends Daemon
while (count($children) > 0) {
$status = null;
$pid = pcntl_wait($status);
- if ($pid > 0) {
+ if ($pid > 0 && pcntl_wifexited($status)) {
+ $exitCode = pcntl_wexitstatus($status);
+
$i = array_search($pid, $children);
if ($i === false) {
- $this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
+ $this->log(LOG_ERR, "Unrecognized child pid $pid exited with status $exitCode");
continue;
}
unset($children[$i]);
- $this->log(LOG_INFO, "Thread $i pid $pid exited.");
-
- $pid = pcntl_fork();
- if ($pid < 0) {
- $this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n");
- } else if ($pid == 0) {
- $this->initAndRunChild($i);
+
+ if ($this->shouldRespawn($exitCode)) {
+ $this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; respawing.");
+
+ $pid = pcntl_fork();
+ if ($pid < 0) {
+ $this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n");
+ } else if ($pid == 0) {
+ $this->initAndRunChild($i);
+ } else {
+ $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
+ $children[$i] = $pid;
+ }
} else {
- $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
- $children[$i] = $pid;
+ $this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; closing out thread.");
}
}
}
@@ -109,6 +121,24 @@ abstract class SpawningDaemon extends Daemon
}
/**
+ * Determine whether to respawn an exited subprocess based on its exit code.
+ * Otherwise we'll respawn all exits by default.
+ *
+ * @param int $exitCode
+ * @return boolean true to respawn
+ */
+ protected function shouldRespawn($exitCode)
+ {
+ if ($exitCode == self::EXIT_SHUTDOWN) {
+ // Thread requested a clean shutdown.
+ return false;
+ } else {
+ // Otherwise we should always respawn!
+ return true;
+ }
+ }
+
+ /**
* Initialize things for a fresh thread, call runThread(), and
* exit at completion with appropriate return value.
*/
@@ -116,8 +146,8 @@ abstract class SpawningDaemon extends Daemon
{
$this->set_id($this->get_id() . "." . $thread);
$this->resetDb();
- $ok = $this->runThread();
- exit($ok ? 0 : 1);
+ $exitCode = $this->runThread();
+ exit($exitCode);
}
/**
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 8f0091a13..89f3d74cc 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -38,8 +38,10 @@ class StompQueueManager extends QueueManager
var $password = null;
var $base = null;
var $con = null;
+ protected $control;
protected $sites = array();
+ protected $subscriptions = array();
protected $useTransactions = true;
protected $transaction = null;
@@ -52,6 +54,7 @@ class StompQueueManager extends QueueManager
$this->username = common_config('queue', 'stomp_username');
$this->password = common_config('queue', 'stomp_password');
$this->base = common_config('queue', 'queue_basename');
+ $this->control = common_config('queue', 'control_channel');
}
/**
@@ -77,6 +80,36 @@ class StompQueueManager extends QueueManager
$this->initialize();
}
+ /**
+ * Optional; ping any running queue handler daemons with a notification
+ * such as announcing a new site to handle or requesting clean shutdown.
+ * This avoids having to restart all the daemons manually to update configs
+ * and such.
+ *
+ * Currently only relevant for multi-site queue managers such as Stomp.
+ *
+ * @param string $event event key
+ * @param string $param optional parameter to append to key
+ * @return boolean success
+ */
+ public function sendControlSignal($event, $param='')
+ {
+ $message = $event;
+ if ($param != '') {
+ $message .= ':' . $param;
+ }
+ $this->_connect();
+ $result = $this->con->send($this->control,
+ $message,
+ array ('created' => common_sql_now()));
+ if ($result) {
+ $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message");
+ return true;
+ } else {
+ $this->_log(LOG_ERR, "Failed sending control ping to queue daemons: $message");
+ return false;
+ }
+ }
/**
* Instantiate the appropriate QueueHandler class for the given queue.
@@ -86,7 +119,7 @@ class StompQueueManager extends QueueManager
*/
function getHandler($queue)
{
- $handlers = $this->handlers[common_config('site', 'server')];
+ $handlers = $this->handlers[$this->currentSite()];
if (isset($handlers[$queue])) {
$class = $handlers[$queue];
if (class_exists($class)) {
@@ -108,7 +141,7 @@ class StompQueueManager extends QueueManager
function getQueues()
{
$group = $this->activeGroup();
- $site = common_config('site', 'server');
+ $site = $this->currentSite();
if (empty($this->groups[$site][$group])) {
return array();
} else {
@@ -126,8 +159,8 @@ class StompQueueManager extends QueueManager
*/
public function connect($transport, $class, $group='queuedaemon')
{
- $this->handlers[common_config('site', 'server')][$transport] = $class;
- $this->groups[common_config('site', 'server')][$group][$transport] = $class;
+ $this->handlers[$this->currentSite()][$transport] = $class;
+ $this->groups[$this->currentSite()][$group][$transport] = $class;
}
/**
@@ -180,7 +213,16 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
- $ok = $ok && $this->_handleItem($frame);
+ $dest = $frame->headers['destination'];
+ if ($dest == $this->control) {
+ if (!$this->handleControlSignal($frame)) {
+ // We got a control event that requests a shutdown;
+ // close out and stop handling anything else!
+ break;
+ }
+ } else {
+ $ok = $ok && $this->handleItem($frame);
+ }
}
return $ok;
}
@@ -197,6 +239,9 @@ class StompQueueManager extends QueueManager
public function start($master)
{
parent::start($master);
+ $this->_connect();
+
+ $this->con->subscribe($this->control);
if ($this->sites) {
foreach ($this->sites as $server) {
StatusNet::init($server);
@@ -221,6 +266,7 @@ class StompQueueManager extends QueueManager
// If there are any outstanding delivered messages we haven't processed,
// free them for another thread to take.
$this->rollback();
+ $this->con->unsubscribe($this->control);
if ($this->sites) {
foreach ($this->sites as $server) {
StatusNet::init($server);
@@ -231,7 +277,16 @@ class StompQueueManager extends QueueManager
}
return true;
}
-
+
+ /**
+ * Get identifier of the currently active site configuration
+ * @return string
+ */
+ protected function currentSite()
+ {
+ return common_config('site', 'server'); // @fixme switch to nickname
+ }
+
/**
* Lazy open connection to Stomp queue server.
*/
@@ -255,22 +310,29 @@ class StompQueueManager extends QueueManager
*/
protected function doSubscribe()
{
+ $site = $this->currentSite();
$this->_connect();
foreach ($this->getQueues() as $queue) {
$rawqueue = $this->queueName($queue);
+ $this->subscriptions[$site][$queue] = $rawqueue;
$this->_log(LOG_INFO, "Subscribing to $rawqueue");
$this->con->subscribe($rawqueue);
}
}
-
+
/**
* Subscribe from all enabled notice queues for the current site.
*/
protected function doUnsubscribe()
{
+ $site = $this->currentSite();
$this->_connect();
- foreach ($this->getQueues() as $queue) {
- $this->con->unsubscribe($this->queueName($queue));
+ if (!empty($this->subscriptions[$site])) {
+ foreach ($this->subscriptions[$site] as $queue => $rawqueue) {
+ $this->_log(LOG_INFO, "Unsubscribing from $rawqueue");
+ $this->con->unsubscribe($rawqueue);
+ unset($this->subscriptions[$site][$queue]);
+ }
}
}
@@ -286,10 +348,10 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
- protected function _handleItem($frame)
+ protected function handleItem($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
- if ($site != common_config('site', 'server')) {
+ if ($site != $this->currentSite()) {
$this->stats('switch');
StatusNet::init($site);
}
@@ -317,7 +379,7 @@ class StompQueueManager extends QueueManager
$handler = $this->getHandler($queue);
if (!$handler) {
- $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
+ $this->_log(LOG_ERR, "Missing handler class; skipping $info");
$this->ack($frame);
$this->commit();
$this->begin();
@@ -349,6 +411,77 @@ class StompQueueManager extends QueueManager
}
/**
+ * Process a control signal broadcast.
+ *
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function handleControlSignal($frame)
+ {
+ $message = trim($frame->body);
+ if (strpos($message, ':') !== false) {
+ list($event, $param) = explode(':', $message, 2);
+ } else {
+ $event = $message;
+ $param = '';
+ }
+
+ $shutdown = false;
+
+ if ($event == 'shutdown') {
+ $this->master->requestShutdown();
+ $shutdown = true;
+ } else if ($event == 'restart') {
+ $this->master->requestRestart();
+ $shutdown = true;
+ } else if ($event == 'update') {
+ $this->updateSiteConfig($param);
+ } else {
+ $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message");
+ }
+
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
+ return $shutdown;
+ }
+
+ /**
+ * Set us up with queue subscriptions for a new site added at runtime,
+ * triggered by a broadcast to the 'statusnet-control' topic.
+ *
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function updateSiteConfig($nickname)
+ {
+ if (empty($this->sites)) {
+ if ($nickname == common_config('site', 'nickname')) {
+ StatusNet::init(common_config('site', 'server'));
+ $this->doUnsubscribe();
+ $this->doSubscribe();
+ } else {
+ $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname");
+ }
+ } else {
+ $sn = Status_network::staticGet($nickname);
+ if ($sn) {
+ $server = $sn->getServerName(); // @fixme do config-by-nick
+ StatusNet::init($server);
+ if (empty($this->sites[$server])) {
+ $this->addSite($server);
+ }
+ $this->_log(LOG_INFO, "(Re)subscribing to queues for site $nickname / $server");
+ $this->doUnsubscribe();
+ $this->doSubscribe();
+ $this->stats('siteupdate');
+ } else {
+ $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
+ }
+ }
+ }
+
+ /**
* Combines the queue_basename from configuration with the
* site server name and queue name to give eg:
*
@@ -360,7 +493,7 @@ class StompQueueManager extends QueueManager
protected function queueName($queue)
{
return common_config('queue', 'queue_basename') .
- common_config('site', 'server') . '/' . $queue;
+ $this->currentSite() . '/' . $queue;
}
/**
diff --git a/scripts/queuectl.php b/scripts/queuectl.php
new file mode 100755
index 000000000..1c9ea3353
--- /dev/null
+++ b/scripts/queuectl.php
@@ -0,0 +1,85 @@
+#!/usr/bin/env php
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2010, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * Sends control signals to running queue daemons.
+ *
+ * @author Brion Vibber <brion@status.net>
+ * @package QueueHandler
+ */
+
+define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
+
+$shortoptions = 'ur';
+$longoptions = array('update', 'restart', 'stop');
+
+$helptext = <<<END_OF_QUEUECTL_HELP
+Send broadcast events to control any running queue handlers.
+(Currently for Stomp queues only.)
+
+Events relating to current site (as selected with -s etc)
+ -u --update Announce new site or updated configuration. Running
+ daemons will start subscribing to any new queues needed
+ for this site.
+
+Global events:
+ -r --restart Graceful restart of all threads
+ --stop Graceful shutdown of all threads
+
+END_OF_QUEUECTL_HELP;
+
+require_once INSTALLDIR.'/scripts/commandline.inc';
+
+function doSendControl($message, $event, $param='')
+{
+ print $message;
+ $qm = QueueManager::get();
+ if ($qm->sendControlSignal($event, $param)) {
+ print " sent.\n";
+ } else {
+ print " FAILED.\n";
+ }
+}
+
+$actions = 0;
+
+if (have_option('u') || have_option('--update')) {
+ $nickname = common_config('site', 'nickname');
+ doSendControl("Sending site update signal to queue daemons for $nickname",
+ "update", $nickname);
+ $actions++;
+}
+
+if (have_option('r') || have_option('--restart')) {
+ doSendControl("Sending graceful restart signal to queue daemons...",
+ "restart");
+ $actions++;
+}
+
+if (have_option('--stop')) {
+ doSendControl("Sending graceful shutdown signal to queue daemons...",
+ "shutdown");
+ $actions++;
+}
+
+if (!$actions) {
+ show_help();
+}
+
diff --git a/scripts/queuedaemon.php b/scripts/queuedaemon.php
index bedd14b1a..c2e2351c3 100755
--- a/scripts/queuedaemon.php
+++ b/scripts/queuedaemon.php
@@ -115,7 +115,7 @@ class QueueDaemon extends SpawningDaemon
$this->log(LOG_INFO, 'terminating normally');
- return true;
+ return $master->respawn ? self::EXIT_RESTART : self::EXIT_SHUTDOWN;
}
}
diff --git a/scripts/xmppdaemon.php b/scripts/xmppdaemon.php
index fd7cf055b..46dd9b90c 100755
--- a/scripts/xmppdaemon.php
+++ b/scripts/xmppdaemon.php
@@ -56,7 +56,7 @@ class XMPPDaemon extends SpawningDaemon
common_log(LOG_INFO, 'terminating normally');
- return true;
+ return $master->respawn ? self::EXIT_RESTART : self::EXIT_SHUTDOWN;
}
}