summaryrefslogtreecommitdiff
path: root/lib/queuemanager.php
diff options
context:
space:
mode:
authorBrion Vibber <brion@pobox.com>2010-01-22 12:52:36 -0800
committerBrion Vibber <brion@pobox.com>2010-01-22 12:52:36 -0800
commitc7507e7e9dafa6d6e054978e720e4fce3abc9929 (patch)
treed756f2a2c7c9b487b6ef0f066a87f3cde40bc65d /lib/queuemanager.php
parenta3e484a0e898bb94dd45cd7807bea1a931d7c6a9 (diff)
XMPP queued output & initial retooling of DB queue manager to support non-Notice objects.
Queue handlers for XMPP individual & firehose output now send their XML stanzas to another output queue instead of connecting directly to the chat server. This lets us have as many general processing threads as we need, while all actual XMPP input and output go through a single daemon with a single connection open. This avoids problems with multiple connected resources: * multiple windows shown in some chat clients (psi, gajim, kopete) * extra load on server * incoming message delivery forwarding issues Database changes: * queue_item drops 'notice_id' in favor of a 'frame' blob. This is based on Craig Andrews' work branch to generalize queues to take any object, but conservatively leaving out the serialization for now. Table updater (preserves any existing queued items) in db/rc3to09.sql Code changes to watch out for: * Queue handlers should now define a handle() method instead of handle_notice() * QueueDaemon and XmppDaemon now share common i/o (IoMaster) and respawning thread management (RespawningDaemon) infrastructure. * The polling XmppConfirmManager has been dropped, as the message is queued directly when saving IM settings. * Enable $config['queue']['debug_memory'] to output current memory usage at each run through the event loop to watch for memory leaks To do: * Adapt XMPP i/o to component connection mode for multi-site support. * XMPP input can also be broken out to a queue, which would allow the actual notice save etc to be handled by general queue threads. * Make sure there are no problems with simply pushing serialized Notice objects to queues. * Find a way to improve interactive performance of the database-backed queue handler; polling is pretty painful to XMPP. * Possibly redo the way QueueHandlers are injected into a QueueManager. The grouping used to split out the XMPP output queue is a bit awkward. Conflicts: scripts/xmppdaemon.php
Diffstat (limited to 'lib/queuemanager.php')
-rw-r--r--lib/queuemanager.php125
1 files changed, 101 insertions, 24 deletions
diff --git a/lib/queuemanager.php b/lib/queuemanager.php
index 291174d3c..4eb39bfa8 100644
--- a/lib/queuemanager.php
+++ b/lib/queuemanager.php
@@ -39,6 +39,10 @@ abstract class QueueManager extends IoManager
{
static $qm = null;
+ public $master = null;
+ public $handlers = array();
+ public $groups = array();
+
/**
* Factory function to pull the appropriate QueueManager object
* for this site's configuration. It can then be used to queue
@@ -110,6 +114,64 @@ abstract class QueueManager extends IoManager
abstract function enqueue($object, $queue);
/**
+ * Build a representation for an object for logging
+ * @param mixed
+ * @return string
+ */
+ function logrep($object) {
+ if (is_object($object)) {
+ $class = get_class($object);
+ if (isset($object->id)) {
+ return "$class $object->id";
+ }
+ return $class;
+ }
+ if (is_string($object)) {
+ $len = strlen($object);
+ $fragment = mb_substr($object, 0, 32);
+ if (mb_strlen($object) > 32) {
+ $fragment .= '...';
+ }
+ return "string '$fragment' ($len bytes)";
+ }
+ return strval($object);
+ }
+
+ /**
+ * Encode an object for queued storage.
+ * Next gen may use serialization.
+ *
+ * @param mixed $object
+ * @return string
+ */
+ protected function encode($object)
+ {
+ if ($object instanceof Notice) {
+ return $object->id;
+ } else if (is_string($object)) {
+ return $object;
+ } else {
+ throw new ServerException("Can't queue this type", 500);
+ }
+ }
+
+ /**
+ * Decode an object from queued storage.
+ * Accepts back-compat notice reference entries and strings for now.
+ *
+ * @param string
+ * @return mixed
+ */
+ protected function decode($frame)
+ {
+ if (is_numeric($frame)) {
+ return Notice::staticGet(intval($frame));
+ } else {
+ return $frame;
+ }
+ }
+
+ /**
* Instantiate the appropriate QueueHandler class for the given queue.
*
* @param string $queue
@@ -131,13 +193,15 @@ abstract class QueueManager extends IoManager
}
/**
- * Get a list of all registered queue transport names.
+ * Get a list of registered queue transport names to be used
+ * for this daemon.
*
* @return array of strings
*/
function getQueues()
{
- return array_keys($this->handlers);
+ $group = $this->activeGroup();
+ return array_keys($this->groups[$group]);
}
/**
@@ -148,33 +212,29 @@ abstract class QueueManager extends IoManager
*/
function initialize()
{
+ // @fixme we'll want to be able to listen to particular queues...
if (Event::handle('StartInitializeQueueManager', array($this))) {
- if (!defined('XMPP_ONLY_FLAG')) { // hack!
- $this->connect('plugin', 'PluginQueueHandler');
- $this->connect('omb', 'OmbQueueHandler');
- $this->connect('ping', 'PingQueueHandler');
- if (common_config('sms', 'enabled')) {
- $this->connect('sms', 'SmsQueueHandler');
- }
+ $this->connect('plugin', 'PluginQueueHandler');
+ $this->connect('omb', 'OmbQueueHandler');
+ $this->connect('ping', 'PingQueueHandler');
+ if (common_config('sms', 'enabled')) {
+ $this->connect('sms', 'SmsQueueHandler');
}
// XMPP output handlers...
- if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
- $this->connect('jabber', 'JabberQueueHandler');
- $this->connect('public', 'PublicQueueHandler');
-
- // @fixme this should move up a level or should get an actual queue
- $this->connect('confirm', 'XmppConfirmHandler');
- }
+ $this->connect('jabber', 'JabberQueueHandler');
+ $this->connect('public', 'PublicQueueHandler');
+
+ // @fixme this should get an actual queue
+ //$this->connect('confirm', 'XmppConfirmHandler');
+
+ // For compat with old plugins not registering their own handlers.
+ $this->connect('plugin', 'PluginQueueHandler');
+
+ $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon');
- if (!defined('XMPP_ONLY_FLAG')) { // hack!
- // For compat with old plugins not registering their own handlers.
- $this->connect('plugin', 'PluginQueueHandler');
- }
- }
- if (!defined('XMPP_ONLY_FLAG')) { // hack!
- Event::handle('EndInitializeQueueManager', array($this));
}
+ Event::handle('EndInitializeQueueManager', array($this));
}
/**
@@ -183,10 +243,27 @@ abstract class QueueManager extends IoManager
*
* @param string $transport
* @param string $class
+ * @param string $group
*/
- public function connect($transport, $class)
+ public function connect($transport, $class, $group='queuedaemon')
{
$this->handlers[$transport] = $class;
+ $this->groups[$group][$transport] = $class;
+ }
+
+ /**
+ * @return string queue group to use for this request
+ */
+ function activeGroup()
+ {
+ $group = 'queuedaemon';
+ if ($this->master) {
+ // hack hack
+ if ($this->master instanceof XmppMaster) {
+ return 'xmppdaemon';
+ }
+ }
+ return $group;
}
/**