From c7507e7e9dafa6d6e054978e720e4fce3abc9929 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Fri, 22 Jan 2010 12:52:36 -0800 Subject: XMPP queued output & initial retooling of DB queue manager to support non-Notice objects. Queue handlers for XMPP individual & firehose output now send their XML stanzas to another output queue instead of connecting directly to the chat server. This lets us have as many general processing threads as we need, while all actual XMPP input and output go through a single daemon with a single connection open. This avoids problems with multiple connected resources: * multiple windows shown in some chat clients (psi, gajim, kopete) * extra load on server * incoming message delivery forwarding issues Database changes: * queue_item drops 'notice_id' in favor of a 'frame' blob. This is based on Craig Andrews' work branch to generalize queues to take any object, but conservatively leaving out the serialization for now. Table updater (preserves any existing queued items) in db/rc3to09.sql Code changes to watch out for: * Queue handlers should now define a handle() method instead of handle_notice() * QueueDaemon and XmppDaemon now share common i/o (IoMaster) and respawning thread management (RespawningDaemon) infrastructure. * The polling XmppConfirmManager has been dropped, as the message is queued directly when saving IM settings. * Enable $config['queue']['debug_memory'] to output current memory usage at each run through the event loop to watch for memory leaks To do: * Adapt XMPP i/o to component connection mode for multi-site support. * XMPP input can also be broken out to a queue, which would allow the actual notice save etc to be handled by general queue threads. * Make sure there are no problems with simply pushing serialized Notice objects to queues. * Find a way to improve interactive performance of the database-backed queue handler; polling is pretty painful to XMPP. * Possibly redo the way QueueHandlers are injected into a QueueManager. The grouping used to split out the XMPP output queue is a bit awkward. Conflicts: scripts/xmppdaemon.php --- lib/queuemanager.php | 125 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 101 insertions(+), 24 deletions(-) (limited to 'lib/queuemanager.php') diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 291174d3c..4eb39bfa8 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,6 +39,10 @@ abstract class QueueManager extends IoManager { static $qm = null; + public $master = null; + public $handlers = array(); + public $groups = array(); + /** * Factory function to pull the appropriate QueueManager object * for this site's configuration. It can then be used to queue @@ -109,6 +113,64 @@ abstract class QueueManager extends IoManager */ abstract function enqueue($object, $queue); + /** + * Build a representation for an object for logging + * @param mixed + * @return string + */ + function logrep($object) { + if (is_object($object)) { + $class = get_class($object); + if (isset($object->id)) { + return "$class $object->id"; + } + return $class; + } + if (is_string($object)) { + $len = strlen($object); + $fragment = mb_substr($object, 0, 32); + if (mb_strlen($object) > 32) { + $fragment .= '...'; + } + return "string '$fragment' ($len bytes)"; + } + return strval($object); + } + + /** + * Encode an object for queued storage. + * Next gen may use serialization. + * + * @param mixed $object + * @return string + */ + protected function encode($object) + { + if ($object instanceof Notice) { + return $object->id; + } else if (is_string($object)) { + return $object; + } else { + throw new ServerException("Can't queue this type", 500); + } + } + + /** + * Decode an object from queued storage. + * Accepts back-compat notice reference entries and strings for now. + * + * @param string + * @return mixed + */ + protected function decode($frame) + { + if (is_numeric($frame)) { + return Notice::staticGet(intval($frame)); + } else { + return $frame; + } + } + /** * Instantiate the appropriate QueueHandler class for the given queue. * @@ -131,13 +193,15 @@ abstract class QueueManager extends IoManager } /** - * Get a list of all registered queue transport names. + * Get a list of registered queue transport names to be used + * for this daemon. * * @return array of strings */ function getQueues() { - return array_keys($this->handlers); + $group = $this->activeGroup(); + return array_keys($this->groups[$group]); } /** @@ -148,33 +212,29 @@ abstract class QueueManager extends IoManager */ function initialize() { + // @fixme we'll want to be able to listen to particular queues... if (Event::handle('StartInitializeQueueManager', array($this))) { - if (!defined('XMPP_ONLY_FLAG')) { // hack! - $this->connect('plugin', 'PluginQueueHandler'); - $this->connect('omb', 'OmbQueueHandler'); - $this->connect('ping', 'PingQueueHandler'); - if (common_config('sms', 'enabled')) { - $this->connect('sms', 'SmsQueueHandler'); - } + $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('omb', 'OmbQueueHandler'); + $this->connect('ping', 'PingQueueHandler'); + if (common_config('sms', 'enabled')) { + $this->connect('sms', 'SmsQueueHandler'); } // XMPP output handlers... - if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) { - $this->connect('jabber', 'JabberQueueHandler'); - $this->connect('public', 'PublicQueueHandler'); - - // @fixme this should move up a level or should get an actual queue - $this->connect('confirm', 'XmppConfirmHandler'); - } + $this->connect('jabber', 'JabberQueueHandler'); + $this->connect('public', 'PublicQueueHandler'); + + // @fixme this should get an actual queue + //$this->connect('confirm', 'XmppConfirmHandler'); + + // For compat with old plugins not registering their own handlers. + $this->connect('plugin', 'PluginQueueHandler'); + + $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon'); - if (!defined('XMPP_ONLY_FLAG')) { // hack! - // For compat with old plugins not registering their own handlers. - $this->connect('plugin', 'PluginQueueHandler'); - } - } - if (!defined('XMPP_ONLY_FLAG')) { // hack! - Event::handle('EndInitializeQueueManager', array($this)); } + Event::handle('EndInitializeQueueManager', array($this)); } /** @@ -183,10 +243,27 @@ abstract class QueueManager extends IoManager * * @param string $transport * @param string $class + * @param string $group */ - public function connect($transport, $class) + public function connect($transport, $class, $group='queuedaemon') { $this->handlers[$transport] = $class; + $this->groups[$group][$transport] = $class; + } + + /** + * @return string queue group to use for this request + */ + function activeGroup() + { + $group = 'queuedaemon'; + if ($this->master) { + // hack hack + if ($this->master instanceof XmppMaster) { + return 'xmppdaemon'; + } + } + return $group; } /** -- cgit v1.2.3 From e26a843caf9f6bb0d11a7128884db235ededcce0 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Mon, 25 Jan 2010 18:08:21 -0500 Subject: Offload inbox updates to a queue handler to speed up posting online Moved much of the writing that happens when posting a notice to a new queuehandler, distribqueuehandler. This updates tags, groups, replies and inboxes at queue time (or at Web time, if queues are disabled). To make this work well, I had to break up the monolithic Notice::blowCaches() and make cache blowing happen closer to where data is updated. Squashed commit of the following: commit 5257626c62750ac4ac1db0ce2b71410c5711cfa3 Author: Evan Prodromou Date: Mon Jan 25 14:56:41 2010 -0500 slightly better handling of blowing tag memory cache commit 8a22a3cdf6ec28685da129a0313e7b2a0837c9ef Author: Evan Prodromou Date: Mon Jan 25 01:42:56 2010 -0500 change 'distribute' to 'distrib' so not too long for dbqueue commit 7a063315b0f7fad27cb6fbd2bdd74e253af83e4f Author: Evan Prodromou Date: Mon Jan 25 01:39:15 2010 -0500 change handle_notice() to handle() in distributqueuehandler commit 1a39ccd28b9994137d7bfd21bb4f230546938e77 Author: Evan Prodromou Date: Mon Jan 25 16:05:25 2010 -0500 error with queuemanager commit e6b3bb93f305cfd2de71a6340b8aa6fb890049b7 Author: Evan Prodromou Date: Mon Jan 25 01:11:34 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 94d557cdc016187d1d0647ae1794cd94d6fb8ac8 Author: Evan Prodromou Date: Mon Jan 25 00:48:44 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 1c781dd08c88a35dafc5c01230b4872fd6b95182 Author: Evan Prodromou Date: Wed Jan 20 08:54:18 2010 -0500 move broadcasting and distributing to new queuehandler commit da3e46d26b84e4f028f34a13fd2ee373e4c1b954 Author: Evan Prodromou Date: Wed Jan 20 08:53:12 2010 -0500 Move distribution of notices to new distribute queue handler --- lib/queuemanager.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'lib/queuemanager.php') diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 4eb39bfa8..e5cf8239e 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -217,6 +217,7 @@ abstract class QueueManager extends IoManager $this->connect('plugin', 'PluginQueueHandler'); $this->connect('omb', 'OmbQueueHandler'); $this->connect('ping', 'PingQueueHandler'); + $this->connect('distrib', 'DistribQueueHandler'); if (common_config('sms', 'enabled')) { $this->connect('sms', 'SmsQueueHandler'); } @@ -224,7 +225,7 @@ abstract class QueueManager extends IoManager // XMPP output handlers... $this->connect('jabber', 'JabberQueueHandler'); $this->connect('public', 'PublicQueueHandler'); - + // @fixme this should get an actual queue //$this->connect('confirm', 'XmppConfirmHandler'); -- cgit v1.2.3