summaryrefslogtreecommitdiff
path: root/lib/stompqueuemanager.php
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r--lib/stompqueuemanager.php122
1 files changed, 96 insertions, 26 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 00590fdb6..8f0091a13 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -39,9 +39,12 @@ class StompQueueManager extends QueueManager
var $base = null;
var $con = null;
- protected $master = null;
protected $sites = array();
+ protected $useTransactions = true;
+ protected $transaction = null;
+ protected $transactionCount = 0;
+
function __construct()
{
parent::__construct();
@@ -104,11 +107,12 @@ class StompQueueManager extends QueueManager
*/
function getQueues()
{
+ $group = $this->activeGroup();
$site = common_config('site', 'server');
- if (empty($this->handlers[$site])) {
+ if (empty($this->groups[$site][$group])) {
return array();
} else {
- return array_keys($this->handlers[$site]);
+ return array_keys($this->groups[$site][$group]);
}
}
@@ -118,10 +122,12 @@ class StompQueueManager extends QueueManager
*
* @param string $transport
* @param string $class
+ * @param string $group
*/
- public function connect($transport, $class)
+ public function connect($transport, $class, $group='queuedaemon')
{
$this->handlers[common_config('site', 'server')][$transport] = $class;
+ $this->groups[common_config('site', 'server')][$group][$transport] = $class;
}
/**
@@ -130,23 +136,23 @@ class StompQueueManager extends QueueManager
*/
public function enqueue($object, $queue)
{
- $notice = $object;
+ $msg = $this->encode($object);
+ $rep = $this->logrep($object);
$this->_connect();
// XXX: serialize and send entire notice
$result = $this->con->send($this->queueName($queue),
- $notice->id, // BODY of the message
- array ('created' => $notice->created));
+ $msg, // BODY of the message
+ array ('created' => common_sql_now()));
if (!$result) {
- common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
+ common_log(LOG_ERR, "Error sending $rep to $queue queue");
return false;
}
- common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
- . $notice->id . ' for ' . $queue);
+ common_log(LOG_DEBUG, "complete remote queueing $rep for $queue");
$this->stats('enqueued', $queue);
}
@@ -174,7 +180,7 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
- $ok = $ok && $this->_handleNotice($frame);
+ $ok = $ok && $this->_handleItem($frame);
}
return $ok;
}
@@ -199,6 +205,7 @@ class StompQueueManager extends QueueManager
} else {
$this->doSubscribe();
}
+ $this->begin();
return true;
}
@@ -211,6 +218,9 @@ class StompQueueManager extends QueueManager
*/
public function finish()
{
+ // If there are any outstanding delivered messages we haven't processed,
+ // free them for another thread to take.
+ $this->rollback();
if ($this->sites) {
foreach ($this->sites as $server) {
StatusNet::init($server);
@@ -265,7 +275,7 @@ class StompQueueManager extends QueueManager
}
/**
- * Handle and acknowledge a notice event that's come in through a queue.
+ * Handle and acknowledge an event that's come in through a queue.
*
* If the queue handler reports failure, the message is requeued for later.
* Missing notices or handler classes will drop the message.
@@ -276,7 +286,7 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
- protected function _handleNotice($frame)
+ protected function _handleItem($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
if ($site != common_config('site', 'server')) {
@@ -284,40 +294,56 @@ class StompQueueManager extends QueueManager
StatusNet::init($site);
}
- $id = intval($frame->body);
- $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
+ if (is_numeric($frame->body)) {
+ $id = intval($frame->body);
+ $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
+
+ $notice = Notice::staticGet('id', $id);
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, "Skipping missing $info");
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
+ $this->stats('badnotice', $queue);
+ return false;
+ }
- $notice = Notice::staticGet('id', $id);
- if (empty($notice)) {
- $this->_log(LOG_WARNING, "Skipping missing $info");
- $this->con->ack($frame);
- $this->stats('badnotice', $queue);
- return false;
+ $item = $notice;
+ } else {
+ // @fixme should we serialize, or json, or what here?
+ $info = "string posted at {$frame->headers['created']} in queue $queue";
+ $item = $frame->body;
}
$handler = $this->getHandler($queue);
if (!$handler) {
$this->_log(LOG_ERROR, "Missing handler class; skipping $info");
- $this->con->ack($frame);
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
$this->stats('badhandler', $queue);
return false;
}
- $ok = $handler->handle_notice($notice);
+ $ok = $handler->handle($item);
if (!$ok) {
$this->_log(LOG_WARNING, "Failed handling $info");
// FIXME we probably shouldn't have to do
// this kind of queue management ourselves;
// if we don't ack, it should resend...
- $this->con->ack($frame);
- $this->enqueue($notice, $queue);
+ $this->ack($frame);
+ $this->enqueue($item, $queue);
+ $this->commit();
+ $this->begin();
$this->stats('requeued', $queue);
return false;
}
$this->_log(LOG_INFO, "Successfully handled $info");
- $this->con->ack($frame);
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
$this->stats('handled', $queue);
return true;
}
@@ -359,5 +385,49 @@ class StompQueueManager extends QueueManager
{
common_log($level, 'StompQueueManager: '.$msg);
}
+
+ protected function begin()
+ {
+ if ($this->useTransactions) {
+ if ($this->transaction) {
+ throw new Exception("Tried to start transaction in the middle of a transaction");
+ }
+ $this->transactionCount++;
+ $this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time();
+ $this->con->begin($this->transaction);
+ }
+ }
+
+ protected function ack($frame)
+ {
+ if ($this->useTransactions) {
+ if (!$this->transaction) {
+ throw new Exception("Tried to ack but not in a transaction");
+ }
+ }
+ $this->con->ack($frame, $this->transaction);
+ }
+
+ protected function commit()
+ {
+ if ($this->useTransactions) {
+ if (!$this->transaction) {
+ throw new Exception("Tried to commit but not in a transaction");
+ }
+ $this->con->commit($this->transaction);
+ $this->transaction = null;
+ }
+ }
+
+ protected function rollback()
+ {
+ if ($this->useTransactions) {
+ if (!$this->transaction) {
+ throw new Exception("Tried to rollback but not in a transaction");
+ }
+ $this->con->commit($this->transaction);
+ $this->transaction = null;
+ }
+ }
}