summaryrefslogtreecommitdiff
path: root/lib/stompqueuemanager.php
diff options
context:
space:
mode:
authorBrion Vibber <brion@pobox.com>2010-01-22 12:35:05 -0800
committerBrion Vibber <brion@pobox.com>2010-01-22 12:52:56 -0800
commit6e4cad71e546505b4bb4a168c1b11b84a45b7889 (patch)
tree933939d1750c1085ae19ac729fc85837f21c7816 /lib/stompqueuemanager.php
parentc7507e7e9dafa6d6e054978e720e4fce3abc9929 (diff)
Fix for stuck queue messages: wrap processing in stomp transactions so our lack of an ACK if PHP dies actually triggers redelivery.
Previously, messages once delivered would just get stuck in the queue seemingly forever if they never got ACKed. Note this could lead to partial duplication, for instance if the OMB or Twitter queue handlers die after 1/2 of the outgoing sends. Recommendations: * catch exceptions more aggressively within queue handlers (so only PHP fatal errors are likely to kill in the middle) * for processing that involves sending to multiple clients, consider a second queue similar to the XMPP output, eg for OMB: - first queue gets delivery list and builds message data, enqueueing it for each target address - second queue can handle each individual outgoing message (and attempt redelivery etc separately) This would also protect better against a recurring error preventing delivery in the second part, and could spread out any slow sends over multiple threads.
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r--lib/stompqueuemanager.php68
1 files changed, 64 insertions, 4 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index f057bd9e4..8f0091a13 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -41,6 +41,10 @@ class StompQueueManager extends QueueManager
protected $sites = array();
+ protected $useTransactions = true;
+ protected $transaction = null;
+ protected $transactionCount = 0;
+
function __construct()
{
parent::__construct();
@@ -201,6 +205,7 @@ class StompQueueManager extends QueueManager
} else {
$this->doSubscribe();
}
+ $this->begin();
return true;
}
@@ -213,6 +218,9 @@ class StompQueueManager extends QueueManager
*/
public function finish()
{
+ // If there are any outstanding delivered messages we haven't processed,
+ // free them for another thread to take.
+ $this->rollback();
if ($this->sites) {
foreach ($this->sites as $server) {
StatusNet::init($server);
@@ -293,7 +301,9 @@ class StompQueueManager extends QueueManager
$notice = Notice::staticGet('id', $id);
if (empty($notice)) {
$this->_log(LOG_WARNING, "Skipping missing $info");
- $this->con->ack($frame);
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
$this->stats('badnotice', $queue);
return false;
}
@@ -308,7 +318,9 @@ class StompQueueManager extends QueueManager
$handler = $this->getHandler($queue);
if (!$handler) {
$this->_log(LOG_ERROR, "Missing handler class; skipping $info");
- $this->con->ack($frame);
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
$this->stats('badhandler', $queue);
return false;
}
@@ -320,14 +332,18 @@ class StompQueueManager extends QueueManager
// FIXME we probably shouldn't have to do
// this kind of queue management ourselves;
// if we don't ack, it should resend...
- $this->con->ack($frame);
+ $this->ack($frame);
$this->enqueue($item, $queue);
+ $this->commit();
+ $this->begin();
$this->stats('requeued', $queue);
return false;
}
$this->_log(LOG_INFO, "Successfully handled $info");
- $this->con->ack($frame);
+ $this->ack($frame);
+ $this->commit();
+ $this->begin();
$this->stats('handled', $queue);
return true;
}
@@ -369,5 +385,49 @@ class StompQueueManager extends QueueManager
{
common_log($level, 'StompQueueManager: '.$msg);
}
+
+ protected function begin()
+ {
+ if ($this->useTransactions) {
+ if ($this->transaction) {
+ throw new Exception("Tried to start transaction in the middle of a transaction");
+ }
+ $this->transactionCount++;
+ $this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time();
+ $this->con->begin($this->transaction);
+ }
+ }
+
+ protected function ack($frame)
+ {
+ if ($this->useTransactions) {
+ if (!$this->transaction) {
+ throw new Exception("Tried to ack but not in a transaction");
+ }
+ }
+ $this->con->ack($frame, $this->transaction);
+ }
+
+ protected function commit()
+ {
+ if ($this->useTransactions) {
+ if (!$this->transaction) {
+ throw new Exception("Tried to commit but not in a transaction");
+ }
+ $this->con->commit($this->transaction);
+ $this->transaction = null;
+ }
+ }
+
+ protected function rollback()
+ {
+ if ($this->useTransactions) {
+ if (!$this->transaction) {
+ throw new Exception("Tried to rollback but not in a transaction");
+ }
+ $this->con->commit($this->transaction);
+ $this->transaction = null;
+ }
+ }
}