diff options
Diffstat (limited to 'lib/queuemanager.php')
-rw-r--r-- | lib/queuemanager.php | 125 |
1 files changed, 101 insertions, 24 deletions
diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 291174d3c..4eb39bfa8 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,6 +39,10 @@ abstract class QueueManager extends IoManager { static $qm = null; + public $master = null; + public $handlers = array(); + public $groups = array(); + /** * Factory function to pull the appropriate QueueManager object * for this site's configuration. It can then be used to queue @@ -110,6 +114,64 @@ abstract class QueueManager extends IoManager abstract function enqueue($object, $queue); /** + * Build a representation for an object for logging + * @param mixed + * @return string + */ + function logrep($object) { + if (is_object($object)) { + $class = get_class($object); + if (isset($object->id)) { + return "$class $object->id"; + } + return $class; + } + if (is_string($object)) { + $len = strlen($object); + $fragment = mb_substr($object, 0, 32); + if (mb_strlen($object) > 32) { + $fragment .= '...'; + } + return "string '$fragment' ($len bytes)"; + } + return strval($object); + } + + /** + * Encode an object for queued storage. + * Next gen may use serialization. + * + * @param mixed $object + * @return string + */ + protected function encode($object) + { + if ($object instanceof Notice) { + return $object->id; + } else if (is_string($object)) { + return $object; + } else { + throw new ServerException("Can't queue this type", 500); + } + } + + /** + * Decode an object from queued storage. + * Accepts back-compat notice reference entries and strings for now. + * + * @param string + * @return mixed + */ + protected function decode($frame) + { + if (is_numeric($frame)) { + return Notice::staticGet(intval($frame)); + } else { + return $frame; + } + } + + /** * Instantiate the appropriate QueueHandler class for the given queue. * * @param string $queue @@ -131,13 +193,15 @@ abstract class QueueManager extends IoManager } /** - * Get a list of all registered queue transport names. + * Get a list of registered queue transport names to be used + * for this daemon. * * @return array of strings */ function getQueues() { - return array_keys($this->handlers); + $group = $this->activeGroup(); + return array_keys($this->groups[$group]); } /** @@ -148,33 +212,29 @@ abstract class QueueManager extends IoManager */ function initialize() { + // @fixme we'll want to be able to listen to particular queues... if (Event::handle('StartInitializeQueueManager', array($this))) { - if (!defined('XMPP_ONLY_FLAG')) { // hack! - $this->connect('plugin', 'PluginQueueHandler'); - $this->connect('omb', 'OmbQueueHandler'); - $this->connect('ping', 'PingQueueHandler'); - if (common_config('sms', 'enabled')) { - $this->connect('sms', 'SmsQueueHandler'); - } + $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('omb', 'OmbQueueHandler'); + $this->connect('ping', 'PingQueueHandler'); + if (common_config('sms', 'enabled')) { + $this->connect('sms', 'SmsQueueHandler'); } // XMPP output handlers... - if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) { - $this->connect('jabber', 'JabberQueueHandler'); - $this->connect('public', 'PublicQueueHandler'); - - // @fixme this should move up a level or should get an actual queue - $this->connect('confirm', 'XmppConfirmHandler'); - } + $this->connect('jabber', 'JabberQueueHandler'); + $this->connect('public', 'PublicQueueHandler'); + + // @fixme this should get an actual queue + //$this->connect('confirm', 'XmppConfirmHandler'); + + // For compat with old plugins not registering their own handlers. + $this->connect('plugin', 'PluginQueueHandler'); + + $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon'); - if (!defined('XMPP_ONLY_FLAG')) { // hack! - // For compat with old plugins not registering their own handlers. - $this->connect('plugin', 'PluginQueueHandler'); - } - } - if (!defined('XMPP_ONLY_FLAG')) { // hack! - Event::handle('EndInitializeQueueManager', array($this)); } + Event::handle('EndInitializeQueueManager', array($this)); } /** @@ -183,10 +243,27 @@ abstract class QueueManager extends IoManager * * @param string $transport * @param string $class + * @param string $group */ - public function connect($transport, $class) + public function connect($transport, $class, $group='queuedaemon') { $this->handlers[$transport] = $class; + $this->groups[$group][$transport] = $class; + } + + /** + * @return string queue group to use for this request + */ + function activeGroup() + { + $group = 'queuedaemon'; + if ($this->master) { + // hack hack + if ($this->master instanceof XmppMaster) { + return 'xmppdaemon'; + } + } + return $group; } /** |