summaryrefslogtreecommitdiff
path: root/lib/queuemanager.php
diff options
context:
space:
mode:
Diffstat (limited to 'lib/queuemanager.php')
-rw-r--r--lib/queuemanager.php125
1 files changed, 101 insertions, 24 deletions
diff --git a/lib/queuemanager.php b/lib/queuemanager.php
index 291174d3c..4eb39bfa8 100644
--- a/lib/queuemanager.php
+++ b/lib/queuemanager.php
@@ -39,6 +39,10 @@ abstract class QueueManager extends IoManager
{
static $qm = null;
+ public $master = null;
+ public $handlers = array();
+ public $groups = array();
+
/**
* Factory function to pull the appropriate QueueManager object
* for this site's configuration. It can then be used to queue
@@ -110,6 +114,64 @@ abstract class QueueManager extends IoManager
abstract function enqueue($object, $queue);
/**
+ * Build a representation for an object for logging
+ * @param mixed
+ * @return string
+ */
+ function logrep($object) {
+ if (is_object($object)) {
+ $class = get_class($object);
+ if (isset($object->id)) {
+ return "$class $object->id";
+ }
+ return $class;
+ }
+ if (is_string($object)) {
+ $len = strlen($object);
+ $fragment = mb_substr($object, 0, 32);
+ if (mb_strlen($object) > 32) {
+ $fragment .= '...';
+ }
+ return "string '$fragment' ($len bytes)";
+ }
+ return strval($object);
+ }
+
+ /**
+ * Encode an object for queued storage.
+ * Next gen may use serialization.
+ *
+ * @param mixed $object
+ * @return string
+ */
+ protected function encode($object)
+ {
+ if ($object instanceof Notice) {
+ return $object->id;
+ } else if (is_string($object)) {
+ return $object;
+ } else {
+ throw new ServerException("Can't queue this type", 500);
+ }
+ }
+
+ /**
+ * Decode an object from queued storage.
+ * Accepts back-compat notice reference entries and strings for now.
+ *
+ * @param string
+ * @return mixed
+ */
+ protected function decode($frame)
+ {
+ if (is_numeric($frame)) {
+ return Notice::staticGet(intval($frame));
+ } else {
+ return $frame;
+ }
+ }
+
+ /**
* Instantiate the appropriate QueueHandler class for the given queue.
*
* @param string $queue
@@ -131,13 +193,15 @@ abstract class QueueManager extends IoManager
}
/**
- * Get a list of all registered queue transport names.
+ * Get a list of registered queue transport names to be used
+ * for this daemon.
*
* @return array of strings
*/
function getQueues()
{
- return array_keys($this->handlers);
+ $group = $this->activeGroup();
+ return array_keys($this->groups[$group]);
}
/**
@@ -148,33 +212,29 @@ abstract class QueueManager extends IoManager
*/
function initialize()
{
+ // @fixme we'll want to be able to listen to particular queues...
if (Event::handle('StartInitializeQueueManager', array($this))) {
- if (!defined('XMPP_ONLY_FLAG')) { // hack!
- $this->connect('plugin', 'PluginQueueHandler');
- $this->connect('omb', 'OmbQueueHandler');
- $this->connect('ping', 'PingQueueHandler');
- if (common_config('sms', 'enabled')) {
- $this->connect('sms', 'SmsQueueHandler');
- }
+ $this->connect('plugin', 'PluginQueueHandler');
+ $this->connect('omb', 'OmbQueueHandler');
+ $this->connect('ping', 'PingQueueHandler');
+ if (common_config('sms', 'enabled')) {
+ $this->connect('sms', 'SmsQueueHandler');
}
// XMPP output handlers...
- if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
- $this->connect('jabber', 'JabberQueueHandler');
- $this->connect('public', 'PublicQueueHandler');
-
- // @fixme this should move up a level or should get an actual queue
- $this->connect('confirm', 'XmppConfirmHandler');
- }
+ $this->connect('jabber', 'JabberQueueHandler');
+ $this->connect('public', 'PublicQueueHandler');
+
+ // @fixme this should get an actual queue
+ //$this->connect('confirm', 'XmppConfirmHandler');
+
+ // For compat with old plugins not registering their own handlers.
+ $this->connect('plugin', 'PluginQueueHandler');
+
+ $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon');
- if (!defined('XMPP_ONLY_FLAG')) { // hack!
- // For compat with old plugins not registering their own handlers.
- $this->connect('plugin', 'PluginQueueHandler');
- }
- }
- if (!defined('XMPP_ONLY_FLAG')) { // hack!
- Event::handle('EndInitializeQueueManager', array($this));
}
+ Event::handle('EndInitializeQueueManager', array($this));
}
/**
@@ -183,10 +243,27 @@ abstract class QueueManager extends IoManager
*
* @param string $transport
* @param string $class
+ * @param string $group
*/
- public function connect($transport, $class)
+ public function connect($transport, $class, $group='queuedaemon')
{
$this->handlers[$transport] = $class;
+ $this->groups[$group][$transport] = $class;
+ }
+
+ /**
+ * @return string queue group to use for this request
+ */
+ function activeGroup()
+ {
+ $group = 'queuedaemon';
+ if ($this->master) {
+ // hack hack
+ if ($this->master instanceof XmppMaster) {
+ return 'xmppdaemon';
+ }
+ }
+ return $group;
}
/**