diff options
author | Craig Andrews <candrews@integralblue.com> | 2010-02-16 13:15:09 -0500 |
---|---|---|
committer | Craig Andrews <candrews@integralblue.com> | 2010-02-16 13:15:09 -0500 |
commit | 20d6a7caed6636c28cc7b95c584549691dff4388 (patch) | |
tree | bcf44f5f31a38ff08b363ab45ab486ace9ff1a99 /lib/queuemanager.php | |
parent | 32084e33a266797b306158df29e48f057651b410 (diff) | |
parent | d5cbfe8071d56438cfa168dc3db56a959317eae0 (diff) |
Merge branch '0.9.x' into 1.0.x
Conflicts:
lib/queuemanager.php
lib/xmppmanager.php
plugins/Xmpp/Fake_XMPP.php
scripts/imdaemon.php
Diffstat (limited to 'lib/queuemanager.php')
-rw-r--r-- | lib/queuemanager.php | 84 |
1 files changed, 60 insertions, 24 deletions
diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 162c9f375..576c7c0af 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,9 +39,10 @@ abstract class QueueManager extends IoManager { static $qm = null; - public $master = null; - public $handlers = array(); - public $groups = array(); + protected $master = null; + protected $handlers = array(); + protected $groups = array(); + protected $activeGroups = array(); /** * Factory function to pull the appropriate QueueManager object @@ -162,7 +163,7 @@ abstract class QueueManager extends IoManager */ protected function encode($item) { - return serialize($object); + return serialize($item); } /** @@ -192,40 +193,48 @@ abstract class QueueManager extends IoManager } else if (class_exists($class)) { return new $class(); } else { - common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); + $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); } } else { - common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); + $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); } return null; } /** * Get a list of registered queue transport names to be used - * for this daemon. + * for listening in this daemon. * * @return array of strings */ - function getQueues() + function activeQueues() { - $group = $this->activeGroup(); - return array_keys($this->groups[$group]); + $queues = array(); + foreach ($this->activeGroups as $group) { + if (isset($this->groups[$group])) { + $queues = array_merge($queues, $this->groups[$group]); + } + } + + return array_keys($queues); } /** - * Initialize the list of queue handlers + * Initialize the list of queue handlers for the current site. * * @event StartInitializeQueueManager * @event EndInitializeQueueManager */ function initialize() { - // @fixme we'll want to be able to listen to particular queues... + $this->handlers = array(); + $this->groups = array(); + $this->groupsByTransport = array(); + if (Event::handle('StartInitializeQueueManager', array($this))) { - $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('distrib', 'DistribQueueHandler'); $this->connect('omb', 'OmbQueueHandler'); $this->connect('ping', 'PingQueueHandler'); - $this->connect('distrib', 'DistribQueueHandler'); if (common_config('sms', 'enabled')) { $this->connect('sms', 'SmsQueueHandler'); } @@ -244,25 +253,41 @@ abstract class QueueManager extends IoManager * @param string $class class name or object instance * @param string $group */ - public function connect($transport, $class, $group='queuedaemon') + public function connect($transport, $class, $group='main') { $this->handlers[$transport] = $class; $this->groups[$group][$transport] = $class; + $this->groupsByTransport[$transport] = $group; } /** - * @return string queue group to use for this request + * Set the active group which will be used for listening. + * @param string $group */ - function activeGroup() + function setActiveGroup($group) { - $group = 'queuedaemon'; - if ($this->master) { - // hack hack - if ($this->master instanceof ImMaster) { - return 'imdaemon'; - } + $this->activeGroups = array($group); + } + + /** + * Set the active group(s) which will be used for listening. + * @param array $groups + */ + function setActiveGroups($groups) + { + $this->activeGroups = $groups; + } + + /** + * @return string queue group for this queue + */ + function queueGroup($queue) + { + if (isset($this->groupsByTransport[$queue])) { + return $this->groupsByTransport[$queue]; + } else { + throw new Exception("Requested group for unregistered transport $queue"); } - return $group; } /** @@ -286,4 +311,15 @@ abstract class QueueManager extends IoManager $monitor->stats($key, $owners); } } + + protected function _log($level, $msg) + { + $class = get_class($this); + if ($this->activeGroups) { + $groups = ' (' . implode(',', $this->activeGroups) . ')'; + } else { + $groups = ''; + } + common_log($level, "$class$groups: $msg"); + } } |