summaryrefslogtreecommitdiff
path: root/scripts/queuedaemon.php
diff options
context:
space:
mode:
authorEvan Prodromou <evan@status.net>2010-01-22 14:18:43 -0500
committerEvan Prodromou <evan@status.net>2010-01-22 14:18:43 -0500
commitc8bc598cfd67353f33d7785556374b5d6865a7d9 (patch)
tree6be983b49cfc78a35baa361f77ad0b365a9a9bed /scripts/queuedaemon.php
parente666433eb4a66078e3459d7d4f51f5ce5a1ad589 (diff)
parent29d83c8ca94201cb010b5aef564df78ab868ea0c (diff)
Merge branch 'testing' into 0.9.x
Diffstat (limited to 'scripts/queuedaemon.php')
-rwxr-xr-xscripts/queuedaemon.php149
1 files changed, 26 insertions, 123 deletions
diff --git a/scripts/queuedaemon.php b/scripts/queuedaemon.php
index 162f617e0..a9cfda6d7 100755
--- a/scripts/queuedaemon.php
+++ b/scripts/queuedaemon.php
@@ -29,6 +29,8 @@ $longoptions = array('id=', 'foreground', 'all', 'threads=', 'skip-xmpp', 'xmpp-
*
* Recognizes Linux and Mac OS X; others will return default of 1.
*
+ * @fixme move this to SpawningDaemon, but to get the default val for help
+ * text we seem to need it before loading infrastructure
* @return intval
*/
function getProcessorCount()
@@ -83,143 +85,29 @@ define('CLAIM_TIMEOUT', 1200);
* We can then pass individual items through the QueueHandler subclasses
* they belong to.
*/
-class QueueDaemon extends Daemon
+class QueueDaemon extends SpawningDaemon
{
- protected $allsites;
- protected $threads=1;
+ protected $allsites = false;
function __construct($id=null, $daemonize=true, $threads=1, $allsites=false)
{
- parent::__construct($daemonize);
-
- if ($id) {
- $this->set_id($id);
- }
+ parent::__construct($id, $daemonize, $threads);
$this->all = $allsites;
- $this->threads = $threads;
- }
-
- /**
- * How many seconds a polling-based queue manager should wait between
- * checks for new items to handle.
- *
- * Defaults to 60 seconds; override to speed up or slow down.
- *
- * @return int timeout in seconds
- */
- function timeout()
- {
- return 60;
- }
-
- function name()
- {
- return strtolower(get_class($this).'.'.$this->get_id());
- }
-
- function run()
- {
- if ($this->threads > 1) {
- return $this->runThreads();
- } else {
- return $this->runLoop();
- }
- }
-
- function runThreads()
- {
- $children = array();
- for ($i = 1; $i <= $this->threads; $i++) {
- $pid = pcntl_fork();
- if ($pid < 0) {
- print "Couldn't fork for thread $i; aborting\n";
- exit(1);
- } else if ($pid == 0) {
- $this->runChild($i);
- exit(0);
- } else {
- $this->log(LOG_INFO, "Spawned thread $i as pid $pid");
- $children[$i] = $pid;
- }
- }
-
- $this->log(LOG_INFO, "Waiting for children to complete.");
- while (count($children) > 0) {
- $status = null;
- $pid = pcntl_wait($status);
- if ($pid > 0) {
- $i = array_search($pid, $children);
- if ($i === false) {
- $this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
- continue;
- }
- unset($children[$i]);
- $this->log(LOG_INFO, "Thread $i pid $pid exited.");
-
- $pid = pcntl_fork();
- if ($pid < 0) {
- print "Couldn't fork to respawn thread $i; aborting thread.\n";
- } else if ($pid == 0) {
- $this->runChild($i);
- exit(0);
- } else {
- $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
- $children[$i] = $pid;
- }
- }
- }
- $this->log(LOG_INFO, "All child processes complete.");
- return true;
- }
-
- function runChild($thread)
- {
- $this->set_id($this->get_id() . "." . $thread);
- $this->resetDb();
- $this->runLoop();
- }
-
- /**
- * Reconnect to the database for each child process,
- * or they'll get very confused trying to use the
- * same socket.
- */
- function resetDb()
- {
- // @fixme do we need to explicitly open the db too
- // or is this implied?
- global $_DB_DATAOBJECT;
- unset($_DB_DATAOBJECT['CONNECTIONS']);
-
- // Reconnect main memcached, or threads will stomp on
- // each other and corrupt their requests.
- $cache = common_memcache();
- if ($cache) {
- $cache->reconnect();
- }
-
- // Also reconnect memcached for status_network table.
- if (!empty(Status_network::$cache)) {
- Status_network::$cache->close();
- Status_network::$cache = null;
- }
}
/**
* Setup and start of run loop for this queue handler as a daemon.
* Most of the heavy lifting is passed on to the QueueManager's service()
- * method, which passes control on to the QueueHandler's handle_notice()
- * method for each notice that comes in on the queue.
- *
- * Most of the time this won't need to be overridden in a subclass.
+ * method, which passes control on to the QueueHandler's handle()
+ * method for each item that comes in on the queue.
*
* @return boolean true on success, false on failure
*/
- function runLoop()
+ function runThread()
{
$this->log(LOG_INFO, 'checking for queued notices');
- $master = new IoMaster($this->get_id());
+ $master = new QueueMaster($this->get_id());
$master->init($this->all);
$master->service();
@@ -229,10 +117,25 @@ class QueueDaemon extends Daemon
return true;
}
+}
- function log($level, $msg)
+class QueueMaster extends IoMaster
+{
+ /**
+ * Initialize IoManagers for the currently configured site
+ * which are appropriate to this instance.
+ */
+ function initManagers()
{
- common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg);
+ $classes = array();
+ if (Event::handle('StartQueueDaemonIoManagers', array(&$classes))) {
+ $classes[] = 'QueueManager';
+ }
+ Event::handle('EndQueueDaemonIoManagers', array(&$classes));
+
+ foreach ($classes as $class) {
+ $this->instantiate($class);
+ }
}
}