summaryrefslogtreecommitdiff
path: root/lib/stompqueuemanager.php
diff options
context:
space:
mode:
authorBrion Vibber <brion@pobox.com>2010-01-21 16:42:50 -0800
committerBrion Vibber <brion@pobox.com>2010-01-21 22:40:35 -0800
commit0e852def6ae5aa529cca0aef1187152fb5a880be (patch)
tree5b4b49327c5d7224c0c05ce08c1ddf72f1e44f13 /lib/stompqueuemanager.php
parent0bb23e6fd724a12bba6766949cd3294b288d8a43 (diff)
XMPP queued output & initial retooling of DB queue manager to support non-Notice objects.
Queue handlers for XMPP individual & firehose output now send their XML stanzas to another output queue instead of connecting directly to the chat server. This lets us have as many general processing threads as we need, while all actual XMPP input and output go through a single daemon with a single connection open. This avoids problems with multiple connected resources: * multiple windows shown in some chat clients (psi, gajim, kopete) * extra load on server * incoming message delivery forwarding issues Database changes: * queue_item drops 'notice_id' in favor of a 'frame' blob. This is based on Craig Andrews' work branch to generalize queues to take any object, but conservatively leaving out the serialization for now. Table updater (preserves any existing queued items) in db/rc3to09.sql Code changes to watch out for: * Queue handlers should now define a handle() method instead of handle_notice() * QueueDaemon and XmppDaemon now share common i/o (IoMaster) and respawning thread management (RespawningDaemon) infrastructure. * The polling XmppConfirmManager has been dropped, as the message is queued directly when saving IM settings. * Enable $config['queue']['debug_memory'] to output current memory usage at each run through the event loop to watch for memory leaks To do: * Adapt XMPP i/o to component connection mode for multi-site support. * XMPP input can also be broken out to a queue, which would allow the actual notice save etc to be handled by general queue threads. * Make sure there are no problems with simply pushing serialized Notice objects to queues. * Find a way to improve interactive performance of the database-backed queue handler; polling is pretty painful to XMPP. * Possibly redo the way QueueHandlers are injected into a QueueManager. The grouping used to split out the XMPP output queue is a bit awkward.
Diffstat (limited to 'lib/stompqueuemanager.php')
-rw-r--r--lib/stompqueuemanager.php56
1 files changed, 33 insertions, 23 deletions
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
index 00590fdb6..f057bd9e4 100644
--- a/lib/stompqueuemanager.php
+++ b/lib/stompqueuemanager.php
@@ -39,7 +39,6 @@ class StompQueueManager extends QueueManager
var $base = null;
var $con = null;
- protected $master = null;
protected $sites = array();
function __construct()
@@ -104,11 +103,12 @@ class StompQueueManager extends QueueManager
*/
function getQueues()
{
+ $group = $this->activeGroup();
$site = common_config('site', 'server');
- if (empty($this->handlers[$site])) {
+ if (empty($this->groups[$site][$group])) {
return array();
} else {
- return array_keys($this->handlers[$site]);
+ return array_keys($this->groups[$site][$group]);
}
}
@@ -118,10 +118,12 @@ class StompQueueManager extends QueueManager
*
* @param string $transport
* @param string $class
+ * @param string $group
*/
- public function connect($transport, $class)
+ public function connect($transport, $class, $group='queuedaemon')
{
$this->handlers[common_config('site', 'server')][$transport] = $class;
+ $this->groups[common_config('site', 'server')][$group][$transport] = $class;
}
/**
@@ -130,23 +132,23 @@ class StompQueueManager extends QueueManager
*/
public function enqueue($object, $queue)
{
- $notice = $object;
+ $msg = $this->encode($object);
+ $rep = $this->logrep($object);
$this->_connect();
// XXX: serialize and send entire notice
$result = $this->con->send($this->queueName($queue),
- $notice->id, // BODY of the message
- array ('created' => $notice->created));
+ $msg, // BODY of the message
+ array ('created' => common_sql_now()));
if (!$result) {
- common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
+ common_log(LOG_ERR, "Error sending $rep to $queue queue");
return false;
}
- common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
- . $notice->id . ' for ' . $queue);
+ common_log(LOG_DEBUG, "complete remote queueing $rep for $queue");
$this->stats('enqueued', $queue);
}
@@ -174,7 +176,7 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
- $ok = $ok && $this->_handleNotice($frame);
+ $ok = $ok && $this->_handleItem($frame);
}
return $ok;
}
@@ -265,7 +267,7 @@ class StompQueueManager extends QueueManager
}
/**
- * Handle and acknowledge a notice event that's come in through a queue.
+ * Handle and acknowledge an event that's come in through a queue.
*
* If the queue handler reports failure, the message is requeued for later.
* Missing notices or handler classes will drop the message.
@@ -276,7 +278,7 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
- protected function _handleNotice($frame)
+ protected function _handleItem($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
if ($site != common_config('site', 'server')) {
@@ -284,15 +286,23 @@ class StompQueueManager extends QueueManager
StatusNet::init($site);
}
- $id = intval($frame->body);
- $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
+ if (is_numeric($frame->body)) {
+ $id = intval($frame->body);
+ $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
- $notice = Notice::staticGet('id', $id);
- if (empty($notice)) {
- $this->_log(LOG_WARNING, "Skipping missing $info");
- $this->con->ack($frame);
- $this->stats('badnotice', $queue);
- return false;
+ $notice = Notice::staticGet('id', $id);
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, "Skipping missing $info");
+ $this->con->ack($frame);
+ $this->stats('badnotice', $queue);
+ return false;
+ }
+
+ $item = $notice;
+ } else {
+ // @fixme should we serialize, or json, or what here?
+ $info = "string posted at {$frame->headers['created']} in queue $queue";
+ $item = $frame->body;
}
$handler = $this->getHandler($queue);
@@ -303,7 +313,7 @@ class StompQueueManager extends QueueManager
return false;
}
- $ok = $handler->handle_notice($notice);
+ $ok = $handler->handle($item);
if (!$ok) {
$this->_log(LOG_WARNING, "Failed handling $info");
@@ -311,7 +321,7 @@ class StompQueueManager extends QueueManager
// this kind of queue management ourselves;
// if we don't ack, it should resend...
$this->con->ack($frame);
- $this->enqueue($notice, $queue);
+ $this->enqueue($item, $queue);
$this->stats('requeued', $queue);
return false;
}