diff options
Diffstat (limited to 'maintenance/nextJobDB.php')
-rw-r--r-- | maintenance/nextJobDB.php | 152 |
1 files changed, 55 insertions, 97 deletions
diff --git a/maintenance/nextJobDB.php b/maintenance/nextJobDB.php index e66e981b..219b5d8e 100644 --- a/maintenance/nextJobDB.php +++ b/maintenance/nextJobDB.php @@ -18,11 +18,10 @@ * http://www.gnu.org/copyleft/gpl.html * * @file - * @todo Make this work on PostgreSQL and maybe other database servers * @ingroup Maintenance */ -require_once( __DIR__ . '/Maintenance.php' ); +require_once __DIR__ . '/Maintenance.php'; /** * Maintenance script that picks a database that has pending jobs. @@ -33,129 +32,88 @@ class nextJobDB extends Maintenance { public function __construct() { parent::__construct(); $this->mDescription = "Pick a database that has pending jobs"; - $this->addOption( 'type', "The type of job to search for", false, true ); + $this->addOption( 'type', "Search by job type", false, true ); + $this->addOption( 'types', "Space separated list of job types to search for", false, true ); } public function execute() { - global $wgMemc; - $type = $this->getOption( 'type', false ); + global $wgJobTypesExcludedFromDefaultQueue; - $memcKey = 'jobqueue:dbs:v2'; - $pendingDBs = $wgMemc->get( $memcKey ); - - // If the cache entry wasn't present, or in 1% of cases otherwise, - // regenerate the cache. - if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) { - $pendingDBs = $this->getPendingDbs(); - $wgMemc->set( $memcKey, $pendingDBs, 300 ); + // job type required/picked + if ( $this->hasOption( 'types' ) ) { + $types = explode( ' ', $this->getOption( 'types' ) ); + } elseif ( $this->hasOption( 'type' ) ) { + $types = array( $this->getOption( 'type' ) ); + } else { + $types = false; } - if ( !$pendingDBs ) { - return; + // Handle any required periodic queue maintenance + $this->executeReadyPeriodicTasks(); + + // Get all the queues with jobs in them + $pendingDBs = JobQueueAggregator::singleton()->getAllReadyWikiQueues(); + if ( !count( $pendingDBs ) ) { + return; // no DBs with jobs or cache is both empty and locked } do { $again = false; - if ( $type === false ) { - $candidates = call_user_func_array( 'array_merge', $pendingDBs ); - } elseif ( isset( $pendingDBs[$type] ) ) { - $candidates = $pendingDBs[$type]; - } else { - $candidates = array(); - } - if ( !$candidates ) { - return; - } - - $candidates = array_values( $candidates ); - $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ]; - if ( !$this->checkJob( $type, $db ) ) { - // This job is not available in the current database. Remove it from - // the cache. - if ( $type === false ) { - foreach ( $pendingDBs as $type2 => $dbs ) { - $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) ); + $candidates = array(); // list of (type, db) + // Flatten the tree of candidates into a flat list so that a random + // item can be selected, weighing each queue (type/db tuple) equally. + foreach ( $pendingDBs as $type => $dbs ) { + if ( + ( is_array( $types ) && in_array( $type, $types ) ) || + ( $types === false && !in_array( $type, $wgJobTypesExcludedFromDefaultQueue ) ) + ) { + foreach ( $dbs as $db ) { + $candidates[] = array( $type, $db ); } - } else { - $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) ); } + } + if ( !count( $candidates ) ) { + return; // no jobs for this type + } - $wgMemc->set( $memcKey, $pendingDBs, 300 ); + list( $type, $db ) = $candidates[mt_rand( 0, count( $candidates ) - 1 )]; + if ( JobQueueGroup::singleton( $db )->isQueueDeprioritized( $type ) ) { + $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) ); $again = true; } } while ( $again ); - $this->output( $db . "\n" ); - } - - /** - * Check if the specified database has a job of the specified type in it. - * The type may be false to indicate "all". - * @param $type string - * @param $dbName string - * @return bool - */ - function checkJob( $type, $dbName ) { - $lb = wfGetLB( $dbName ); - $db = $lb->getConnection( DB_MASTER, array(), $dbName ); - if ( $type === false ) { - $conds = Job::defaultQueueConditions( ); + if ( $this->hasOption( 'types' ) ) { + $this->output( $db . " " . $type . "\n" ); } else { - $conds = array( 'job_cmd' => $type ); + $this->output( $db . "\n" ); } - - - $exists = (bool) $db->selectField( 'job', '1', $conds, __METHOD__ ); - $lb->reuseConnection( $db ); - return $exists; } /** - * Get all databases that have a pending job - * @return array + * Do all ready periodic jobs for all databases every 5 minutes (and .1% of the time) + * @return integer */ - private function getPendingDbs() { - global $wgLocalDatabases; - $pendingDBs = array(); - # Cross-reference DBs by master DB server - $dbsByMaster = array(); - foreach ( $wgLocalDatabases as $db ) { - $lb = wfGetLB( $db ); - $dbsByMaster[$lb->getServerName( 0 )][] = $db; - } - - foreach ( $dbsByMaster as $dbs ) { - $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] ); - - # Padding row for MySQL bug - $pad = str_repeat( '-', 40 ); - $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)"; - foreach ( $dbs as $wikiId ) { - if ( $sql != '' ) { - $sql .= ' UNION '; + private function executeReadyPeriodicTasks() { + global $wgLocalDatabases, $wgMemc; + + $count = 0; + $memcKey = 'jobqueue:periodic:lasttime'; + $timestamp = (int)$wgMemc->get( $memcKey ); // UNIX timestamp or 0 + if ( ( time() - $timestamp ) > 300 || mt_rand( 0, 999 ) == 0 ) { // 5 minutes + if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock + foreach ( $wgLocalDatabases as $db ) { + $count += JobQueueGroup::singleton( $db )->executeReadyPeriodicTasks(); } - - list( $dbName, $tablePrefix ) = wfSplitWikiID( $wikiId ); - $dbConn->tablePrefix( $tablePrefix ); - $jobTable = $dbConn->tableName( 'job' ); - - $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)"; - } - $res = $dbConn->query( $sql, __METHOD__ ); - $first = true; - foreach ( $res as $row ) { - if ( $first ) { - // discard padding row - $first = false; - continue; - } - $pendingDBs[$row->job_cmd][] = $row->db; + $wgMemc->set( $memcKey, time() ); + $wgMemc->delete( "$memcKey:rebuild" ); // unlock } } - return $pendingDBs; + + return $count; } } $maintClass = "nextJobDb"; -require_once( RUN_MAINTENANCE_IF_MAIN ); +require_once RUN_MAINTENANCE_IF_MAIN; |