summaryrefslogtreecommitdiff
path: root/lib/stompqueuemanager.php
diff options
context:
space:
mode:
authorZach Copley <zach@status.net>2010-01-27 14:27:22 -0800
committerZach Copley <zach@status.net>2010-01-27 14:27:22 -0800
commit78079f34e273357d03ceee13269f9a388e66c4e3 (patch)
tree8f29f6fb7de5b6b6627a7eba3e16b86fa9b2ae1f /lib/stompqueuemanager.php
parent656d95418c6d7f8b884c4c8af14ad6952032ace6 (diff)
parent2494d3fa25a44b3cacf85c594683675ae9e6d0cb (diff)
Merge branch 'testing' into -1.9.x
* testing: (130 commits) HTTP auth provided is evaluated even if it's not required Rename rc3to09.sql to rc3torc4.sql to avoid confusion if we add a last-minute change after this! Add new oauth tables and modifications to 'consumer' table for rc4 Centred leaderboard ad camelcase the uap param names move leaderboard to after the header Moved rectangle ad into aside and leaderboard to the right in header. Aligning wide skyscraper to the right instead of left CSS ids and classes fixed in UAPPlugin wrong height for rectangle in BlankAd Add the moved BlankAdPlugin make BlankAd dir and change to use a 1x1 image move BlankAdPlugin to its own dir Add BlankAdPlugin to test ad layout in different themes make uapplugin an abstract class move UAP plugin to core Lowercased switch cases in UAP Plugin Plugin for Universal Ad Package. Outputs four most widely used ad types. Add persistent:true property to Stomp messages so ActiveMQ doesn't decide to discard them even though persistence is enabled on the broker. :) (Thanks Aric!) quick fix: use common_path() on realtime update JS so it works with the new JS path code (will pull from main server for now) ... Conflicts: actions/apioauthaccesstoken.php actions/apioauthauthorize.php actions/apioauthrequesttoken.php actions/editapplication.php actions/newapplication.php lib/apiauth.php lib/queuemanager.php lib/router.php
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r--lib/stompqueuemanager.php162
1 files changed, 148 insertions, 14 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 8f0091a13..19e8c49b5 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -38,8 +38,10 @@ class StompQueueManager extends QueueManager
var $password = null;
var $base = null;
var $con = null;
+ protected $control;
protected $sites = array();
+ protected $subscriptions = array();
protected $useTransactions = true;
protected $transaction = null;
@@ -52,6 +54,7 @@ class StompQueueManager extends QueueManager
$this->username = common_config('queue', 'stomp_username');
$this->password = common_config('queue', 'stomp_password');
$this->base = common_config('queue', 'queue_basename');
+ $this->control = common_config('queue', 'control_channel');
}
/**
@@ -77,6 +80,36 @@ class StompQueueManager extends QueueManager
$this->initialize();
}
+ /**
+ * Optional; ping any running queue handler daemons with a notification
+ * such as announcing a new site to handle or requesting clean shutdown.
+ * This avoids having to restart all the daemons manually to update configs
+ * and such.
+ *
+ * Currently only relevant for multi-site queue managers such as Stomp.
+ *
+ * @param string $event event key
+ * @param string $param optional parameter to append to key
+ * @return boolean success
+ */
+ public function sendControlSignal($event, $param='')
+ {
+ $message = $event;
+ if ($param != '') {
+ $message .= ':' . $param;
+ }
+ $this->_connect();
+ $result = $this->con->send($this->control,
+ $message,
+ array ('created' => common_sql_now()));
+ if ($result) {
+ $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message");
+ return true;
+ } else {
+ $this->_log(LOG_ERR, "Failed sending control ping to queue daemons: $message");
+ return false;
+ }
+ }
/**
* Instantiate the appropriate QueueHandler class for the given queue.
@@ -86,7 +119,7 @@ class StompQueueManager extends QueueManager
*/
function getHandler($queue)
{
- $handlers = $this->handlers[common_config('site', 'server')];
+ $handlers = $this->handlers[$this->currentSite()];
if (isset($handlers[$queue])) {
$class = $handlers[$queue];
if (class_exists($class)) {
@@ -108,7 +141,7 @@ class StompQueueManager extends QueueManager
function getQueues()
{
$group = $this->activeGroup();
- $site = common_config('site', 'server');
+ $site = $this->currentSite();
if (empty($this->groups[$site][$group])) {
return array();
} else {
@@ -126,8 +159,8 @@ class StompQueueManager extends QueueManager
*/
public function connect($transport, $class, $group='queuedaemon')
{
- $this->handlers[common_config('site', 'server')][$transport] = $class;
- $this->groups[common_config('site', 'server')][$group][$transport] = $class;
+ $this->handlers[$this->currentSite()][$transport] = $class;
+ $this->groups[$this->currentSite()][$group][$transport] = $class;
}
/**
@@ -145,7 +178,8 @@ class StompQueueManager extends QueueManager
$result = $this->con->send($this->queueName($queue),
$msg, // BODY of the message
- array ('created' => common_sql_now()));
+ array ('created' => common_sql_now(),
+ 'persistent' => 'true'));
if (!$result) {
common_log(LOG_ERR, "Error sending $rep to $queue queue");
@@ -180,7 +214,16 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
- $ok = $ok && $this->_handleItem($frame);
+ $dest = $frame->headers['destination'];
+ if ($dest == $this->control) {
+ if (!$this->handleControlSignal($frame)) {
+ // We got a control event that requests a shutdown;
+ // close out and stop handling anything else!
+ break;
+ }
+ } else {
+ $ok = $ok && $this->handleItem($frame);
+ }
}
return $ok;
}
@@ -197,6 +240,9 @@ class StompQueueManager extends QueueManager
public function start($master)
{
parent::start($master);
+ $this->_connect();
+
+ $this->con->subscribe($this->control);
if ($this->sites) {
foreach ($this->sites as $server) {
StatusNet::init($server);
@@ -221,6 +267,7 @@ class StompQueueManager extends QueueManager
// If there are any outstanding delivered messages we haven't processed,
// free them for another thread to take.
$this->rollback();
+ $this->con->unsubscribe($this->control);
if ($this->sites) {
foreach ($this->sites as $server) {
StatusNet::init($server);
@@ -231,7 +278,16 @@ class StompQueueManager extends QueueManager
}
return true;
}
-
+
+ /**
+ * Get identifier of the currently active site configuration
+ * @return string
+ */
+ protected function currentSite()
+ {
+ return common_config('site', 'server'); // @fixme switch to nickname
+ }
+
/**
* Lazy open connection to Stomp queue server.
*/
@@ -255,22 +311,29 @@ class StompQueueManager extends QueueManager
*/
protected function doSubscribe()
{
+ $site = $this->currentSite();
$this->_connect();
foreach ($this->getQueues() as $queue) {
$rawqueue = $this->queueName($queue);
+ $this->subscriptions[$site][$queue] = $rawqueue;
$this->_log(LOG_INFO, "Subscribing to $rawqueue");
$this->con->subscribe($rawqueue);
}
}
-
+
/**
* Subscribe from all enabled notice queues for the current site.
*/
protected function doUnsubscribe()
{
+ $site = $this->currentSite();
$this->_connect();
- foreach ($this->getQueues() as $queue) {
- $this->con->unsubscribe($this->queueName($queue));
+ if (!empty($this->subscriptions[$site])) {
+ foreach ($this->subscriptions[$site] as $queue => $rawqueue) {
+ $this->_log(LOG_INFO, "Unsubscribing from $rawqueue");
+ $this->con->unsubscribe($rawqueue);
+ unset($this->subscriptions[$site][$queue]);
+ }
}
}
@@ -286,10 +349,10 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
- protected function _handleItem($frame)
+ protected function handleItem($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
- if ($site != common_config('site', 'server')) {
+ if ($site != $this->currentSite()) {
$this->stats('switch');
StatusNet::init($site);
}
@@ -317,7 +380,7 @@ class StompQueueManager extends QueueManager
$handler = $this->getHandler($queue);
if (!$handler) {
- $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
+ $this->_log(LOG_ERR, "Missing handler class; skipping $info");
$this->ack($frame);
$this->commit();
$this->begin();
@@ -349,6 +412,77 @@ class StompQueueManager extends QueueManager
}
/**
+ * Process a control signal broadcast.
+ *
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function handleControlSignal($frame)
+ {
+ $message = trim($frame->body);
+ if (strpos($message, ':') !== false) {
+ list($event, $param) = explode(':', $message, 2);
+ } else {
+ $event = $message;
+ $param = '';
+ }
+
+ $shutdown = false;
+
+ if ($event == 'shutdown') {
+ $this->master->requestShutdown();
+ $shutdown = true;
+ } else if ($event == 'restart') {
+ $this->master->requestRestart();
+ $shutdown = true;
+ } else if ($event == 'update') {
+ $this->updateSiteConfig($param);
+ } else {
+ $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message");
+ }
+
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
+ return $shutdown;
+ }
+
+ /**
+ * Set us up with queue subscriptions for a new site added at runtime,
+ * triggered by a broadcast to the 'statusnet-control' topic.
+ *
+ * @param array $frame Stomp frame
+ * @return bool true to continue; false to stop further processing.
+ */
+ protected function updateSiteConfig($nickname)
+ {
+ if (empty($this->sites)) {
+ if ($nickname == common_config('site', 'nickname')) {
+ StatusNet::init(common_config('site', 'server'));
+ $this->doUnsubscribe();
+ $this->doSubscribe();
+ } else {
+ $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname");
+ }
+ } else {
+ $sn = Status_network::staticGet($nickname);
+ if ($sn) {
+ $server = $sn->getServerName(); // @fixme do config-by-nick
+ StatusNet::init($server);
+ if (empty($this->sites[$server])) {
+ $this->addSite($server);
+ }
+ $this->_log(LOG_INFO, "(Re)subscribing to queues for site $nickname / $server");
+ $this->doUnsubscribe();
+ $this->doSubscribe();
+ $this->stats('siteupdate');
+ } else {
+ $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
+ }
+ }
+ }
+
+ /**
* Combines the queue_basename from configuration with the
* site server name and queue name to give eg:
*
@@ -360,7 +494,7 @@ class StompQueueManager extends QueueManager
protected function queueName($queue)
{
return common_config('queue', 'queue_basename') .
- common_config('site', 'server') . '/' . $queue;
+ $this->currentSite() . '/' . $queue;
}
/**