diff options
Diffstat (limited to 'scripts/queuedaemon.php')
-rwxr-xr-x | scripts/queuedaemon.php | 149 |
1 files changed, 26 insertions, 123 deletions
diff --git a/scripts/queuedaemon.php b/scripts/queuedaemon.php index 162f617e0..a9cfda6d7 100755 --- a/scripts/queuedaemon.php +++ b/scripts/queuedaemon.php @@ -29,6 +29,8 @@ $longoptions = array('id=', 'foreground', 'all', 'threads=', 'skip-xmpp', 'xmpp- * * Recognizes Linux and Mac OS X; others will return default of 1. * + * @fixme move this to SpawningDaemon, but to get the default val for help + * text we seem to need it before loading infrastructure * @return intval */ function getProcessorCount() @@ -83,143 +85,29 @@ define('CLAIM_TIMEOUT', 1200); * We can then pass individual items through the QueueHandler subclasses * they belong to. */ -class QueueDaemon extends Daemon +class QueueDaemon extends SpawningDaemon { - protected $allsites; - protected $threads=1; + protected $allsites = false; function __construct($id=null, $daemonize=true, $threads=1, $allsites=false) { - parent::__construct($daemonize); - - if ($id) { - $this->set_id($id); - } + parent::__construct($id, $daemonize, $threads); $this->all = $allsites; - $this->threads = $threads; - } - - /** - * How many seconds a polling-based queue manager should wait between - * checks for new items to handle. - * - * Defaults to 60 seconds; override to speed up or slow down. - * - * @return int timeout in seconds - */ - function timeout() - { - return 60; - } - - function name() - { - return strtolower(get_class($this).'.'.$this->get_id()); - } - - function run() - { - if ($this->threads > 1) { - return $this->runThreads(); - } else { - return $this->runLoop(); - } - } - - function runThreads() - { - $children = array(); - for ($i = 1; $i <= $this->threads; $i++) { - $pid = pcntl_fork(); - if ($pid < 0) { - print "Couldn't fork for thread $i; aborting\n"; - exit(1); - } else if ($pid == 0) { - $this->runChild($i); - exit(0); - } else { - $this->log(LOG_INFO, "Spawned thread $i as pid $pid"); - $children[$i] = $pid; - } - } - - $this->log(LOG_INFO, "Waiting for children to complete."); - while (count($children) > 0) { - $status = null; - $pid = pcntl_wait($status); - if ($pid > 0) { - $i = array_search($pid, $children); - if ($i === false) { - $this->log(LOG_ERR, "Unrecognized child pid $pid exited!"); - continue; - } - unset($children[$i]); - $this->log(LOG_INFO, "Thread $i pid $pid exited."); - - $pid = pcntl_fork(); - if ($pid < 0) { - print "Couldn't fork to respawn thread $i; aborting thread.\n"; - } else if ($pid == 0) { - $this->runChild($i); - exit(0); - } else { - $this->log(LOG_INFO, "Respawned thread $i as pid $pid"); - $children[$i] = $pid; - } - } - } - $this->log(LOG_INFO, "All child processes complete."); - return true; - } - - function runChild($thread) - { - $this->set_id($this->get_id() . "." . $thread); - $this->resetDb(); - $this->runLoop(); - } - - /** - * Reconnect to the database for each child process, - * or they'll get very confused trying to use the - * same socket. - */ - function resetDb() - { - // @fixme do we need to explicitly open the db too - // or is this implied? - global $_DB_DATAOBJECT; - unset($_DB_DATAOBJECT['CONNECTIONS']); - - // Reconnect main memcached, or threads will stomp on - // each other and corrupt their requests. - $cache = common_memcache(); - if ($cache) { - $cache->reconnect(); - } - - // Also reconnect memcached for status_network table. - if (!empty(Status_network::$cache)) { - Status_network::$cache->close(); - Status_network::$cache = null; - } } /** * Setup and start of run loop for this queue handler as a daemon. * Most of the heavy lifting is passed on to the QueueManager's service() - * method, which passes control on to the QueueHandler's handle_notice() - * method for each notice that comes in on the queue. - * - * Most of the time this won't need to be overridden in a subclass. + * method, which passes control on to the QueueHandler's handle() + * method for each item that comes in on the queue. * * @return boolean true on success, false on failure */ - function runLoop() + function runThread() { $this->log(LOG_INFO, 'checking for queued notices'); - $master = new IoMaster($this->get_id()); + $master = new QueueMaster($this->get_id()); $master->init($this->all); $master->service(); @@ -229,10 +117,25 @@ class QueueDaemon extends Daemon return true; } +} - function log($level, $msg) +class QueueMaster extends IoMaster +{ + /** + * Initialize IoManagers for the currently configured site + * which are appropriate to this instance. + */ + function initManagers() { - common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg); + $classes = array(); + if (Event::handle('StartQueueDaemonIoManagers', array(&$classes))) { + $classes[] = 'QueueManager'; + } + Event::handle('EndQueueDaemonIoManagers', array(&$classes)); + + foreach ($classes as $class) { + $this->instantiate($class); + } } } |