diff options
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r-- | lib/stompqueuemanager.php | 122 |
1 files changed, 96 insertions, 26 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 00590fdb6..8f0091a13 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -39,9 +39,12 @@ class StompQueueManager extends QueueManager var $base = null; var $con = null; - protected $master = null; protected $sites = array(); + protected $useTransactions = true; + protected $transaction = null; + protected $transactionCount = 0; + function __construct() { parent::__construct(); @@ -104,11 +107,12 @@ class StompQueueManager extends QueueManager */ function getQueues() { + $group = $this->activeGroup(); $site = common_config('site', 'server'); - if (empty($this->handlers[$site])) { + if (empty($this->groups[$site][$group])) { return array(); } else { - return array_keys($this->handlers[$site]); + return array_keys($this->groups[$site][$group]); } } @@ -118,10 +122,12 @@ class StompQueueManager extends QueueManager * * @param string $transport * @param string $class + * @param string $group */ - public function connect($transport, $class) + public function connect($transport, $class, $group='queuedaemon') { $this->handlers[common_config('site', 'server')][$transport] = $class; + $this->groups[common_config('site', 'server')][$group][$transport] = $class; } /** @@ -130,23 +136,23 @@ class StompQueueManager extends QueueManager */ public function enqueue($object, $queue) { - $notice = $object; + $msg = $this->encode($object); + $rep = $this->logrep($object); $this->_connect(); // XXX: serialize and send entire notice $result = $this->con->send($this->queueName($queue), - $notice->id, // BODY of the message - array ('created' => $notice->created)); + $msg, // BODY of the message + array ('created' => common_sql_now())); if (!$result) { - common_log(LOG_ERR, 'Error sending to '.$queue.' queue'); + common_log(LOG_ERR, "Error sending $rep to $queue queue"); return false; } - common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' - . $notice->id . ' for ' . $queue); + common_log(LOG_DEBUG, "complete remote queueing $rep for $queue"); $this->stats('enqueued', $queue); } @@ -174,7 +180,7 @@ class StompQueueManager extends QueueManager $ok = true; $frames = $this->con->readFrames(); foreach ($frames as $frame) { - $ok = $ok && $this->_handleNotice($frame); + $ok = $ok && $this->_handleItem($frame); } return $ok; } @@ -199,6 +205,7 @@ class StompQueueManager extends QueueManager } else { $this->doSubscribe(); } + $this->begin(); return true; } @@ -211,6 +218,9 @@ class StompQueueManager extends QueueManager */ public function finish() { + // If there are any outstanding delivered messages we haven't processed, + // free them for another thread to take. + $this->rollback(); if ($this->sites) { foreach ($this->sites as $server) { StatusNet::init($server); @@ -265,7 +275,7 @@ class StompQueueManager extends QueueManager } /** - * Handle and acknowledge a notice event that's come in through a queue. + * Handle and acknowledge an event that's come in through a queue. * * If the queue handler reports failure, the message is requeued for later. * Missing notices or handler classes will drop the message. @@ -276,7 +286,7 @@ class StompQueueManager extends QueueManager * @param StompFrame $frame * @return bool */ - protected function _handleNotice($frame) + protected function _handleItem($frame) { list($site, $queue) = $this->parseDestination($frame->headers['destination']); if ($site != common_config('site', 'server')) { @@ -284,40 +294,56 @@ class StompQueueManager extends QueueManager StatusNet::init($site); } - $id = intval($frame->body); - $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; + if (is_numeric($frame->body)) { + $id = intval($frame->body); + $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; + + $notice = Notice::staticGet('id', $id); + if (empty($notice)) { + $this->_log(LOG_WARNING, "Skipping missing $info"); + $this->ack($frame); + $this->commit(); + $this->begin(); + $this->stats('badnotice', $queue); + return false; + } - $notice = Notice::staticGet('id', $id); - if (empty($notice)) { - $this->_log(LOG_WARNING, "Skipping missing $info"); - $this->con->ack($frame); - $this->stats('badnotice', $queue); - return false; + $item = $notice; + } else { + // @fixme should we serialize, or json, or what here? + $info = "string posted at {$frame->headers['created']} in queue $queue"; + $item = $frame->body; } $handler = $this->getHandler($queue); if (!$handler) { $this->_log(LOG_ERROR, "Missing handler class; skipping $info"); - $this->con->ack($frame); + $this->ack($frame); + $this->commit(); + $this->begin(); $this->stats('badhandler', $queue); return false; } - $ok = $handler->handle_notice($notice); + $ok = $handler->handle($item); if (!$ok) { $this->_log(LOG_WARNING, "Failed handling $info"); // FIXME we probably shouldn't have to do // this kind of queue management ourselves; // if we don't ack, it should resend... - $this->con->ack($frame); - $this->enqueue($notice, $queue); + $this->ack($frame); + $this->enqueue($item, $queue); + $this->commit(); + $this->begin(); $this->stats('requeued', $queue); return false; } $this->_log(LOG_INFO, "Successfully handled $info"); - $this->con->ack($frame); + $this->ack($frame); + $this->commit(); + $this->begin(); $this->stats('handled', $queue); return true; } @@ -359,5 +385,49 @@ class StompQueueManager extends QueueManager { common_log($level, 'StompQueueManager: '.$msg); } + + protected function begin() + { + if ($this->useTransactions) { + if ($this->transaction) { + throw new Exception("Tried to start transaction in the middle of a transaction"); + } + $this->transactionCount++; + $this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time(); + $this->con->begin($this->transaction); + } + } + + protected function ack($frame) + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to ack but not in a transaction"); + } + } + $this->con->ack($frame, $this->transaction); + } + + protected function commit() + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to commit but not in a transaction"); + } + $this->con->commit($this->transaction); + $this->transaction = null; + } + } + + protected function rollback() + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to rollback but not in a transaction"); + } + $this->con->commit($this->transaction); + $this->transaction = null; + } + } } |