diff options
Diffstat (limited to 'includes/jobqueue')
24 files changed, 631 insertions, 386 deletions
diff --git a/includes/jobqueue/Job.php b/includes/jobqueue/Job.php index f8de0b5d..3e23391c 100644 --- a/includes/jobqueue/Job.php +++ b/includes/jobqueue/Job.php @@ -32,7 +32,7 @@ abstract class Job implements IJobSpecification { /** @var string */ public $command; - /** @var array|bool Array of job parameters or false if none */ + /** @var array Array of job parameters */ public $params; /** @var array Additional queue metadata */ @@ -58,11 +58,11 @@ abstract class Job implements IJobSpecification { * * @param string $command Job command * @param Title $title Associated title - * @param array|bool $params Job parameters + * @param array $params Job parameters * @throws MWException * @return Job */ - public static function factory( $command, Title $title, $params = false ) { + public static function factory( $command, Title $title, $params = array() ) { global $wgJobClasses; if ( isset( $wgJobClasses[$command] ) ) { $class = $wgJobClasses[$command]; @@ -80,7 +80,7 @@ abstract class Job implements IJobSpecification { public function __construct( $command, $title, $params = false ) { $this->command = $command; $this->title = $title; - $this->params = $params; + $this->params = is_array( $params ) ? $params : array(); // sanity // expensive jobs may set this to true $this->removeDuplicates = false; @@ -135,6 +135,24 @@ abstract class Job implements IJobSpecification { } /** + * @return int|null UNIX timestamp of when the job was queued, or null + * @since 1.26 + */ + public function getQueuedTimestamp() { + return isset( $this->metadata['timestamp'] ) + ? 
wfTimestampOrNull( TS_UNIX, $this->metadata['timestamp'] ) + : null; + } + + /** + * @return int|null UNIX timestamp of when the job was runnable, or null + * @since 1.26 + */ + public function getReadyTimestamp() { + return $this->getReleaseTimestamp() ?: $this->getQueuedTimestamp(); + } + + /** * Whether the queue should reject insertion of this job if a duplicate exists * * This can be used to avoid duplicated effort or combined with delayed jobs to @@ -196,15 +214,27 @@ abstract class Job implements IJobSpecification { } /** + * Get "root job" parameters for a task + * + * This is used to no-op redundant jobs, including child jobs of jobs, + * as long as the children inherit the root job parameters. When a job + * with root job parameters and "rootJobIsSelf" set is pushed, the + * deduplicateRootJob() method is automatically called on it. If the + * root job is only virtual and not actually pushed (e.g. the sub-jobs + * are inserted directly), then call deduplicateRootJob() directly. + * * @see JobQueue::deduplicateRootJob() + * * @param string $key A key that identifies the task * @return array Map of: + * - rootJobIsSelf : true * - rootJobSignature : hash (e.g. SHA1) that identifies the task * - rootJobTimestamp : TS_MW timestamp of this instance of the task * @since 1.21 */ public static function newRootJobParams( $key ) { return array( + 'rootJobIsSelf' => true, 'rootJobSignature' => sha1( $key ), 'rootJobTimestamp' => wfTimestampNow() ); @@ -237,6 +267,14 @@ abstract class Job implements IJobSpecification { } /** + * @see JobQueue::deduplicateRootJob() + * @return bool Whether this job is a root job + */ + public function isRootJob() { + return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] ); + } + + /** * Insert a single job into the queue. 
* @return bool True on success * @deprecated since 1.21 diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php index 91fe86cf..69a3defb 100644 --- a/includes/jobqueue/JobQueue.php +++ b/includes/jobqueue/JobQueue.php @@ -94,7 +94,7 @@ abstract class JobQueue { * This might be useful for improving concurrency for job acquisition. * - claimTTL : If supported, the queue will recycle jobs that have been popped * but not acknowledged as completed after this many seconds. Recycling - * of jobs simple means re-inserting them into the queue. Jobs can be + * of jobs simply means re-inserting them into the queue. Jobs can be * attempted up to three times before being discarded. * * Queue classes should throw an exception if they do not support the options given. @@ -286,7 +286,7 @@ abstract class JobQueue { * This does not require $wgJobClasses to be set for the given job type. * Outside callers should use JobQueueGroup::push() instead of this function. * - * @param Job|array $jobs A single job or an array of Jobs + * @param JobSpecification|JobSpecification[] $jobs * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC) * @return void * @throws JobQueueError @@ -301,7 +301,7 @@ abstract class JobQueue { * This does not require $wgJobClasses to be set for the given job type. * Outside callers should use JobQueueGroup::push() instead of this function. 
* - * @param array $jobs List of Jobs + * @param JobSpecification[] $jobs * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC) * @return void * @throws MWException @@ -323,11 +323,17 @@ abstract class JobQueue { $this->doBatchPush( $jobs, $flags ); $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type ); + + foreach ( $jobs as $job ) { + if ( $job->isRootJob() ) { + $this->deduplicateRootJob( $job ); + } + } } /** * @see JobQueue::batchPush() - * @param array $jobs + * @param JobSpecification[] $jobs * @param int $flags */ abstract protected function doBatchPush( array $jobs, $flags ); @@ -359,7 +365,7 @@ abstract class JobQueue { // Flag this job as an old duplicate based on its "root" job... try { if ( $job && $this->isRootJobOldDuplicate( $job ) ) { - JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki ); + JobQueue::incrStats( 'dupe_pops', $this->type ); $job = DuplicateJob::newFromJob( $job ); // convert to a no-op } } catch ( Exception $e ) { @@ -425,11 +431,11 @@ abstract class JobQueue { * * This does nothing for certain queue classes. * - * @param Job $job + * @param IJobSpecification $job * @throws MWException * @return bool */ - final public function deduplicateRootJob( Job $job ) { + final public function deduplicateRootJob( IJobSpecification $job ) { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } @@ -440,11 +446,11 @@ abstract class JobQueue { /** * @see JobQueue::deduplicateRootJob() - * @param Job $job + * @param IJobSpecification $job * @throws MWException * @return bool */ - protected function doDeduplicateRootJob( Job $job ) { + protected function doDeduplicateRootJob( IJobSpecification $job ) { if ( !$job->hasRootJobParams() ) { throw new MWException( "Cannot register root job; missing parameters." ); } @@ -549,35 +555,6 @@ abstract class JobQueue { } /** - * Return a map of task names to task definition maps. 
- * A "task" is a fast periodic queue maintenance action. - * Mutually exclusive tasks must implement their own locking in the callback. - * - * Each task value is an associative array with: - * - name : the name of the task - * - callback : a PHP callable that performs the task - * - period : the period in seconds corresponding to the task frequency - * - * @return array - */ - final public function getPeriodicTasks() { - $tasks = $this->doGetPeriodicTasks(); - foreach ( $tasks as $name => &$def ) { - $def['name'] = $name; - } - - return $tasks; - } - - /** - * @see JobQueue::getPeriodicTasks() - * @return array - */ - protected function doGetPeriodicTasks() { - return array(); - } - - /** * Clear any process and persistent caches * * @return void @@ -616,6 +593,20 @@ abstract class JobQueue { } /** + * Get an iterator to traverse over all claimed jobs in this queue + * + * Callers should be quick to iterate over it or few results + * will be returned due to jobs being acknowledged and deleted + * + * @return Iterator + * @throws JobQueueError + * @since 1.26 + */ + public function getAllAcquiredJobs() { + return new ArrayIterator( array() ); // not implemented + } + + /** * Get an iterator to traverse over all abandoned jobs in this queue * * @return Iterator @@ -646,7 +637,6 @@ abstract class JobQueue { * @since 1.22 */ final public function getSiblingQueuesWithJobs( array $types ) { - return $this->doGetSiblingQueuesWithJobs( $types ); } @@ -670,7 +660,6 @@ abstract class JobQueue { * @since 1.22 */ final public function getSiblingQueueSizes( array $types ) { - return $this->doGetSiblingQueueSizes( $types ); } @@ -689,15 +678,15 @@ abstract class JobQueue { * @param string $key Event type * @param string $type Job type * @param int $delta - * @param string $wiki Wiki ID (added in 1.23) * @since 1.22 */ - public static function incrStats( $key, $type, $delta = 1, $wiki = null ) { - wfIncrStats( $key, $delta ); - wfIncrStats( "{$key}-{$type}", $delta ); - if ( 
$wiki !== null ) { - wfIncrStats( "{$key}-{$type}-{$wiki}", $delta ); + public static function incrStats( $key, $type, $delta = 1 ) { + static $stats; + if ( !$stats ) { + $stats = RequestContext::getMain()->getStats(); } + $stats->updateCount( "jobqueue.{$key}.all", $delta ); + $stats->updateCount( "jobqueue.{$key}.{$type}", $delta ); } /** diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index d5f47ffd..d1e4a135 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -29,7 +29,6 @@ */ class JobQueueDB extends JobQueue { const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating - const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random const MAX_OFFSET = 255; // integer; maximum number of rows to skip @@ -71,15 +70,6 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doIsEmpty() { - $key = $this->getCacheKey( 'empty' ); - - $isEmpty = $this->cache->get( $key ); - if ( $isEmpty === 'true' ) { - return true; - } elseif ( $isEmpty === 'false' ) { - return false; - } - $dbr = $this->getSlaveDB(); try { $found = $dbr->selectField( // unclaimed job @@ -88,7 +78,6 @@ class JobQueueDB extends JobQueue { } catch ( DBError $e ) { $this->throwDBException( $e ); } - $this->cache->add( $key, $found ? 
'false' : 'true', self::CACHE_TTL_LONG ); return !$found; } @@ -256,12 +245,9 @@ class JobQueueDB extends JobQueue { foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { $dbw->insert( 'job', $rowBatch, $method ); } - JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki ); - JobQueue::incrStats( - 'job-insert-duplicate', - $this->type, - count( $rowSet ) + count( $rowList ) - count( $rows ), - $this->wiki + JobQueue::incrStats( 'inserts', $this->type, count( $rows ) ); + JobQueue::incrStats( 'dupe_inserts', $this->type, + count( $rowSet ) + count( $rowList ) - count( $rows ) ); } catch ( DBError $e ) { if ( $flags & self::QOS_ATOMIC ) { @@ -273,8 +259,6 @@ class JobQueueDB extends JobQueue { $dbw->commit( $method ); } - $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG ); - return; } @@ -283,10 +267,6 @@ class JobQueueDB extends JobQueue { * @return Job|bool */ protected function doPop() { - if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { - return false; // queue is empty - } - $dbw = $this->getMasterDB(); try { $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction @@ -309,22 +289,23 @@ class JobQueueDB extends JobQueue { } // Check if we found a row to reserve... if ( !$row ) { - $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); break; // nothing to do } - JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki ); + JobQueue::incrStats( 'pops', $this->type ); // Get the job object from the row... 
- $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); - if ( !$title ) { - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - wfDebug( "Row has invalid title '{$row->job_title}'.\n" ); - continue; // try again - } + $title = Title::makeTitle( $row->job_namespace, $row->job_title ); $job = Job::factory( $row->job_cmd, $title, self::extractBlob( $row->job_params ), $row->job_id ); $job->metadata['id'] = $row->job_id; + $job->metadata['timestamp'] = $row->job_timestamp; break; // done } while ( true ); + + if ( !$job || mt_rand( 0, 9 ) == 0 ) { + // Handle jobs that need to be recycled/deleted; + // any recycled jobs will be picked up next attempt + $this->recycleAndDeleteStaleJobs(); + } } catch ( DBError $e ) { $this->throwDBException( $e ); } @@ -495,6 +476,8 @@ class JobQueueDB extends JobQueue { // Delete a row with a single DELETE without holding row locks over RTTs... $dbw->delete( 'job', array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ ); + + JobQueue::incrStats( 'acks', $this->type ); } catch ( DBError $e ) { $this->throwDBException( $e ); } @@ -504,11 +487,11 @@ class JobQueueDB extends JobQueue { /** * @see JobQueue::doDeduplicateRootJob() - * @param Job $job + * @param IJobSpecification $job * @throws MWException * @return bool */ - protected function doDeduplicateRootJob( Job $job ) { + protected function doDeduplicateRootJob( IJobSpecification $job ) { $params = $job->getParams(); if ( !isset( $params['rootJobSignature'] ) ) { throw new MWException( "Cannot register root job; missing 'rootJobSignature'." 
); @@ -560,22 +543,10 @@ class JobQueueDB extends JobQueue { } /** - * @return array - */ - protected function doGetPeriodicTasks() { - return array( - 'recycleAndDeleteStaleJobs' => array( - 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), - 'period' => ceil( $this->claimTTL / 2 ) - ) - ); - } - - /** * @return void */ protected function doFlushCaches() { - foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) { + foreach ( array( 'size', 'acquiredcount' ) as $type ) { $this->cache->delete( $this->getCacheKey( $type ) ); } } @@ -585,18 +556,35 @@ class JobQueueDB extends JobQueue { * @return Iterator */ public function getAllQueuedJobs() { + return $this->getJobIterator( array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ); + } + + /** + * @see JobQueue::getAllAcquiredJobs() + * @return Iterator + */ + public function getAllAcquiredJobs() { + return $this->getJobIterator( array( 'job_cmd' => $this->getType(), "job_token > ''" ) ); + } + + /** + * @param array $conds Query conditions + * @return Iterator + */ + protected function getJobIterator( array $conds ) { $dbr = $this->getSlaveDB(); try { return new MappedIterator( - $dbr->select( 'job', self::selectFields(), - array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), - function ( $row ) use ( $dbr ) { + $dbr->select( 'job', self::selectFields(), $conds ), + function ( $row ) { $job = Job::factory( $row->job_cmd, Title::makeTitle( $row->job_namespace, $row->job_title ), - strlen( $row->job_params ) ? unserialize( $row->job_params ) : false + strlen( $row->job_params ) ? unserialize( $row->job_params ) : array() ); $job->metadata['id'] = $row->job_id; + $job->metadata['timestamp'] = $row->job_timestamp; + return $job; } ); @@ -613,6 +601,10 @@ class JobQueueDB extends JobQueue { protected function doGetSiblingQueuesWithJobs( array $types ) { $dbr = $this->getSlaveDB(); + // @note: this does not check whether the jobs are claimed or not. 
+ // This is useful so JobQueueGroup::pop() also sees queues that only + // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue + // failed jobs so that they can be popped again for that edge case. $res = $dbr->select( 'job', 'DISTINCT job_cmd', array( 'job_cmd' => $types ), __METHOD__ ); @@ -685,9 +677,7 @@ class JobQueueDB extends JobQueue { ); $affected = $dbw->affectedRows(); $count += $affected; - JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki ); - // The tasks recycled jobs or release delayed jobs into the queue - $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + JobQueue::incrStats( 'recycles', $this->type, $affected ); $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type ); } } @@ -714,7 +704,7 @@ class JobQueueDB extends JobQueue { $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); $affected = $dbw->affectedRows(); $count += $affected; - JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki ); + JobQueue::incrStats( 'abandons', $this->type, $affected ); } $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); diff --git a/includes/jobqueue/JobQueueFederated.php b/includes/jobqueue/JobQueueFederated.php index d985d449..109ca019 100644 --- a/includes/jobqueue/JobQueueFederated.php +++ b/includes/jobqueue/JobQueueFederated.php @@ -93,6 +93,8 @@ class JobQueueFederated extends JobQueue { ) { unset( $baseConfig[$o] ); // partition queue doesn't care about this } + // The class handles all aggregator calls already + unset( $baseConfig['aggregator'] ); // Get the partition queue objects foreach ( $partitionMap as $partition => $w ) { if ( !isset( $params['configByPartition'][$partition] ) ) { @@ -328,7 +330,7 @@ class JobQueueFederated extends JobQueue { return false; } - protected function doDeduplicateRootJob( Job $job ) { + protected function doDeduplicateRootJob( IJobSpecification $job ) { $params = $job->getRootJobParams(); $sigature = 
$params['rootJobSignature']; $partition = $this->partitionRing->getLiveLocation( $sigature ); @@ -373,27 +375,7 @@ class JobQueueFederated extends JobQueue { $this->throwErrorIfAllPartitionsDown( $failed ); } - protected function doGetPeriodicTasks() { - $tasks = array(); - /** @var JobQueue $queue */ - foreach ( $this->partitionQueues as $partition => $queue ) { - foreach ( $queue->getPeriodicTasks() as $task => $def ) { - $tasks["{$partition}:{$task}"] = $def; - } - } - - return $tasks; - } - protected function doFlushCaches() { - static $types = array( - 'empty', - 'size', - 'acquiredcount', - 'delayedcount', - 'abandonedcount' - ); - /** @var JobQueue $queue */ foreach ( $this->partitionQueues as $queue ) { $queue->doFlushCaches(); @@ -422,6 +404,17 @@ class JobQueueFederated extends JobQueue { return $iterator; } + public function getAllAcquiredJobs() { + $iterator = new AppendIterator(); + + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllAcquiredJobs() ); + } + + return $iterator; + } + public function getAllAbandonedJobs() { $iterator = new AppendIterator(); diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php index ebd547a0..5bd1cc94 100644 --- a/includes/jobqueue/JobQueueGroup.php +++ b/includes/jobqueue/JobQueueGroup.php @@ -28,7 +28,7 @@ * @since 1.21 */ class JobQueueGroup { - /** @var array */ + /** @var JobQueueGroup[] */ protected static $instances = array(); /** @var ProcessCacheLRU */ @@ -40,6 +40,9 @@ class JobQueueGroup { /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */ protected $coalescedQueues; + /** @var Job[] */ + protected $bufferedJobs = array(); + const TYPE_DEFAULT = 1; // integer; jobs popped by default const TYPE_ANY = 2; // integer; any job @@ -100,13 +103,13 @@ class JobQueueGroup { } /** - * Insert jobs into the respective queues of with the belong. 
+ * Insert jobs into the respective queues to which they belong * * This inserts the jobs into the queue specified by $wgJobTypeConf * and updates the aggregate job queue information cache as needed. * - * @param Job|Job[] $jobs A single Job or a list of Jobs - * @throws MWException + * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs + * @throws InvalidArgumentException * @return void */ public function push( $jobs ) { @@ -115,13 +118,11 @@ class JobQueueGroup { return; } + $this->assertValidJobs( $jobs ); + $jobsByType = array(); // (job type => list of jobs) foreach ( $jobs as $job ) { - if ( $job instanceof IJobSpecification ) { - $jobsByType[$job->getType()][] = $job; - } else { - throw new MWException( "Attempted to push a non-Job object into a queue." ); - } + $jobsByType[$job->getType()][] = $job; } foreach ( $jobsByType as $type => $jobs ) { @@ -137,6 +138,42 @@ class JobQueueGroup { } /** + * Buffer jobs for insertion via push() or call it now if in CLI mode + * + * Note that MediaWiki::restInPeace() calls pushLazyJobs() + * + * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs + * @return void + * @since 1.26 + */ + public function lazyPush( $jobs ) { + if ( PHP_SAPI === 'cli' ) { + $this->push( $jobs ); + return; + } + + $jobs = is_array( $jobs ) ? 
$jobs : array( $jobs ); + + // Throw errors now instead of on push(), when other jobs may be buffered + $this->assertValidJobs( $jobs ); + + $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs ); + } + + /** + * Push all jobs buffered via lazyPush() into their respective queues + * + * @return void + * @since 1.26 + */ + public static function pushLazyJobs() { + foreach ( self::$instances as $group ) { + $group->push( $group->bufferedJobs ); + $group->bufferedJobs = array(); + } + } + + /** * Pop a job off one of the job queues * * This pops a job off a queue as specified by $wgJobTypeConf and @@ -188,10 +225,10 @@ class JobQueueGroup { * Acknowledge that a job was completed * * @param Job $job - * @return bool + * @return void */ public function ack( Job $job ) { - return $this->get( $job->getType() )->ack( $job ); + $this->get( $job->getType() )->ack( $job ); } /** @@ -211,7 +248,6 @@ class JobQueueGroup { * This does nothing for certain queue classes. * * @return void - * @throws MWException */ public function waitForBackups() { global $wgJobTypeConf; @@ -342,69 +378,6 @@ class JobQueueGroup { } /** - * Execute any due periodic queue maintenance tasks for all queues. - * - * A task is "due" if the time ellapsed since the last run is greater than - * the defined run period. Concurrent calls to this function will cause tasks - * to be attempted twice, so they may need their own methods of mutual exclusion. 
- * - * @return int Number of tasks run - */ - public function executeReadyPeriodicTasks() { - global $wgMemc; - - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' ); - $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp) - - $count = 0; - $tasksRun = array(); // (queue => task => UNIX timestamp) - foreach ( $this->getQueueTypes() as $type ) { - $queue = $this->get( $type ); - foreach ( $queue->getPeriodicTasks() as $task => $definition ) { - if ( $definition['period'] <= 0 ) { - continue; // disabled - } elseif ( !isset( $lastRuns[$type][$task] ) - || $lastRuns[$type][$task] < ( time() - $definition['period'] ) - ) { - try { - if ( call_user_func( $definition['callback'] ) !== null ) { - $tasksRun[$type][$task] = time(); - ++$count; - } - } catch ( JobQueueError $e ) { - MWExceptionHandler::logException( $e ); - } - } - } - } - - if ( $count === 0 ) { - return $count; // nothing to update - } - - $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) { - if ( is_array( $lastRuns ) ) { - foreach ( $tasksRun as $type => $tasks ) { - foreach ( $tasks as $task => $timestamp ) { - if ( !isset( $lastRuns[$type][$task] ) - || $timestamp > $lastRuns[$type][$task] - ) { - $lastRuns[$type][$task] = $timestamp; - } - } - } - } else { - $lastRuns = $tasksRun; - } - - return $lastRuns; - } ); - - return $count; - } - - /** * @param string $name * @return mixed */ @@ -427,4 +400,24 @@ class JobQueueGroup { } } } + + /** + * @param array $jobs + * @throws InvalidArgumentException + */ + private function assertValidJobs( array $jobs ) { + foreach ( $jobs as $job ) { // sanity checks + if ( !( $job instanceof IJobSpecification ) ) { + throw new InvalidArgumentException( "Expected IJobSpecification objects" ); + } + } + } + + function __destruct() { + $n = count( $this->bufferedJobs ); + if ( $n > 0 ) { + $type = implode( ', ', array_unique( array_map( 'get_class', 
$this->bufferedJobs ) ) ); + trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." ); + } + } } diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php index 6c823fb9..29c8068a 100644 --- a/includes/jobqueue/JobQueueRedis.php +++ b/includes/jobqueue/JobQueueRedis.php @@ -81,6 +81,7 @@ class JobQueueRedis extends JobQueue { * - daemonized : Set to true if the redisJobRunnerService runs in the background. * This will disable job recycling/undelaying from the MediaWiki side * to avoid redundance and out-of-sync configuration. + * @throws InvalidArgumentException */ public function __construct( array $params ) { parent::__construct( $params ); @@ -89,7 +90,7 @@ class JobQueueRedis extends JobQueue { $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); if ( empty( $params['daemonized'] ) ) { - throw new Exception( + throw new InvalidArgumentException( "Non-daemonized mode is no longer supported. Please install the " . "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." 
); } @@ -110,7 +111,7 @@ class JobQueueRedis extends JobQueue { /** * @see JobQueue::doIsEmpty() * @return bool - * @throws MWException + * @throws JobQueueError */ protected function doIsEmpty() { return $this->doGetSize() == 0; @@ -119,7 +120,7 @@ class JobQueueRedis extends JobQueue { /** * @see JobQueue::doGetSize() * @return int - * @throws MWException + * @throws JobQueueError */ protected function doGetSize() { $conn = $this->getConnection(); @@ -205,7 +206,7 @@ class JobQueueRedis extends JobQueue { if ( $flags & self::QOS_ATOMIC ) { $batches = array( $items ); // all or nothing } else { - $batches = array_chunk( $items, 500 ); // avoid tying up the server + $batches = array_chunk( $items, 100 ); // avoid tying up the server } $failed = 0; $pushed = 0; @@ -222,9 +223,9 @@ class JobQueueRedis extends JobQueue { throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." ); } - JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki ); - JobQueue::incrStats( 'job-insert-duplicate', $this->type, - count( $items ) - $failed - $pushed, $this->wiki ); + JobQueue::incrStats( 'inserts', $this->type, count( $items ) ); + JobQueue::incrStats( 'dupe_inserts', $this->type, + count( $items ) - $failed - $pushed ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } @@ -300,7 +301,7 @@ LUA; break; // no jobs; nothing to do } - JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki ); + JobQueue::incrStats( 'job-pop', $this->type ); $item = $this->unserialize( $blob ); if ( $item === false ) { wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." ); @@ -356,13 +357,15 @@ LUA; * @see JobQueue::doAck() * @param Job $job * @return Job|bool - * @throws MWException|JobQueueError + * @throws UnexpectedValueException + * @throws JobQueueError */ protected function doAck( Job $job ) { if ( !isset( $job->metadata['uuid'] ) ) { - throw new MWException( "Job of type '{$job->getType()}' has no UUID." 
); + throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." ); } + $uuid = $job->metadata['uuid']; $conn = $this->getConnection(); try { static $script = @@ -379,16 +382,18 @@ LUA; $this->getQueueKey( 'z-claimed' ), # KEYS[1] $this->getQueueKey( 'h-attempts' ), # KEYS[2] $this->getQueueKey( 'h-data' ), # KEYS[3] - $job->metadata['uuid'] # ARGV[1] + $uuid # ARGV[1] ), 3 # number of first argument(s) that are keys ); if ( !$res ) { - wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." ); return false; } + + JobQueue::incrStats( 'job-ack', $this->type ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } @@ -398,13 +403,14 @@ LUA; /** * @see JobQueue::doDeduplicateRootJob() - * @param Job $job + * @param IJobSpecification $job * @return bool - * @throws MWException|JobQueueError + * @throws JobQueueError + * @throws LogicException */ - protected function doDeduplicateRootJob( Job $job ) { + protected function doDeduplicateRootJob( IJobSpecification $job ) { if ( !$job->hasRootJobParams() ) { - throw new MWException( "Cannot register root job; missing parameters." ); + throw new LogicException( "Cannot register root job; missing parameters." 
); } $params = $job->getRootJobParams(); @@ -441,6 +447,7 @@ LUA; // Get the last time this root job was enqueued $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); } catch ( RedisException $e ) { + $timestamp = false; $this->throwRedisException( $conn, $e ); } @@ -473,70 +480,84 @@ LUA; /** * @see JobQueue::getAllQueuedJobs() * @return Iterator + * @throws JobQueueError */ public function getAllQueuedJobs() { $conn = $this->getConnection(); try { - $that = $this; - - return new MappedIterator( - $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ), - function ( $uid ) use ( $that, $conn ) { - return $that->getJobFromUidInternal( $uid, $conn ); - }, - array( 'accept' => function ( $job ) { - return is_object( $job ); - } ) - ); + $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } + + return $this->getJobIterator( $conn, $uids ); } /** - * @see JobQueue::getAllQueuedJobs() + * @see JobQueue::getAllDelayedJobs() * @return Iterator + * @throws JobQueueError */ public function getAllDelayedJobs() { $conn = $this->getConnection(); try { - $that = $this; - - return new MappedIterator( // delayed jobs - $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), - function ( $uid ) use ( $that, $conn ) { - return $that->getJobFromUidInternal( $uid, $conn ); - }, - array( 'accept' => function ( $job ) { - return is_object( $job ); - } ) - ); + $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } + + return $this->getJobIterator( $conn, $uids ); + } + + /** + * @see JobQueue::getAllAcquiredJobs() + * @return Iterator + * @throws JobQueueError + */ + public function getAllAcquiredJobs() { + $conn = $this->getConnection(); + try { + $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 ); + } catch ( RedisException $e ) { + 
$this->throwRedisException( $conn, $e ); + } + + return $this->getJobIterator( $conn, $uids ); } /** * @see JobQueue::getAllAbandonedJobs() * @return Iterator + * @throws JobQueueError */ public function getAllAbandonedJobs() { $conn = $this->getConnection(); try { - $that = $this; - - return new MappedIterator( // delayed jobs - $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ), - function ( $uid ) use ( $that, $conn ) { - return $that->getJobFromUidInternal( $uid, $conn ); - }, - array( 'accept' => function ( $job ) { - return is_object( $job ); - } ) - ); + $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } + + return $this->getJobIterator( $conn, $uids ); + } + + /** + * @param RedisConnRef $conn + * @param array $uids List of job UUIDs + * @return MappedIterator + */ + protected function getJobIterator( RedisConnRef $conn, array $uids ) { + $that = $this; + + return new MappedIterator( + $uids, + function ( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { + return is_object( $job ); + } ) + ); } public function getCoalesceLocationInternal() { @@ -575,7 +596,8 @@ LUA; * @param string $uid * @param RedisConnRef $conn * @return Job|bool Returns false if the job does not exist - * @throws MWException|JobQueueError + * @throws JobQueueError + * @throws UnexpectedValueException */ public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { try { @@ -583,13 +605,16 @@ LUA; if ( $data === false ) { return false; // not found } - $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) ); + $item = $this->unserialize( $data ); if ( !is_array( $item ) ) { // this shouldn't happen - throw new MWException( "Could not find job with ID '$uid'." ); + throw new UnexpectedValueException( "Could not find job with ID '$uid'." 
); } $title = Title::makeTitle( $item['namespace'], $item['title'] ); $job = Job::factory( $item['type'], $title, $item['params'] ); $job->metadata['uuid'] = $item['uuid']; + $job->metadata['timestamp'] = $item['timestamp']; + // Add in attempt count for debugging at showJobs.php + $job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid ); return $job; } catch ( RedisException $e ) { @@ -598,13 +623,6 @@ LUA; } /** - * @return array - */ - protected function doGetPeriodicTasks() { - return array(); // managed in the runner loop - } - - /** * @param IJobSpecification $job * @return array */ @@ -631,15 +649,12 @@ LUA; * @return Job|bool */ protected function getJobFromFields( array $fields ) { - $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] ); - if ( $title ) { - $job = Job::factory( $fields['type'], $title, $fields['params'] ); - $job->metadata['uuid'] = $fields['uuid']; + $title = Title::makeTitle( $fields['namespace'], $fields['title'] ); + $job = Job::factory( $fields['type'], $title, $fields['params'] ); + $job->metadata['uuid'] = $fields['uuid']; + $job->metadata['timestamp'] = $fields['timestamp']; - return $job; - } - - return false; + return $job; } /** diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index b8c5d6cf..13043629 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -36,6 +36,11 @@ class JobRunner implements LoggerAwareInterface { protected $debug; /** + * @var LoggerInterface $logger + */ + protected $logger; + + /** * @param callable $debug Optional debug output handler */ public function setDebugHandler( $debug ) { @@ -43,12 +48,8 @@ class JobRunner implements LoggerAwareInterface { } /** - * @var LoggerInterface $logger - */ - protected $logger; - - /** * @param LoggerInterface $logger + * @return void */ public function setLogger( LoggerInterface $logger ) { $this->logger = $logger; @@ -88,7 +89,7 @@ class JobRunner implements 
LoggerAwareInterface { * @return array Summary response that can easily be JSON serialized */ public function run( array $options ) { - global $wgJobClasses; + global $wgJobClasses, $wgTrxProfilerLimits; $response = array( 'jobs' => array(), 'reached' => 'none-ready' ); @@ -102,43 +103,43 @@ class JobRunner implements LoggerAwareInterface { return $response; } - $group = JobQueueGroup::singleton(); - // Handle any required periodic queue maintenance - $count = $group->executeReadyPeriodicTasks(); - if ( $count > 0 ) { - $msg = "Executed $count periodic queue task(s)."; - $this->logger->debug( $msg ); - $this->debugCallback( $msg ); - } - // Bail out if in read-only mode if ( wfReadOnly() ) { $response['reached'] = 'read-only'; return $response; } - // Bail out if there is too much DB lag - list( , $maxLag ) = wfGetLBFactory()->getMainLB( wfWikiID() )->getMaxLag(); - if ( $maxLag >= 5 ) { + // Catch huge single updates that lead to slave lag + $trxProfiler = Profiler::instance()->getTransactionProfiler(); + $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) ); + $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ ); + + // Bail out if there is too much DB lag. + // This check should not block as we want to try other wiki queues. 
+ $maxAllowedLag = 3; + list( , $maxLag ) = wfGetLB( wfWikiID() )->getMaxLag(); + if ( $maxLag >= $maxAllowedLag ) { $response['reached'] = 'slave-lag-limit'; return $response; } + $group = JobQueueGroup::singleton(); + // Flush any pending DB writes for sanity - wfGetLBFactory()->commitMasterChanges(); + wfGetLBFactory()->commitAll(); // Some jobs types should not run until a certain timestamp $backoffs = array(); // map of (type => UNIX expiry) $backoffDeltas = array(); // map of (type => seconds) $wait = 'wait'; // block to read backoffs the first time - $jobsRun = 0; + $stats = RequestContext::getMain()->getStats(); + $jobsPopped = 0; $timeMsTotal = 0; $flags = JobQueueGroup::USE_CACHE; - $checkPeriod = 5.0; // seconds - $checkPhase = mt_rand( 0, 1000 * $checkPeriod ) / 1000; // avoid stampedes $startTime = microtime( true ); // time since jobs started running - $lastTime = microtime( true ) - $checkPhase; // time since last slave check + $checkLagPeriod = 1.0; // check slave lag this many seconds + $lastCheckTime = 1; // timestamp of last slave check do { // Sync the persistent backoffs with concurrent runners $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); @@ -154,6 +155,7 @@ class JobRunner implements LoggerAwareInterface { } if ( $job ) { // found a job + $popTime = time(); $jType = $job->getType(); // Back off of certain jobs for a while (for throttling and for errors) @@ -168,25 +170,47 @@ class JobRunner implements LoggerAwareInterface { } $msg = $job->toString() . " STARTING"; - $this->logger->info( $msg ); + $this->logger->debug( $msg ); $this->debugCallback( $msg ); // Run the job... 
$jobStartTime = microtime( true ); try { - ++$jobsRun; + ++$jobsPopped; $status = $job->run(); $error = $job->getLastError(); - wfGetLBFactory()->commitMasterChanges(); + $this->commitMasterChanges( $job ); + + DeferredUpdates::doUpdates(); + $this->commitMasterChanges( $job ); } catch ( Exception $e ) { MWExceptionHandler::rollbackMasterChangesAndLog( $e ); $status = false; $error = get_class( $e ) . ': ' . $e->getMessage(); MWExceptionHandler::logException( $e ); } + // Commit all outstanding connections that are in a transaction + // to get a fresh repeatable read snapshot on every connection. + wfGetLBFactory()->commitAll(); $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 ); $timeMsTotal += $timeMs; + // Record how long jobs wait before getting popped + $readyTs = $job->getReadyTimestamp(); + if ( $readyTs ) { + $pickupDelay = $popTime - $readyTs; + $stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay ); + $stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay ); + } + // Record root job age for jobs being run + $root = $job->getRootJobParams(); + if ( $root['rootJobTimestamp'] ) { + $age = $popTime - wfTimestamp( TS_UNIX, $root['rootJobTimestamp'] ); + $stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age ); + } + // Track the execution time for jobs + $stats->timing( "jobqueue.run.$jType", $timeMs ); + // Mark the job as done on success or when the job cannot be retried if ( $status !== false || !$job->allowRetries() ) { $group->ack( $job ); // done @@ -218,7 +242,7 @@ class JobRunner implements LoggerAwareInterface { ); // Break out if we hit the job count or wall time limits... - if ( $maxJobs && $jobsRun >= $maxJobs ) { + if ( $maxJobs && $jobsPopped >= $maxJobs ) { $response['reached'] = 'job-limit'; break; } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) { @@ -229,21 +253,24 @@ class JobRunner implements LoggerAwareInterface { // Don't let any of the main DB slaves get backed up. 
// This only waits for so long before exiting and letting // other wikis in the farm (on different masters) get a chance. - $timePassed = microtime( true ) - $lastTime; - if ( $timePassed >= 5 || $timePassed < 0 ) { - if ( !wfWaitForSlaves( $lastTime, false, '*', 5 ) ) { + $timePassed = microtime( true ) - $lastCheckTime; + if ( $timePassed >= $checkLagPeriod || $timePassed < 0 ) { + if ( !wfWaitForSlaves( $lastCheckTime, false, '*', $maxAllowedLag ) ) { $response['reached'] = 'slave-lag-limit'; break; } - $lastTime = microtime( true ); + $lastCheckTime = microtime( true ); } // Don't let any queue slaves/backups fall behind - if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) { + if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) { $group->waitForBackups(); } // Bail if near-OOM instead of in a job - $this->assertMemoryOK(); + if ( !$this->checkMemoryOK() ) { + $response['reached'] = 'memory-limit'; + break; + } } } while ( $job ); // stop when there are no jobs @@ -298,7 +325,6 @@ class JobRunner implements LoggerAwareInterface { * @return array Map of (job type => backoff expiry timestamp) */ private function loadBackoffs( array $backoffs, $mode = 'wait' ) { - $file = wfTempDir() . '/mw-runJobs-backoffs.json'; if ( is_file( $file ) ) { $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0; @@ -336,7 +362,6 @@ class JobRunner implements LoggerAwareInterface { * @return array The new backoffs account for $backoffs and the latest file data */ private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) { - if ( !$deltas ) { return $this->loadBackoffs( $backoffs, $mode ); } @@ -374,9 +399,9 @@ class JobRunner implements LoggerAwareInterface { /** * Make sure that this script is not too close to the memory usage limit. * It is better to die in between jobs than OOM right in the middle of one. 
- * @throws MWException + * @return bool */ - private function assertMemoryOK() { + private function checkMemoryOK() { static $maxBytes = null; if ( $maxBytes === null ) { $m = array(); @@ -390,8 +415,14 @@ class JobRunner implements LoggerAwareInterface { } $usedBytes = memory_get_usage(); if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) { - throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." ); + $msg = "Detected excessive memory usage ($usedBytes/$maxBytes)."; + $this->debugCallback( $msg ); + $this->logger->error( $msg ); + + return false; } + + return true; } /** @@ -403,4 +434,68 @@ class JobRunner implements LoggerAwareInterface { call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) ); } } + + /** + * Issue a commit on all masters who are currently in a transaction and have + * made changes to the database. It also supports sometimes waiting for the + * local wiki's slaves to catch up. See the documentation for + * $wgJobSerialCommitThreshold for more. + * + * @param Job $job + * @throws DBError + */ + private function commitMasterChanges( Job $job ) { + global $wgJobSerialCommitThreshold; + + $lb = wfGetLB( wfWikiID() ); + if ( $wgJobSerialCommitThreshold !== false ) { + // Generally, there is one master connection to the local DB + $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); + } else { + $dbwSerial = false; + } + + if ( !$dbwSerial + || !$dbwSerial->namedLocksEnqueue() + || $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold + ) { + // Writes are all to foreign DBs, named locks don't form queues, + // or $wgJobSerialCommitThreshold is not reached; commit changes now + wfGetLBFactory()->commitMasterChanges(); + return; + } + + $ms = intval( 1000 * $dbwSerial->pendingWriteQueryDuration() ); + $msg = $job->toString() . 
" COMMIT ENQUEUED [{$ms}ms of writes]"; + $this->logger->info( $msg ); + $this->debugCallback( $msg ); + + // Wait for an exclusive lock to commit + if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) { + // This will trigger a rollback in the main loop + throw new DBError( $dbwSerial, "Timed out waiting on commit queue." ); + } + // Wait for the generic slave to catch up + $pos = $lb->getMasterPos(); + if ( $pos ) { + $lb->waitForOne( $pos ); + } + + $fname = __METHOD__; + // Re-ping all masters with transactions. This throws DBError if some + // connection died while waiting on locks/slaves, triggering a rollback. + wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) use ( $fname ) { + $lb->forEachOpenConnection( function( DatabaseBase $conn ) use ( $fname ) { + if ( $conn->writesOrCallbacksPending() ) { + $conn->query( "SELECT 1", $fname ); + } + } ); + } ); + + // Actually commit the DB master changes + wfGetLBFactory()->commitMasterChanges(); + + // Release the lock + $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ ); + } } diff --git a/includes/jobqueue/JobSpecification.php b/includes/jobqueue/JobSpecification.php index 42d2a39b..d59c09b5 100644 --- a/includes/jobqueue/JobSpecification.php +++ b/includes/jobqueue/JobSpecification.php @@ -59,6 +59,26 @@ interface IJobSpecification { public function getDeduplicationInfo(); /** + * @see JobQueue::deduplicateRootJob() + * @return array + * @since 1.26 + */ + public function getRootJobParams(); + + /** + * @see JobQueue::deduplicateRootJob() + * @return bool + * @since 1.22 + */ + public function hasRootJobParams(); + + /** + * @see JobQueue::deduplicateRootJob() + * @return bool Whether this is job is a root job + */ + public function isRootJob(); + + /** * @return Title Descriptive title (this can simply be informative) */ public function getTitle(); @@ -68,7 +88,7 @@ interface IJobSpecification { * Job queue task description base code * * Example usage: - * <code> + * @code * 
$job = new JobSpecification( * 'null', * array( 'lives' => 1, 'usleep' => 100, 'pi' => 3.141569 ), @@ -76,7 +96,7 @@ interface IJobSpecification { * Title::makeTitle( NS_SPECIAL, 'nullity' ) * ); * JobQueueGroup::singleton()->push( $job ) - * </code> + * @endcode * * @ingroup JobQueue * @since 1.23 @@ -97,7 +117,7 @@ class JobSpecification implements IJobSpecification { /** * @param string $type * @param array $params Map of key/values - * @param array $opts Map of key/values + * @param array $opts Map of key/values; includes 'removeDuplicates' * @param Title $title Optional descriptive title */ public function __construct( @@ -108,7 +128,7 @@ class JobSpecification implements IJobSpecification { $this->type = $type; $this->params = $params; - $this->title = $title ?: Title::newMainPage(); + $this->title = $title ?: Title::makeTitle( NS_SPECIAL, 'Badtitle/' . get_class( $this ) ); $this->opts = $opts; } @@ -125,51 +145,28 @@ class JobSpecification implements IJobSpecification { } } - /** - * @return string - */ public function getType() { return $this->type; } - /** - * @return Title - */ public function getTitle() { return $this->title; } - /** - * @return array - */ public function getParams() { return $this->params; } - /** - * @return int|null UNIX timestamp to delay running this job until, otherwise null - */ public function getReleaseTimestamp() { return isset( $this->params['jobReleaseTimestamp'] ) ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] ) : null; } - /** - * @return bool Whether only one of each identical set of jobs should be run - */ public function ignoreDuplicates() { return !empty( $this->opts['removeDuplicates'] ); } - /** - * Subclasses may need to override this to make duplication detection work. - * The resulting map conveys everything that makes the job unique. This is - * only checked if ignoreDuplicates() returns true, meaning that duplicate - * jobs are supposed to be ignored. 
- * - * @return array Map of key/values - */ public function getDeduplicationInfo() { $info = array( 'type' => $this->getType(), @@ -188,6 +185,26 @@ class JobSpecification implements IJobSpecification { return $info; } + public function getRootJobParams() { + return array( + 'rootJobSignature' => isset( $this->params['rootJobSignature'] ) + ? $this->params['rootJobSignature'] + : null, + 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] ) + ? $this->params['rootJobTimestamp'] + : null + ); + } + + public function hasRootJobParams() { + return isset( $this->params['rootJobSignature'] ) + && isset( $this->params['rootJobTimestamp'] ); + } + + public function isRootJob() { + return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] ); + } + /** * @return array Field/value map that can immediately be serialized * @since 1.25 diff --git a/includes/jobqueue/aggregator/JobQueueAggregator.php b/includes/jobqueue/aggregator/JobQueueAggregator.php index febc277a..aa02d1fa 100644 --- a/includes/jobqueue/aggregator/JobQueueAggregator.php +++ b/includes/jobqueue/aggregator/JobQueueAggregator.php @@ -169,4 +169,4 @@ class JobQueueAggregatorNull extends JobQueueAggregator { protected function doPurge() { return true; } -}
\ No newline at end of file +} diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php index 847dd6f4..6c49646b 100644 --- a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php +++ b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php @@ -77,6 +77,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { try { $conn->multi( Redis::PIPELINE ); $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' ); + $conn->sAdd( $this->getWikiSetKey(), $wiki ); $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() ); $conn->exec(); @@ -198,6 +199,13 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { } /** + * @return string + */ + private function getWikiSetKey() { + return "jobqueue:aggregator:s-wikis:v2"; // global + } + + /** * @param string $type * @param string $wiki * @return string diff --git a/includes/jobqueue/jobs/ActivityUpdateJob.php b/includes/jobqueue/jobs/ActivityUpdateJob.php new file mode 100644 index 00000000..f146e6e8 --- /dev/null +++ b/includes/jobqueue/jobs/ActivityUpdateJob.php @@ -0,0 +1,75 @@ +<?php +/** + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + * @ingroup JobQueue + */ + +/** + * Job for updating user activity like "last viewed" timestamps + * + * @ingroup JobQueue + * @since 1.26 + */ +class ActivityUpdateJob extends Job { + function __construct( Title $title, array $params ) { + parent::__construct( 'activityUpdateJob', $title, $params ); + + if ( !isset( $params['type'] ) ) { + throw new InvalidArgumentException( "Missing 'type' parameter." ); + } + + $this->removeDuplicates = true; + } + + public function run() { + if ( $this->params['type'] === 'updateWatchlistNotification' ) { + $this->updateWatchlistNotification(); + } else { + throw new Exception( "Invalid 'type' parameter '{$this->params['type']}'." ); + } + + return true; + } + + protected function updateWatchlistNotification() { + $casTimestamp = ( $this->params['notifTime'] !== null ) + ? $this->params['notifTime'] + : $this->params['curTime']; + + $dbw = wfGetDB( DB_MASTER ); + $dbw->update( 'watchlist', + array( + 'wl_notificationtimestamp' => $dbw->timestampOrNull( $this->params['notifTime'] ) + ), + array( + 'wl_user' => $this->params['userid'], + 'wl_namespace' => $this->title->getNamespace(), + 'wl_title' => $this->title->getDBkey(), + // Add a "check and set" style comparison to handle conflicts. + // The inequality always avoids updates when the current value + // is already NULL per ANSI SQL. This is desired since NULL means + // that the user is "caught up" on edits already. When the field + // is non-NULL, make sure not to set it back in time or set it to + // NULL when newer revisions were in fact added to the page. + 'wl_notificationtimestamp < ' . 
$dbw->addQuotes( $dbw->timestamp( $casTimestamp ) ) + ), + __METHOD__ + ); + } +} diff --git a/includes/jobqueue/jobs/AssembleUploadChunksJob.php b/includes/jobqueue/jobs/AssembleUploadChunksJob.php index b7f09e77..a1de77e6 100644 --- a/includes/jobqueue/jobs/AssembleUploadChunksJob.php +++ b/includes/jobqueue/jobs/AssembleUploadChunksJob.php @@ -27,7 +27,7 @@ * @ingroup Upload */ class AssembleUploadChunksJob extends Job { - public function __construct( $title, $params ) { + public function __construct( Title $title, array $params ) { parent::__construct( 'AssembleUploadChunks', $title, $params ); $this->removeDuplicates = true; } diff --git a/includes/jobqueue/jobs/DoubleRedirectJob.php b/includes/jobqueue/jobs/DoubleRedirectJob.php index 2561f2f1..ab638967 100644 --- a/includes/jobqueue/jobs/DoubleRedirectJob.php +++ b/includes/jobqueue/jobs/DoubleRedirectJob.php @@ -41,6 +41,16 @@ class DoubleRedirectJob extends Job { private static $user; /** + * @param Title $title + * @param array $params + */ + function __construct( Title $title, array $params ) { + parent::__construct( 'fixDoubleRedirect', $title, $params ); + $this->reason = $params['reason']; + $this->redirTitle = Title::newFromText( $params['redirTitle'] ); + } + + /** * Insert jobs into the job queue to fix redirects to the given title * @param string $reason The reason for the fix, see message * "double-redirect-fixed-<reason>" @@ -82,16 +92,6 @@ class DoubleRedirectJob extends Job { } /** - * @param Title $title - * @param array|bool $params - */ - function __construct( $title, $params = false ) { - parent::__construct( 'fixDoubleRedirect', $title, $params ); - $this->reason = $params['reason']; - $this->redirTitle = Title::newFromText( $params['redirTitle'] ); - } - - /** * @return bool */ function run() { diff --git a/includes/jobqueue/jobs/DuplicateJob.php b/includes/jobqueue/jobs/DuplicateJob.php index c5e3a234..068d5319 100644 --- a/includes/jobqueue/jobs/DuplicateJob.php +++ 
b/includes/jobqueue/jobs/DuplicateJob.php @@ -33,7 +33,7 @@ final class DuplicateJob extends Job { * @param Title $title * @param array $params Job parameters */ - function __construct( $title, $params ) { + function __construct( Title $title, array $params ) { parent::__construct( 'duplicate', $title, $params ); } diff --git a/includes/jobqueue/jobs/EmaillingJob.php b/includes/jobqueue/jobs/EmaillingJob.php index df8ae63e..beeb0673 100644 --- a/includes/jobqueue/jobs/EmaillingJob.php +++ b/includes/jobqueue/jobs/EmaillingJob.php @@ -28,7 +28,7 @@ * @ingroup JobQueue */ class EmaillingJob extends Job { - function __construct( $title, $params ) { + function __construct( Title $title = null, array $params ) { parent::__construct( 'sendMail', Title::newMainPage(), $params ); } @@ -38,7 +38,7 @@ class EmaillingJob extends Job { $this->params['from'], $this->params['subj'], $this->params['body'], - $this->params['replyto'] + array( 'replyTo' => $this->params['replyto'] ) ); return $status->isOK(); diff --git a/includes/jobqueue/jobs/EnotifNotifyJob.php b/includes/jobqueue/jobs/EnotifNotifyJob.php index 1ed99a58..9a5c3c72 100644 --- a/includes/jobqueue/jobs/EnotifNotifyJob.php +++ b/includes/jobqueue/jobs/EnotifNotifyJob.php @@ -27,7 +27,7 @@ * @ingroup JobQueue */ class EnotifNotifyJob extends Job { - function __construct( $title, $params ) { + function __construct( Title $title, array $params ) { parent::__construct( 'enotifNotify', $title, $params ); } diff --git a/includes/jobqueue/jobs/EnqueueJob.php b/includes/jobqueue/jobs/EnqueueJob.php index 46fb2aa7..c7ee9b65 100644 --- a/includes/jobqueue/jobs/EnqueueJob.php +++ b/includes/jobqueue/jobs/EnqueueJob.php @@ -40,13 +40,13 @@ final class EnqueueJob extends Job { * @param Title $title * @param array $params Job parameters */ - function __construct( $title, $params ) { + function __construct( Title $title, array $params ) { parent::__construct( 'enqueue', $title, $params ); } /** - * @param Job|JobSpecification|array 
$jobs - * @return JobRouteJob + * @param JobSpecification|JobSpecification[] $jobs + * @return EnqueueJob */ public static function newFromLocalJobs( $jobs ) { $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); @@ -56,9 +56,11 @@ final class EnqueueJob extends Job { /** * @param array $jobsByWiki Map of (wiki => JobSpecification list) - * @return JobRouteJob + * @return EnqueueJob */ public static function newFromJobsByWiki( array $jobsByWiki ) { + $deduplicate = true; + $jobMapsByWiki = array(); foreach ( $jobsByWiki as $wiki => $jobs ) { $jobMapsByWiki[$wiki] = array(); @@ -68,10 +70,19 @@ final class EnqueueJob extends Job { } else { throw new InvalidArgumentException( "Jobs must be of type JobSpecification." ); } + $deduplicate = $deduplicate && $job->ignoreDuplicates(); } } - return new self( Title::newMainPage(), array( 'jobsByWiki' => $jobMapsByWiki ) ); + $eJob = new self( + Title::makeTitle( NS_SPECIAL, 'Badtitle/' . __CLASS__ ), + array( 'jobsByWiki' => $jobMapsByWiki ) + ); + // If *all* jobs to be pushed are to be de-duplicated (a common case), then + // de-duplicate this whole job itself to avoid build up in high traffic cases + $eJob->removeDuplicates = $deduplicate; + + return $eJob; } public function run() { diff --git a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php index e5e521c3..a9010c25 100644 --- a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php +++ b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php @@ -34,7 +34,7 @@ * @ingroup JobQueue */ class HTMLCacheUpdateJob extends Job { - function __construct( $title, $params = '' ) { + function __construct( Title $title, array $params ) { parent::__construct( 'htmlCacheUpdate', $title, $params ); // Base backlink purge jobs can be de-duplicated $this->removeDuplicates = ( !isset( $params['range'] ) && !isset( $params['pages'] ) ); diff --git a/includes/jobqueue/jobs/NullJob.php b/includes/jobqueue/jobs/NullJob.php index f94d6ebc..26d3c5c8 100644 --- 
a/includes/jobqueue/jobs/NullJob.php +++ b/includes/jobqueue/jobs/NullJob.php @@ -49,7 +49,7 @@ class NullJob extends Job { * @param Title $title * @param array $params Job parameters (lives, usleep) */ - function __construct( $title, $params ) { + function __construct( Title $title, array $params ) { parent::__construct( 'null', $title, $params ); if ( !isset( $this->params['lives'] ) ) { $this->params['lives'] = 1; diff --git a/includes/jobqueue/jobs/PublishStashedFileJob.php b/includes/jobqueue/jobs/PublishStashedFileJob.php index a922dd3d..8a180ec3 100644 --- a/includes/jobqueue/jobs/PublishStashedFileJob.php +++ b/includes/jobqueue/jobs/PublishStashedFileJob.php @@ -29,7 +29,7 @@ * @ingroup JobQueue */ class PublishStashedFileJob extends Job { - public function __construct( $title, $params ) { + public function __construct( Title $title, array $params ) { parent::__construct( 'PublishStashedFile', $title, $params ); $this->removeDuplicates = true; } diff --git a/includes/jobqueue/jobs/RecentChangesUpdateJob.php b/includes/jobqueue/jobs/RecentChangesUpdateJob.php index cc04595d..d6fa26b8 100644 --- a/includes/jobqueue/jobs/RecentChangesUpdateJob.php +++ b/includes/jobqueue/jobs/RecentChangesUpdateJob.php @@ -27,7 +27,7 @@ * @since 1.25 */ class RecentChangesUpdateJob extends Job { - function __construct( $title, $params ) { + function __construct( Title $title, array $params ) { parent::__construct( 'recentChangesUpdate', $title, $params ); if ( !isset( $params['type'] ) ) { @@ -75,11 +75,13 @@ class RecentChangesUpdateJob extends Job { $lockKey = wfWikiID() . 
':recentchanges-prune'; $dbw = wfGetDB( DB_MASTER ); - if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) { + if ( !$dbw->lockIsFree( $lockKey, __METHOD__ ) + || !$dbw->lock( $lockKey, __METHOD__, 1 ) + ) { return; // already in progress } - $batchSize = 100; // Avoid slave lag + $batchSize = 100; // avoid slave lag $cutoff = $dbw->timestamp( time() - $wgRCMaxAge ); do { $rcIds = $dbw->selectFieldValues( 'recentchanges', diff --git a/includes/jobqueue/jobs/RefreshLinksJob.php b/includes/jobqueue/jobs/RefreshLinksJob.php index 1252b0b5..935d2fb1 100644 --- a/includes/jobqueue/jobs/RefreshLinksJob.php +++ b/includes/jobqueue/jobs/RefreshLinksJob.php @@ -37,7 +37,9 @@ class RefreshLinksJob extends Job { const PARSE_THRESHOLD_SEC = 1.0; - function __construct( $title, $params = '' ) { + const CLOCK_FUDGE = 10; + + function __construct( Title $title, array $params ) { parent::__construct( 'refreshLinks', $title, $params ); // A separate type is used just for cascade-protected backlinks if ( !empty( $this->params['prioritize'] ) ) { @@ -109,9 +111,6 @@ class RefreshLinksJob extends Job { * @return bool */ protected function runForTitle( Title $title = null ) { - $linkCache = LinkCache::singleton(); - $linkCache->clear(); - if ( is_null( $title ) ) { $this->setLastError( "refreshLinks: Invalid title" ); return false; @@ -124,14 +123,18 @@ class RefreshLinksJob extends Job { wfGetLB()->waitFor( $this->params['masterPos'] ); } - $page = WikiPage::factory( $title ); + // Clear out title cache data from prior job transaction snapshots + $linkCache = LinkCache::singleton(); + $linkCache->clear(); - // Fetch the current revision... + // Fetch the current page and revision... + $page = WikiPage::factory( $title ); $revision = Revision::newFromTitle( $title, false, Revision::READ_NORMAL ); if ( !$revision ) { $this->setLastError( "refreshLinks: Article not found {$title->getPrefixedDBkey()}" ); return false; // XXX: what if it was just deleted? 
} + $content = $revision->getContent( Revision::RAW ); if ( !$content ) { // If there is no content, pretend the content is empty @@ -140,34 +143,50 @@ class RefreshLinksJob extends Job { $parserOutput = false; $parserOptions = $page->makeParserOptions( 'canonical' ); - // If page_touched changed after this root job (with a good slave lag skew factor), - // then it is likely that any views of the pages already resulted in re-parses which - // are now in cache. This can be reused to avoid expensive parsing in some cases. + // If page_touched changed after this root job, then it is likely that + // any views of the pages already resulted in re-parses which are now in + // cache. The cache can be reused to avoid expensive parsing in some cases. if ( isset( $this->params['rootJobTimestamp'] ) ) { - $skewedTimestamp = wfTimestamp( TS_UNIX, $this->params['rootJobTimestamp'] ) + 5; - if ( $page->getLinksTimestamp() > wfTimestamp( TS_MW, $skewedTimestamp ) ) { + $opportunistic = !empty( $this->params['isOpportunistic'] ); + + $skewedTimestamp = $this->params['rootJobTimestamp']; + if ( $opportunistic ) { + // Neither clock skew nor DB snapshot/slave lag matter much for such + // updates; focus on reusing the (often recently updated) cache + } else { + // For transclusion updates, the template changes must be reflected + $skewedTimestamp = wfTimestamp( TS_MW, + wfTimestamp( TS_UNIX, $skewedTimestamp ) + self::CLOCK_FUDGE + ); + } + + if ( $page->getLinksTimestamp() > $skewedTimestamp ) { // Something already updated the backlinks since this job was made return true; } - if ( $page->getTouched() > wfTimestamp( TS_MW, $skewedTimestamp ) ) { + + if ( $page->getTouched() >= $skewedTimestamp || $opportunistic ) { + // Something bumped page_touched since this job was made + // or the cache is otherwise suspected to be up-to-date $parserOutput = ParserCache::singleton()->getDirty( $page, $parserOptions ); - if ( $parserOutput && $parserOutput->getCacheTime() <= $skewedTimestamp ) 
{ + if ( $parserOutput && $parserOutput->getCacheTime() < $skewedTimestamp ) { $parserOutput = false; // too stale } } } + // Fetch the current revision and parse it if necessary... if ( $parserOutput == false ) { $start = microtime( true ); // Revision ID must be passed to the parser output to get revision variables correct $parserOutput = $content->getParserOutput( $title, $revision->getId(), $parserOptions, false ); - $ellapsed = microtime( true ) - $start; + $elapsed = microtime( true ) - $start; // If it took a long time to render, then save this back to the cache to avoid // wasted CPU by other apaches or job runners. We don't want to always save to // cache as this can cause high cache I/O and LRU churn when a template changes. - if ( $ellapsed >= self::PARSE_THRESHOLD_SEC - && $page->isParserCacheUsed( $parserOptions, $revision->getId() ) + if ( $elapsed >= self::PARSE_THRESHOLD_SEC + && $page->shouldCheckParserCache( $parserOptions, $revision->getId() ) && $parserOutput->isCacheable() ) { $ctime = wfTimestamp( TS_MW, (int)$start ); // cache time diff --git a/includes/jobqueue/jobs/ThumbnailRenderJob.php b/includes/jobqueue/jobs/ThumbnailRenderJob.php index ab381388..f558c488 100644 --- a/includes/jobqueue/jobs/ThumbnailRenderJob.php +++ b/includes/jobqueue/jobs/ThumbnailRenderJob.php @@ -27,7 +27,7 @@ * @ingroup JobQueue */ class ThumbnailRenderJob extends Job { - public function __construct( $title, $params ) { + public function __construct( Title $title, array $params ) { parent::__construct( 'ThumbnailRender', $title, $params ); } @@ -50,16 +50,16 @@ class ThumbnailRenderJob extends Job { return false; } } elseif ( $wgUploadThumbnailRenderMethod === 'http' ) { - $status = $this->hitThumbUrl( $file, $transformParams ); + $thumbUrl = ''; + $status = $this->hitThumbUrl( $file, $transformParams, $thumbUrl ); wfDebug( __METHOD__ . 
": received status {$status}\n" ); - if ( $status === 200 || $status === 301 || $status === 302 ) { + // 400 happens when requesting a size greater or equal than the original + if ( $status === 200 || $status === 301 || $status === 302 || $status === 400 ) { return true; } elseif ( $status ) { - // Note that this currently happens (500) when requesting sizes larger then or - // equal to the original, which is harmless. - $this->setLastError( __METHOD__ . ': incorrect HTTP status ' . $status ); + $this->setLastError( __METHOD__ . ': incorrect HTTP status ' . $status . ' when hitting ' . $thumbUrl ); return false; } else { $this->setLastError( __METHOD__ . ': HTTP request failure' ); @@ -75,7 +75,7 @@ class ThumbnailRenderJob extends Job { } } - protected function hitThumbUrl( $file, $transformParams ) { + protected function hitThumbUrl( $file, $transformParams, &$thumbUrl ) { global $wgUploadThumbnailRenderHttpCustomHost, $wgUploadThumbnailRenderHttpCustomDomain; $thumbName = $file->thumbName( $transformParams ); diff --git a/includes/jobqueue/jobs/UploadFromUrlJob.php b/includes/jobqueue/jobs/UploadFromUrlJob.php index d15fd025..ade48106 100644 --- a/includes/jobqueue/jobs/UploadFromUrlJob.php +++ b/includes/jobqueue/jobs/UploadFromUrlJob.php @@ -39,7 +39,7 @@ class UploadFromUrlJob extends Job { /** @var User */ protected $user; - public function __construct( $title, $params ) { + public function __construct( Title $title, array $params ) { parent::__construct( 'uploadFromUrl', $title, $params ); } @@ -166,11 +166,11 @@ class UploadFromUrlJob extends Job { } /** - * Initialize the session data. Sets the intial result to queued. + * Initialize the session data. Sets the initial result to queued. */ public function initializeSessionData() { $session =& self::getSessionData( $this->params['sessionKey'] ); - $$session['result'] = 'Queued'; + $session['result'] = 'Queued'; } /** |