From 99866a459bff927c4a72c17904d2ecbc1c421af3 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Fri, 22 Jan 2010 12:35:05 -0800 Subject: Fix for stuck queue messages: wrap processing in stomp transactions so our lack of an ACK if PHP dies actually triggers redelivery. Previously, messages once delivered would just get stuck in the queue seemingly forever if they never got ACKed. Note this could lead to partial duplication, for instance if the OMB or Twitter queue handlers die after 1/2 of the outgoing sends. Recommendations: * catch exceptions more aggressively within queue handlers (so only PHP fatal errors are likely to kill in the middle) * for processing that involves sending to multiple clients, consider a second queue similar to the XMPP output, eg for OMB: - first queue gets delivery list and builds message data, enqueueing it for each target address - second queue can handle each individual outgoing message (and attempt redelivery etc separately) This would also protect better against a recurring error preventing delivery in the second part, and could spread out any slow sends over multiple threads. --- lib/stompqueuemanager.php | 68 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 4 deletions(-) (limited to 'lib') diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index f057bd9e4..8f0091a13 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -41,6 +41,10 @@ class StompQueueManager extends QueueManager protected $sites = array(); + protected $useTransactions = true; + protected $transaction = null; + protected $transactionCount = 0; + function __construct() { parent::__construct(); @@ -201,6 +205,7 @@ class StompQueueManager extends QueueManager } else { $this->doSubscribe(); } + $this->begin(); return true; } @@ -213,6 +218,9 @@ class StompQueueManager extends QueueManager */ public function finish() { + // If there are any outstanding delivered messages we haven't processed, + // free them for another thread to take. + $this->rollback(); if ($this->sites) { foreach ($this->sites as $server) { StatusNet::init($server); @@ -293,7 +301,9 @@ class StompQueueManager extends QueueManager $notice = Notice::staticGet('id', $id); if (empty($notice)) { $this->_log(LOG_WARNING, "Skipping missing $info"); - $this->con->ack($frame); + $this->ack($frame); + $this->commit(); + $this->begin(); $this->stats('badnotice', $queue); return false; } @@ -308,7 +318,9 @@ class StompQueueManager extends QueueManager $handler = $this->getHandler($queue); if (!$handler) { $this->_log(LOG_ERROR, "Missing handler class; skipping $info"); - $this->con->ack($frame); + $this->ack($frame); + $this->commit(); + $this->begin(); $this->stats('badhandler', $queue); return false; } @@ -320,14 +332,18 @@ class StompQueueManager extends QueueManager // FIXME we probably shouldn't have to do // this kind of queue management ourselves; // if we don't ack, it should resend... - $this->con->ack($frame); + $this->ack($frame); $this->enqueue($item, $queue); + $this->commit(); + $this->begin(); $this->stats('requeued', $queue); return false; } $this->_log(LOG_INFO, "Successfully handled $info"); - $this->con->ack($frame); + $this->ack($frame); + $this->commit(); + $this->begin(); $this->stats('handled', $queue); return true; } @@ -369,5 +385,49 @@ class StompQueueManager extends QueueManager { common_log($level, 'StompQueueManager: '.$msg); } + + protected function begin() + { + if ($this->useTransactions) { + if ($this->transaction) { + throw new Exception("Tried to start transaction in the middle of a transaction"); + } + $this->transactionCount++; + $this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time(); + $this->con->begin($this->transaction); + } + } + + protected function ack($frame) + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to ack but not in a transaction"); + } + } + $this->con->ack($frame, $this->transaction); + } + + protected function commit() + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to commit but not in a transaction"); + } + $this->con->commit($this->transaction); + $this->transaction = null; + } + } + + protected function rollback() + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to rollback but not in a transaction"); + } + $this->con->commit($this->transaction); + $this->transaction = null; + } + } } -- cgit v1.2.3-54-g00ecf From 845f051c2f85248ef85d0a34f032792ca83f04a4 Mon Sep 17 00:00:00 2001 From: Craig Andrews Date: Fri, 22 Jan 2010 18:02:05 -0500 Subject: StompQueueManager uses decode() to decode queued frames --- lib/stompqueuemanager.php | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) (limited to 'lib') diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 8f0091a13..4bbdeedc2 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -294,26 +294,7 @@ class StompQueueManager extends QueueManager StatusNet::init($site); } - if (is_numeric($frame->body)) { - $id = intval($frame->body); - $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; - - $notice = Notice::staticGet('id', $id); - if (empty($notice)) { - $this->_log(LOG_WARNING, "Skipping missing $info"); - $this->ack($frame); - $this->commit(); - $this->begin(); - $this->stats('badnotice', $queue); - return false; - } - - $item = $notice; - } else { - // @fixme should we serialize, or json, or what here? - $info = "string posted at {$frame->headers['created']} in queue $queue"; - $item = $frame->body; - } + $item = $this->decode($frame->body); $handler = $this->getHandler($queue); if (!$handler) { -- cgit v1.2.3-54-g00ecf From 23c0d663d63c49183494ebb049160af633a2d2ec Mon Sep 17 00:00:00 2001 From: Craig Andrews Date: Sat, 23 Jan 2010 01:03:41 -0500 Subject: Allow for instances as well as class names to be passed as queue handlers and iomanagers. --- lib/iomaster.php | 10 +++++++--- lib/queuemanager.php | 6 ++++-- 2 files changed, 11 insertions(+), 5 deletions(-) (limited to 'lib') diff --git a/lib/iomaster.php b/lib/iomaster.php index 004e92b3e..29bd677bd 100644 --- a/lib/iomaster.php +++ b/lib/iomaster.php @@ -102,7 +102,7 @@ abstract class IoMaster */ protected function instantiate($class) { - if (isset($this->singletons[$class])) { + if (is_string($class) && isset($this->singletons[$class])) { // Already instantiated a multi-site-capable handler. // Just let it know it should listen to this site too! $this->singletons[$class]->addSite(common_config('site', 'server')); @@ -129,7 +129,11 @@ abstract class IoMaster protected function getManager($class) { - return call_user_func(array($class, 'get')); + if(is_object($class)){ + return $class; + } else { + return call_user_func(array($class, 'get')); + } } /** @@ -347,7 +351,7 @@ abstract class IoMaster * for per-queue and per-site records. * * @param string $key counter name - * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01' + * @param array $owners list of owner keys like 'queue:xmpp' or 'site:stat01' */ public function stats($key, $owners=array()) { diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 4eb39bfa8..b2e86b127 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -181,7 +181,9 @@ abstract class QueueManager extends IoManager { if (isset($this->handlers[$queue])) { $class = $this->handlers[$queue]; - if (class_exists($class)) { + if(is_object($class)) { + return $class; + } else if (class_exists($class)) { return new $class(); } else { common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); @@ -242,7 +244,7 @@ abstract class QueueManager extends IoManager * Only registered transports will be reliably picked up! * * @param string $transport - * @param string $class + * @param string $class class name or object instance * @param string $group */ public function connect($transport, $class, $group='queuedaemon') -- cgit v1.2.3-54-g00ecf