summaryrefslogtreecommitdiff
path: root/lib/stompqueuemanager.php
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r--lib/stompqueuemanager.php162
1 files changed, 148 insertions, 14 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 8f0091a13..19e8c49b5 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -38,8 +38,10 @@ class StompQueueManager extends QueueManager
var $password = null;
var $base = null;
var $con = null;
+ protected $control;
protected $sites = array();
+ protected $subscriptions = array();
protected $useTransactions = true;
protected $transaction = null;
@@ -52,6 +54,7 @@ class StompQueueManager extends QueueManager
$this->username = common_config('queue', 'stomp_username');
$this->password = common_config('queue', 'stomp_password');
$this->base = common_config('queue', 'queue_basename');
+ $this->control = common_config('queue', 'control_channel');
}
/**
@@ -77,6 +80,36 @@ class StompQueueManager extends QueueManager
$this->initialize();
}
+ /**
+ * Optional; ping any running queue handler daemons with a notification
+ * such as announcing a new site to handle or requesting clean shutdown.
+ * This avoids having to restart all the daemons manually to update configs
+ * and such.
+ *
+ * Currently only relevant for multi-site queue managers such as Stomp.
+ *
+ * @param string $event event key
+ * @param string $param optional parameter to append to key
+ * @return boolean success
+ */
+ public function sendControlSignal($event, $param='')
+ {
+ $message = $event;
+ if ($param != '') {
+ $message .= ':' . $param;
+ }
+ $this->_connect();
+ $result = $this->con->send($this->control,
+ $message,
+ array ('created' => common_sql_now()));
+ if ($result) {
+ $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message");
+ return true;
+ } else {
+ $this->_log(LOG_ERR, "Failed sending control ping to queue daemons: $message");
+ return false;
+ }
+ }
/**
* Instantiate the appropriate QueueHandler class for the given queue.
@@ -86,7 +119,7 @@ class StompQueueManager extends QueueManager
*/
function getHandler($queue)
{
- $handlers = $this->handlers[common_config('site', 'server')];
+ $handlers = $this->handlers[$this->currentSite()];
if (isset($handlers[$queue])) {
$class = $handlers[$queue];
if (class_exists($class)) {
@@ -108,7 +141,7 @@ class StompQueueManager extends QueueManager
function getQueues()
{
$group = $this->activeGroup();
- $site = common_config('site', 'server');
+ $site = $this->currentSite();
if (empty($this->groups[$site][$group])) {
return array();
} else {
@@ -126,8 +159,8 @@ class StompQueueManager extends QueueManager
*/
public function connect($transport, $class, $group='queuedaemon')
{
- $this->handlers[common_config('site', 'server')][$transport] = $class;
- $this->groups[common_config('site', 'server')][$group][$transport] = $class;
+ $this->handlers[$this->currentSite()][$transport] = $class;
+ $this->groups[$this->currentSite()][$group][$transport] = $class;
}
/**
@@ -145,7 +178,8 @@ class StompQueueManager extends QueueManager
$result = $this->con->send($this->queueName($queue),
$msg, // BODY of the message
- array ('created' => common_sql_now()));
+ array ('created' => common_sql_now(),
+ 'persistent' => 'true'));
if (!$result) {
common_log(LOG_ERR, "Error sending $rep to $queue queue");
@@ -180,7 +214,16 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
- $ok = $ok && $this->_handleItem($frame);
+ $dest = $frame->headers['destination'];
+ if ($dest == $this->control) {
+ if (!$this->handleControlSignal($frame)) {
+ // We got a control event that requests a shutdown;
+ // close out and stop handling anything else!
+ break;
+ }
+ } else {
+ $ok = $ok && $this->handleItem($frame);
+ }
}
return $ok;
}
@@ -197,6 +240,9 @@ class StompQueueManager extends QueueManager
public function start($master)
{
parent::start($master);
+ $this->_connect();
+
+ $this->con->subscribe($this->control);
if ($this->sites) {
foreach ($this->sites as $server) {
StatusNet::init($server);
@@ -221,6 +267,7 @@ class StompQueueManager extends QueueManager
// If there are any outstanding delivered messages we haven't processed,
// free them for another thread to take.
$this->rollback();
+ $this->con->unsubscribe($this->control);
if ($this->sites) {
foreach ($this->sites as $server) {
StatusNet::init($server);
@@ -231,7 +278,16 @@ class StompQueueManager extends QueueManager
}
return true;
}
-
+
+ /**
+ * Get identifier of the currently active site configuration
+ * @return string
+ */
+ protected function currentSite()
+ {
+ return common_config('site', 'server'); // @fixme switch to nickname
+ }
+
/**
* Lazy open connection to Stomp queue server.
*/
@@ -255,22 +311,29 @@ class StompQueueManager extends QueueManager
*/
protected function doSubscribe()
{
+ $site = $this->currentSite();
$this->_connect();
foreach ($this->getQueues() as $queue) {
$rawqueue = $this->queueName($queue);
+ $this->subscriptions[$site][$queue] = $rawqueue;
$this->_log(LOG_INFO, "Subscribing to $rawqueue");
$this->con->subscribe($rawqueue);
}
}
-
+
/**
* Subscribe from all enabled notice queues for the current site.
*/
protected function doUnsubscribe()
{
+ $site = $this->currentSite();
$this->_connect();
- foreach ($this->getQueues() as $queue) {
- $this->con->unsubscribe($this->queueName($queue));
+ if (!empty($this->subscriptions[$site])) {
+ foreach ($this->subscriptions[$site] as $queue => $rawqueue) {
+ $this->_log(LOG_INFO, "Unsubscribing from $rawqueue");
+ $this->con->unsubscribe($rawqueue);
+ unset($this->subscriptions[$site][$queue]);
+ }
}
}
@@ -286,10 +349,10 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
- protected function _handleItem($frame)
+ protected function handleItem($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
- if ($site != common_config('site', 'server')) {
+ if ($site != $this->currentSite()) {
$this->stats('switch');
StatusNet::init($site);
}
@@ -317,7 +380,7 @@ class StompQueueManager extends QueueManager
$handler = $this->getHandler($queue);
if (!$handler) {
- $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
+ $this->_log(LOG_ERR, "Missing handler class; skipping $info");
$this->ack($frame);
$this->commit();
$this->begin();
@@ -349,6 +412,77 @@ class StompQueueManager extends QueueManager
}
/**
+ * Process a control signal broadcast.
+ *
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function handleControlSignal($frame)
+ {
+ $message = trim($frame->body);
+ if (strpos($message, ':') !== false) {
+ list($event, $param) = explode(':', $message, 2);
+ } else {
+ $event = $message;
+ $param = '';
+ }
+
+ $shutdown = false;
+
+ if ($event == 'shutdown') {
+ $this->master->requestShutdown();
+ $shutdown = true;
+ } else if ($event == 'restart') {
+ $this->master->requestRestart();
+ $shutdown = true;
+ } else if ($event == 'update') {
+ $this->updateSiteConfig($param);
+ } else {
+ $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message");
+ }
+
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
+ return $shutdown;
+ }
+
+ /**
+ * Set us up with queue subscriptions for a new site added at runtime,
+ * triggered by a broadcast to the 'statusnet-control' topic.
+ *
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function updateSiteConfig($nickname)
+ {
+ if (empty($this->sites)) {
+ if ($nickname == common_config('site', 'nickname')) {
+ StatusNet::init(common_config('site', 'server'));
+ $this->doUnsubscribe();
+ $this->doSubscribe();
+ } else {
+ $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname");
+ }
+ } else {
+ $sn = Status_network::staticGet($nickname);
+ if ($sn) {
+ $server = $sn->getServerName(); // @fixme do config-by-nick
+ StatusNet::init($server);
+ if (empty($this->sites[$server])) {
+ $this->addSite($server);
+ }
+ $this->_log(LOG_INFO, "(Re)subscribing to queues for site $nickname / $server");
+ $this->doUnsubscribe();
+ $this->doSubscribe();
+ $this->stats('siteupdate');
+ } else {
+ $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
+ }
+ }
+ }
+
+ /**
* Combines the queue_basename from configuration with the
* site server name and queue name to give eg:
*
@@ -360,7 +494,7 @@ class StompQueueManager extends QueueManager
protected function queueName($queue)
{
return common_config('queue', 'queue_basename') .
- common_config('site', 'server') . '/' . $queue;
+ $this->currentSite() . '/' . $queue;
}
/**