summaryrefslogtreecommitdiff
path: root/lib/stompqueuemanager.php
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r--lib/stompqueuemanager.php286
1 files changed, 214 insertions, 72 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index f059b42f0..3090e0bfb 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -30,46 +30,53 @@
require_once 'Stomp.php';
-class LiberalStomp extends Stomp
-{
- function getSocket()
- {
- return $this->_socket;
- }
-}
-class StompQueueManager
+class StompQueueManager extends QueueManager
{
var $server = null;
var $username = null;
var $password = null;
var $base = null;
var $con = null;
+
+ protected $master = null;
+ protected $sites = array();
function __construct()
{
+ parent::__construct();
$this->server = common_config('queue', 'stomp_server');
$this->username = common_config('queue', 'stomp_username');
$this->password = common_config('queue', 'stomp_password');
$this->base = common_config('queue', 'queue_basename');
}
- function _connect()
+ /**
+ * Tell the i/o master we only need a single instance to cover
+ * all sites running in this process.
+ */
+ public static function multiSite()
{
- if (empty($this->con)) {
- $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
- $this->con = new LiberalStomp($this->server);
+ return IoManager::INSTANCE_PER_PROCESS;
+ }
- if ($this->con->connect($this->username, $this->password)) {
- $this->_log(LOG_INFO, "Connected.");
- } else {
- $this->_log(LOG_ERR, 'Failed to connect to queue server');
- throw new ServerException('Failed to connect to queue server');
- }
- }
+ /**
+ * Record each site we'll be handling input for in this process,
+ * so we can listen to the necessary queues for it.
+ *
+ * @fixme possibly actually do subscription here to save another
+ * loop over all sites later?
+ */
+ public function addSite($server)
+ {
+ $this->sites[] = $server;
}
- function enqueue($object, $queue)
+ /**
+ * Saves a notice object reference into the queue item table.
+ * @return boolean true on success
+ */
+ public function enqueue($object, $queue)
{
$notice = $object;
@@ -77,7 +84,7 @@ class StompQueueManager
// XXX: serialize and send entire notice
- $result = $this->con->send($this->_queueName($queue),
+ $result = $this->con->send($this->queueName($queue),
$notice->id, // BODY of the message
array ('created' => $notice->created));
@@ -88,78 +95,212 @@ class StompQueueManager
common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
. $notice->id . ' for ' . $queue);
+ $this->stats('enqueued', $queue);
}
- function service($queue, $handler)
+ /**
+ * Send any sockets we're listening on to the IO manager
+ * to wait for input.
+ *
+ * @return array of resources
+ */
+ public function getSockets()
{
- $result = null;
-
- $this->_connect();
+ return array($this->con->getSocket());
+ }
- $this->con->setReadTimeout($handler->timeout());
+ /**
+ * We've got input to handle on our socket!
+ * Read any waiting Stomp frame(s) and process them.
+ *
+ * @param resource $socket
+ * @return boolean ok on success
+ */
+ public function handleInput($socket)
+ {
+ assert($socket === $this->con->getSocket());
+ $ok = true;
+ $frames = $this->con->readFrames();
+ foreach ($frames as $frame) {
+ $ok = $ok && $this->_handleNotice($frame);
+ }
+ return $ok;
+ }
- $this->con->subscribe($this->_queueName($queue));
+ /**
+ * Initialize our connection and subscribe to all the queues
+ * we're going to need to handle...
+ *
+ * Side effects: in multi-site mode, may reset site configuration.
+ *
+ * @param IoMaster $master process/event controller
+ * @return bool return false on failure
+ */
+ public function start($master)
+ {
+ parent::start($master);
+ if ($this->sites) {
+ foreach ($this->sites as $server) {
+ StatusNet::init($server);
+ $this->doSubscribe();
+ }
+ } else {
+ $this->doSubscribe();
+ }
+ return true;
+ }
+
+ /**
+ * Subscribe to all the queues we're going to need to handle...
+ *
+ * Side effects: in multi-site mode, may reset site configuration.
+ *
+ * @return bool return false on failure
+ */
+ public function finish()
+ {
+ if ($this->sites) {
+ foreach ($this->sites as $server) {
+ StatusNet::init($server);
+ $this->doUnsubscribe();
+ }
+ } else {
+ $this->doUnsubscribe();
+ }
+ return true;
+ }
+
+ /**
+ * Lazy open connection to Stomp queue server.
+ */
+ protected function _connect()
+ {
+ if (empty($this->con)) {
+ $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
+ $this->con = new LiberalStomp($this->server);
- while (true) {
+ if ($this->con->connect($this->username, $this->password)) {
+ $this->_log(LOG_INFO, "Connected.");
+ } else {
+ $this->_log(LOG_ERR, 'Failed to connect to queue server');
+ throw new ServerException('Failed to connect to queue server');
+ }
+ }
+ }
- // Wait for something on one of our sockets
+ /**
+ * Subscribe to all enabled notice queues for the current site.
+ */
+ protected function doSubscribe()
+ {
+ $this->_connect();
+ foreach ($this->getQueues() as $queue) {
+ $rawqueue = $this->queueName($queue);
+ $this->_log(LOG_INFO, "Subscribing to $rawqueue");
+ $this->con->subscribe($rawqueue);
+ }
+ }
+
+ /**
+ * Subscribe from all enabled notice queues for the current site.
+ */
+ protected function doUnsubscribe()
+ {
+ $this->_connect();
+ foreach ($this->getQueues() as $queue) {
+ $this->con->unsubscribe($this->queueName($queue));
+ }
+ }
- $stompsock = $this->con->getSocket();
+ /**
+ * Handle and acknowledge a notice event that's come in through a queue.
+ *
+ * If the queue handler reports failure, the message is requeued for later.
+ * Missing notices or handler classes will drop the message.
+ *
+ * Side effects: in multi-site mode, may reset site configuration to
+ * match the site that queued the event.
+ *
+ * @param StompFrame $frame
+ * @return bool
+ */
+ protected function _handleNotice($frame)
+ {
+ list($site, $queue) = $this->parseDestination($frame->headers['destination']);
+ if ($site != common_config('site', 'server')) {
+ $this->stats('switch');
+ StatusNet::init($site);
+ }
- $handsocks = $handler->getSockets();
+ $id = intval($frame->body);
+ $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
- $socks = array_merge(array($stompsock), $handsocks);
+ $notice = Notice::staticGet('id', $id);
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, "Skipping missing $info");
+ $this->con->ack($frame);
+ $this->stats('badnotice', $queue);
+ return false;
+ }
- $read = $socks;
- $write = array();
- $except = array();
+ $handler = $this->getHandler($queue);
+ if (!$handler) {
+ $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
+ $this->con->ack($frame);
+ $this->stats('badhandler', $queue);
+ return false;
+ }
- $ready = stream_select($read, $write, $except, $handler->timeout(), 0);
+ $ok = $handler->handle_notice($notice);
- if ($ready === false) {
- $this->_log(LOG_ERR, "Error selecting on sockets");
- } else if ($ready > 0) {
- if (in_array($stompsock, $read)) {
- $this->_handleNotice($queue, $handler);
- }
- $handler->idle(QUEUE_HANDLER_HIT_IDLE);
- }
+ if (!$ok) {
+ $this->_log(LOG_WARNING, "Failed handling $info");
+ // FIXME we probably shouldn't have to do
+ // this kind of queue management ourselves;
+ // if we don't ack, it should resend...
+ $this->con->ack($frame);
+ $this->enqueue($notice, $queue);
+ $this->stats('requeued', $queue);
+ return false;
}
- $this->con->unsubscribe($this->_queueName($queue));
+ $this->_log(LOG_INFO, "Successfully handled $info");
+ $this->con->ack($frame);
+ $this->stats('handled', $queue);
+ return true;
}
- function _handleNotice($queue, $handler)
+ /**
+ * Combines the queue_basename from configuration with the
+ * site server name and queue name to give eg:
+ *
+ * /queue/statusnet/identi.ca/sms
+ *
+ * @param string $queue
+ * @return string
+ */
+ protected function queueName($queue)
{
- $frame = $this->con->readFrame();
-
- if (!empty($frame)) {
- $notice = Notice::staticGet('id', $frame->body);
-
- if (empty($notice)) {
- $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue);
- $this->con->ack($frame);
- } else {
- if ($handler->handle_notice($notice)) {
- $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
- $this->con->ack($frame);
- } else {
- $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
- // FIXME we probably shouldn't have to do
- // this kind of queue management ourselves
- $this->con->ack($frame);
- $this->enqueue($notice, $queue);
- }
- unset($notice);
- }
-
- unset($frame);
- }
+ return common_config('queue', 'queue_basename') .
+ common_config('site', 'server') . '/' . $queue;
}
- function _queueName($queue)
+ /**
+ * Returns the site and queue name from the server-side queue.
+ *
+ * @param string queue destination (eg '/queue/statusnet/identi.ca/sms')
+ * @return array of site and queue: ('identi.ca','sms') or false if unrecognized
+ */
+ protected function parseDestination($dest)
{
- return common_config('queue', 'queue_basename') . $queue;
+ $prefix = common_config('queue', 'queue_basename');
+ if (substr($dest, 0, strlen($prefix)) == $prefix) {
+ $rest = substr($dest, strlen($prefix));
+ return explode("/", $rest, 2);
+ } else {
+ common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest");
+ return array(false, false);
+ }
}
function _log($level, $msg)
@@ -167,3 +308,4 @@ class StompQueueManager
common_log($level, 'StompQueueManager: '.$msg);
}
}
+