summaryrefslogtreecommitdiff
path: root/includes/jobqueue
diff options
context:
space:
mode:
Diffstat (limited to 'includes/jobqueue')
-rw-r--r--includes/jobqueue/Job.php46
-rw-r--r--includes/jobqueue/JobQueue.php81
-rw-r--r--includes/jobqueue/JobQueueDB.php98
-rw-r--r--includes/jobqueue/JobQueueFederated.php35
-rw-r--r--includes/jobqueue/JobQueueGroup.php143
-rw-r--r--includes/jobqueue/JobQueueRedis.php151
-rw-r--r--includes/jobqueue/JobRunner.php171
-rw-r--r--includes/jobqueue/JobSpecification.php71
-rw-r--r--includes/jobqueue/aggregator/JobQueueAggregator.php2
-rw-r--r--includes/jobqueue/aggregator/JobQueueAggregatorRedis.php8
-rw-r--r--includes/jobqueue/jobs/ActivityUpdateJob.php75
-rw-r--r--includes/jobqueue/jobs/AssembleUploadChunksJob.php2
-rw-r--r--includes/jobqueue/jobs/DoubleRedirectJob.php20
-rw-r--r--includes/jobqueue/jobs/DuplicateJob.php2
-rw-r--r--includes/jobqueue/jobs/EmaillingJob.php4
-rw-r--r--includes/jobqueue/jobs/EnotifNotifyJob.php2
-rw-r--r--includes/jobqueue/jobs/EnqueueJob.php21
-rw-r--r--includes/jobqueue/jobs/HTMLCacheUpdateJob.php2
-rw-r--r--includes/jobqueue/jobs/NullJob.php2
-rw-r--r--includes/jobqueue/jobs/PublishStashedFileJob.php2
-rw-r--r--includes/jobqueue/jobs/RecentChangesUpdateJob.php8
-rw-r--r--includes/jobqueue/jobs/RefreshLinksJob.php51
-rw-r--r--includes/jobqueue/jobs/ThumbnailRenderJob.php14
-rw-r--r--includes/jobqueue/jobs/UploadFromUrlJob.php6
24 files changed, 631 insertions, 386 deletions
diff --git a/includes/jobqueue/Job.php b/includes/jobqueue/Job.php
index f8de0b5d..3e23391c 100644
--- a/includes/jobqueue/Job.php
+++ b/includes/jobqueue/Job.php
@@ -32,7 +32,7 @@ abstract class Job implements IJobSpecification {
/** @var string */
public $command;
- /** @var array|bool Array of job parameters or false if none */
+ /** @var array Array of job parameters */
public $params;
/** @var array Additional queue metadata */
@@ -58,11 +58,11 @@ abstract class Job implements IJobSpecification {
*
* @param string $command Job command
* @param Title $title Associated title
- * @param array|bool $params Job parameters
+ * @param array $params Job parameters
* @throws MWException
* @return Job
*/
- public static function factory( $command, Title $title, $params = false ) {
+ public static function factory( $command, Title $title, $params = array() ) {
global $wgJobClasses;
if ( isset( $wgJobClasses[$command] ) ) {
$class = $wgJobClasses[$command];
@@ -80,7 +80,7 @@ abstract class Job implements IJobSpecification {
public function __construct( $command, $title, $params = false ) {
$this->command = $command;
$this->title = $title;
- $this->params = $params;
+ $this->params = is_array( $params ) ? $params : array(); // sanity
// expensive jobs may set this to true
$this->removeDuplicates = false;
@@ -135,6 +135,24 @@ abstract class Job implements IJobSpecification {
}
/**
+ * @return int|null UNIX timestamp of when the job was queued, or null
+ * @since 1.26
+ */
+ public function getQueuedTimestamp() {
+ return isset( $this->metadata['timestamp'] )
+ ? wfTimestampOrNull( TS_UNIX, $this->metadata['timestamp'] )
+ : null;
+ }
+
+ /**
+ * @return int|null UNIX timestamp of when the job was runnable, or null
+ * @since 1.26
+ */
+ public function getReadyTimestamp() {
+ return $this->getReleaseTimestamp() ?: $this->getQueuedTimestamp();
+ }
+
+ /**
* Whether the queue should reject insertion of this job if a duplicate exists
*
* This can be used to avoid duplicated effort or combined with delayed jobs to
@@ -196,15 +214,27 @@ abstract class Job implements IJobSpecification {
}
/**
+ * Get "root job" parameters for a task
+ *
+ * This is used to no-op redundant jobs, including child jobs of jobs,
+ * as long as the children inherit the root job parameters. When a job
+ * with root job parameters and "rootJobIsSelf" set is pushed, the
+ * deduplicateRootJob() method is automatically called on it. If the
+ * root job is only virtual and not actually pushed (e.g. the sub-jobs
+ * are inserted directly), then call deduplicateRootJob() directly.
+ *
* @see JobQueue::deduplicateRootJob()
+ *
* @param string $key A key that identifies the task
* @return array Map of:
+ * - rootJobIsSelf : true
* - rootJobSignature : hash (e.g. SHA1) that identifies the task
* - rootJobTimestamp : TS_MW timestamp of this instance of the task
* @since 1.21
*/
public static function newRootJobParams( $key ) {
return array(
+ 'rootJobIsSelf' => true,
'rootJobSignature' => sha1( $key ),
'rootJobTimestamp' => wfTimestampNow()
);
@@ -237,6 +267,14 @@ abstract class Job implements IJobSpecification {
}
/**
+ * @see JobQueue::deduplicateRootJob()
+ * @return bool Whether this is job is a root job
+ */
+ public function isRootJob() {
+ return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] );
+ }
+
+ /**
* Insert a single job into the queue.
* @return bool True on success
* @deprecated since 1.21
diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php
index 91fe86cf..69a3defb 100644
--- a/includes/jobqueue/JobQueue.php
+++ b/includes/jobqueue/JobQueue.php
@@ -94,7 +94,7 @@ abstract class JobQueue {
* This might be useful for improving concurrency for job acquisition.
* - claimTTL : If supported, the queue will recycle jobs that have been popped
* but not acknowledged as completed after this many seconds. Recycling
- * of jobs simple means re-inserting them into the queue. Jobs can be
+ * of jobs simply means re-inserting them into the queue. Jobs can be
* attempted up to three times before being discarded.
*
* Queue classes should throw an exception if they do not support the options given.
@@ -286,7 +286,7 @@ abstract class JobQueue {
* This does not require $wgJobClasses to be set for the given job type.
* Outside callers should use JobQueueGroup::push() instead of this function.
*
- * @param Job|array $jobs A single job or an array of Jobs
+ * @param JobSpecification|JobSpecification[] $jobs
* @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
* @return void
* @throws JobQueueError
@@ -301,7 +301,7 @@ abstract class JobQueue {
* This does not require $wgJobClasses to be set for the given job type.
* Outside callers should use JobQueueGroup::push() instead of this function.
*
- * @param array $jobs List of Jobs
+ * @param JobSpecification[] $jobs
* @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
* @return void
* @throws MWException
@@ -323,11 +323,17 @@ abstract class JobQueue {
$this->doBatchPush( $jobs, $flags );
$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
+
+ foreach ( $jobs as $job ) {
+ if ( $job->isRootJob() ) {
+ $this->deduplicateRootJob( $job );
+ }
+ }
}
/**
* @see JobQueue::batchPush()
- * @param array $jobs
+ * @param JobSpecification[] $jobs
* @param int $flags
*/
abstract protected function doBatchPush( array $jobs, $flags );
@@ -359,7 +365,7 @@ abstract class JobQueue {
// Flag this job as an old duplicate based on its "root" job...
try {
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
- JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
+ JobQueue::incrStats( 'dupe_pops', $this->type );
$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
}
} catch ( Exception $e ) {
@@ -425,11 +431,11 @@ abstract class JobQueue {
*
* This does nothing for certain queue classes.
*
- * @param Job $job
+ * @param IJobSpecification $job
* @throws MWException
* @return bool
*/
- final public function deduplicateRootJob( Job $job ) {
+ final public function deduplicateRootJob( IJobSpecification $job ) {
if ( $job->getType() !== $this->type ) {
throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
@@ -440,11 +446,11 @@ abstract class JobQueue {
/**
* @see JobQueue::deduplicateRootJob()
- * @param Job $job
+ * @param IJobSpecification $job
* @throws MWException
* @return bool
*/
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
if ( !$job->hasRootJobParams() ) {
throw new MWException( "Cannot register root job; missing parameters." );
}
@@ -549,35 +555,6 @@ abstract class JobQueue {
}
/**
- * Return a map of task names to task definition maps.
- * A "task" is a fast periodic queue maintenance action.
- * Mutually exclusive tasks must implement their own locking in the callback.
- *
- * Each task value is an associative array with:
- * - name : the name of the task
- * - callback : a PHP callable that performs the task
- * - period : the period in seconds corresponding to the task frequency
- *
- * @return array
- */
- final public function getPeriodicTasks() {
- $tasks = $this->doGetPeriodicTasks();
- foreach ( $tasks as $name => &$def ) {
- $def['name'] = $name;
- }
-
- return $tasks;
- }
-
- /**
- * @see JobQueue::getPeriodicTasks()
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array();
- }
-
- /**
* Clear any process and persistent caches
*
* @return void
@@ -616,6 +593,20 @@ abstract class JobQueue {
}
/**
+ * Get an iterator to traverse over all claimed jobs in this queue
+ *
+ * Callers should be quick to iterator over it or few results
+ * will be returned due to jobs being acknowledged and deleted
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.26
+ */
+ public function getAllAcquiredJobs() {
+ return new ArrayIterator( array() ); // not implemented
+ }
+
+ /**
* Get an iterator to traverse over all abandoned jobs in this queue
*
* @return Iterator
@@ -646,7 +637,6 @@ abstract class JobQueue {
* @since 1.22
*/
final public function getSiblingQueuesWithJobs( array $types ) {
-
return $this->doGetSiblingQueuesWithJobs( $types );
}
@@ -670,7 +660,6 @@ abstract class JobQueue {
* @since 1.22
*/
final public function getSiblingQueueSizes( array $types ) {
-
return $this->doGetSiblingQueueSizes( $types );
}
@@ -689,15 +678,15 @@ abstract class JobQueue {
* @param string $key Event type
* @param string $type Job type
* @param int $delta
- * @param string $wiki Wiki ID (added in 1.23)
* @since 1.22
*/
- public static function incrStats( $key, $type, $delta = 1, $wiki = null ) {
- wfIncrStats( $key, $delta );
- wfIncrStats( "{$key}-{$type}", $delta );
- if ( $wiki !== null ) {
- wfIncrStats( "{$key}-{$type}-{$wiki}", $delta );
+ public static function incrStats( $key, $type, $delta = 1 ) {
+ static $stats;
+ if ( !$stats ) {
+ $stats = RequestContext::getMain()->getStats();
}
+ $stats->updateCount( "jobqueue.{$key}.all", $delta );
+ $stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
}
/**
diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php
index d5f47ffd..d1e4a135 100644
--- a/includes/jobqueue/JobQueueDB.php
+++ b/includes/jobqueue/JobQueueDB.php
@@ -29,7 +29,6 @@
*/
class JobQueueDB extends JobQueue {
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
- const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
const MAX_OFFSET = 255; // integer; maximum number of rows to skip
@@ -71,15 +70,6 @@ class JobQueueDB extends JobQueue {
* @return bool
*/
protected function doIsEmpty() {
- $key = $this->getCacheKey( 'empty' );
-
- $isEmpty = $this->cache->get( $key );
- if ( $isEmpty === 'true' ) {
- return true;
- } elseif ( $isEmpty === 'false' ) {
- return false;
- }
-
$dbr = $this->getSlaveDB();
try {
$found = $dbr->selectField( // unclaimed job
@@ -88,7 +78,6 @@ class JobQueueDB extends JobQueue {
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
- $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
return !$found;
}
@@ -256,12 +245,9 @@ class JobQueueDB extends JobQueue {
foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
$dbw->insert( 'job', $rowBatch, $method );
}
- JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki );
- JobQueue::incrStats(
- 'job-insert-duplicate',
- $this->type,
- count( $rowSet ) + count( $rowList ) - count( $rows ),
- $this->wiki
+ JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
+ JobQueue::incrStats( 'dupe_inserts', $this->type,
+ count( $rowSet ) + count( $rowList ) - count( $rows )
);
} catch ( DBError $e ) {
if ( $flags & self::QOS_ATOMIC ) {
@@ -273,8 +259,6 @@ class JobQueueDB extends JobQueue {
$dbw->commit( $method );
}
- $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
-
return;
}
@@ -283,10 +267,6 @@ class JobQueueDB extends JobQueue {
* @return Job|bool
*/
protected function doPop() {
- if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
- return false; // queue is empty
- }
-
$dbw = $this->getMasterDB();
try {
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
@@ -309,22 +289,23 @@ class JobQueueDB extends JobQueue {
}
// Check if we found a row to reserve...
if ( !$row ) {
- $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
break; // nothing to do
}
- JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+ JobQueue::incrStats( 'pops', $this->type );
// Get the job object from the row...
- $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
- if ( !$title ) {
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- wfDebug( "Row has invalid title '{$row->job_title}'.\n" );
- continue; // try again
- }
+ $title = Title::makeTitle( $row->job_namespace, $row->job_title );
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
$job->metadata['id'] = $row->job_id;
+ $job->metadata['timestamp'] = $row->job_timestamp;
break; // done
} while ( true );
+
+ if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+ // Handled jobs that need to be recycled/deleted;
+ // any recycled jobs will be picked up next attempt
+ $this->recycleAndDeleteStaleJobs();
+ }
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
@@ -495,6 +476,8 @@ class JobQueueDB extends JobQueue {
// Delete a row with a single DELETE without holding row locks over RTTs...
$dbw->delete( 'job',
array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+
+ JobQueue::incrStats( 'acks', $this->type );
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
@@ -504,11 +487,11 @@ class JobQueueDB extends JobQueue {
/**
* @see JobQueue::doDeduplicateRootJob()
- * @param Job $job
+ * @param IJobSpecification $job
* @throws MWException
* @return bool
*/
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
$params = $job->getParams();
if ( !isset( $params['rootJobSignature'] ) ) {
throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
@@ -560,22 +543,10 @@ class JobQueueDB extends JobQueue {
}
/**
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array(
- 'recycleAndDeleteStaleJobs' => array(
- 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
- )
- );
- }
-
- /**
* @return void
*/
protected function doFlushCaches() {
- foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
+ foreach ( array( 'size', 'acquiredcount' ) as $type ) {
$this->cache->delete( $this->getCacheKey( $type ) );
}
}
@@ -585,18 +556,35 @@ class JobQueueDB extends JobQueue {
* @return Iterator
*/
public function getAllQueuedJobs() {
+ return $this->getJobIterator( array( 'job_cmd' => $this->getType(), 'job_token' => '' ) );
+ }
+
+ /**
+ * @see JobQueue::getAllAcquiredJobs()
+ * @return Iterator
+ */
+ public function getAllAcquiredJobs() {
+ return $this->getJobIterator( array( 'job_cmd' => $this->getType(), "job_token > ''" ) );
+ }
+
+ /**
+ * @param array $conds Query conditions
+ * @return Iterator
+ */
+ protected function getJobIterator( array $conds ) {
$dbr = $this->getSlaveDB();
try {
return new MappedIterator(
- $dbr->select( 'job', self::selectFields(),
- array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
- function ( $row ) use ( $dbr ) {
+ $dbr->select( 'job', self::selectFields(), $conds ),
+ function ( $row ) {
$job = Job::factory(
$row->job_cmd,
Title::makeTitle( $row->job_namespace, $row->job_title ),
- strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
+ strlen( $row->job_params ) ? unserialize( $row->job_params ) : array()
);
$job->metadata['id'] = $row->job_id;
+ $job->metadata['timestamp'] = $row->job_timestamp;
+
return $job;
}
);
@@ -613,6 +601,10 @@ class JobQueueDB extends JobQueue {
protected function doGetSiblingQueuesWithJobs( array $types ) {
$dbr = $this->getSlaveDB();
+ // @note: this does not check whether the jobs are claimed or not.
+ // This is useful so JobQueueGroup::pop() also sees queues that only
+ // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+ // failed jobs so that they can be popped again for that edge case.
$res = $dbr->select( 'job', 'DISTINCT job_cmd',
array( 'job_cmd' => $types ), __METHOD__ );
@@ -685,9 +677,7 @@ class JobQueueDB extends JobQueue {
);
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
- // The tasks recycled jobs or release delayed jobs into the queue
- $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+ JobQueue::incrStats( 'recycles', $this->type, $affected );
$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
}
@@ -714,7 +704,7 @@ class JobQueueDB extends JobQueue {
$dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki );
+ JobQueue::incrStats( 'abandons', $this->type, $affected );
}
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
diff --git a/includes/jobqueue/JobQueueFederated.php b/includes/jobqueue/JobQueueFederated.php
index d985d449..109ca019 100644
--- a/includes/jobqueue/JobQueueFederated.php
+++ b/includes/jobqueue/JobQueueFederated.php
@@ -93,6 +93,8 @@ class JobQueueFederated extends JobQueue {
) {
unset( $baseConfig[$o] ); // partition queue doesn't care about this
}
+ // The class handles all aggregator calls already
+ unset( $baseConfig['aggregator'] );
// Get the partition queue objects
foreach ( $partitionMap as $partition => $w ) {
if ( !isset( $params['configByPartition'][$partition] ) ) {
@@ -328,7 +330,7 @@ class JobQueueFederated extends JobQueue {
return false;
}
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
$params = $job->getRootJobParams();
$sigature = $params['rootJobSignature'];
$partition = $this->partitionRing->getLiveLocation( $sigature );
@@ -373,27 +375,7 @@ class JobQueueFederated extends JobQueue {
$this->throwErrorIfAllPartitionsDown( $failed );
}
- protected function doGetPeriodicTasks() {
- $tasks = array();
- /** @var JobQueue $queue */
- foreach ( $this->partitionQueues as $partition => $queue ) {
- foreach ( $queue->getPeriodicTasks() as $task => $def ) {
- $tasks["{$partition}:{$task}"] = $def;
- }
- }
-
- return $tasks;
- }
-
protected function doFlushCaches() {
- static $types = array(
- 'empty',
- 'size',
- 'acquiredcount',
- 'delayedcount',
- 'abandonedcount'
- );
-
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
$queue->doFlushCaches();
@@ -422,6 +404,17 @@ class JobQueueFederated extends JobQueue {
return $iterator;
}
+ public function getAllAcquiredJobs() {
+ $iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $iterator->append( $queue->getAllAcquiredJobs() );
+ }
+
+ return $iterator;
+ }
+
public function getAllAbandonedJobs() {
$iterator = new AppendIterator();
diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php
index ebd547a0..5bd1cc94 100644
--- a/includes/jobqueue/JobQueueGroup.php
+++ b/includes/jobqueue/JobQueueGroup.php
@@ -28,7 +28,7 @@
* @since 1.21
*/
class JobQueueGroup {
- /** @var array */
+ /** @var JobQueueGroup[] */
protected static $instances = array();
/** @var ProcessCacheLRU */
@@ -40,6 +40,9 @@ class JobQueueGroup {
/** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
protected $coalescedQueues;
+ /** @var Job[] */
+ protected $bufferedJobs = array();
+
const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job
@@ -100,13 +103,13 @@ class JobQueueGroup {
}
/**
- * Insert jobs into the respective queues of with the belong.
+ * Insert jobs into the respective queues of which they belong
*
* This inserts the jobs into the queue specified by $wgJobTypeConf
* and updates the aggregate job queue information cache as needed.
*
- * @param Job|Job[] $jobs A single Job or a list of Jobs
- * @throws MWException
+ * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+ * @throws InvalidArgumentException
* @return void
*/
public function push( $jobs ) {
@@ -115,13 +118,11 @@ class JobQueueGroup {
return;
}
+ $this->assertValidJobs( $jobs );
+
$jobsByType = array(); // (job type => list of jobs)
foreach ( $jobs as $job ) {
- if ( $job instanceof IJobSpecification ) {
- $jobsByType[$job->getType()][] = $job;
- } else {
- throw new MWException( "Attempted to push a non-Job object into a queue." );
- }
+ $jobsByType[$job->getType()][] = $job;
}
foreach ( $jobsByType as $type => $jobs ) {
@@ -137,6 +138,42 @@ class JobQueueGroup {
}
/**
+ * Buffer jobs for insertion via push() or call it now if in CLI mode
+ *
+ * Note that MediaWiki::restInPeace() calls pushLazyJobs()
+ *
+ * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+ * @return void
+ * @since 1.26
+ */
+ public function lazyPush( $jobs ) {
+ if ( PHP_SAPI === 'cli' ) {
+ $this->push( $jobs );
+ return;
+ }
+
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+
+ // Throw errors now instead of on push(), when other jobs may be buffered
+ $this->assertValidJobs( $jobs );
+
+ $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
+ }
+
+ /**
+ * Push all jobs buffered via lazyPush() into their respective queues
+ *
+ * @return void
+ * @since 1.26
+ */
+ public static function pushLazyJobs() {
+ foreach ( self::$instances as $group ) {
+ $group->push( $group->bufferedJobs );
+ $group->bufferedJobs = array();
+ }
+ }
+
+ /**
* Pop a job off one of the job queues
*
* This pops a job off a queue as specified by $wgJobTypeConf and
@@ -188,10 +225,10 @@ class JobQueueGroup {
* Acknowledge that a job was completed
*
* @param Job $job
- * @return bool
+ * @return void
*/
public function ack( Job $job ) {
- return $this->get( $job->getType() )->ack( $job );
+ $this->get( $job->getType() )->ack( $job );
}
/**
@@ -211,7 +248,6 @@ class JobQueueGroup {
* This does nothing for certain queue classes.
*
* @return void
- * @throws MWException
*/
public function waitForBackups() {
global $wgJobTypeConf;
@@ -342,69 +378,6 @@ class JobQueueGroup {
}
/**
- * Execute any due periodic queue maintenance tasks for all queues.
- *
- * A task is "due" if the time ellapsed since the last run is greater than
- * the defined run period. Concurrent calls to this function will cause tasks
- * to be attempted twice, so they may need their own methods of mutual exclusion.
- *
- * @return int Number of tasks run
- */
- public function executeReadyPeriodicTasks() {
- global $wgMemc;
-
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
- $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
-
- $count = 0;
- $tasksRun = array(); // (queue => task => UNIX timestamp)
- foreach ( $this->getQueueTypes() as $type ) {
- $queue = $this->get( $type );
- foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
- if ( $definition['period'] <= 0 ) {
- continue; // disabled
- } elseif ( !isset( $lastRuns[$type][$task] )
- || $lastRuns[$type][$task] < ( time() - $definition['period'] )
- ) {
- try {
- if ( call_user_func( $definition['callback'] ) !== null ) {
- $tasksRun[$type][$task] = time();
- ++$count;
- }
- } catch ( JobQueueError $e ) {
- MWExceptionHandler::logException( $e );
- }
- }
- }
- }
-
- if ( $count === 0 ) {
- return $count; // nothing to update
- }
-
- $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
- if ( is_array( $lastRuns ) ) {
- foreach ( $tasksRun as $type => $tasks ) {
- foreach ( $tasks as $task => $timestamp ) {
- if ( !isset( $lastRuns[$type][$task] )
- || $timestamp > $lastRuns[$type][$task]
- ) {
- $lastRuns[$type][$task] = $timestamp;
- }
- }
- }
- } else {
- $lastRuns = $tasksRun;
- }
-
- return $lastRuns;
- } );
-
- return $count;
- }
-
- /**
* @param string $name
* @return mixed
*/
@@ -427,4 +400,24 @@ class JobQueueGroup {
}
}
}
+
+ /**
+ * @param array $jobs
+ * @throws InvalidArgumentException
+ */
+ private function assertValidJobs( array $jobs ) {
+ foreach ( $jobs as $job ) { // sanity checks
+ if ( !( $job instanceof IJobSpecification ) ) {
+ throw new InvalidArgumentException( "Expected IJobSpecification objects" );
+ }
+ }
+ }
+
+ function __destruct() {
+ $n = count( $this->bufferedJobs );
+ if ( $n > 0 ) {
+ $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
+ trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
+ }
+ }
}
diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php
index 6c823fb9..29c8068a 100644
--- a/includes/jobqueue/JobQueueRedis.php
+++ b/includes/jobqueue/JobQueueRedis.php
@@ -81,6 +81,7 @@ class JobQueueRedis extends JobQueue {
* - daemonized : Set to true if the redisJobRunnerService runs in the background.
* This will disable job recycling/undelaying from the MediaWiki side
* to avoid redundance and out-of-sync configuration.
+ * @throws InvalidArgumentException
*/
public function __construct( array $params ) {
parent::__construct( $params );
@@ -89,7 +90,7 @@ class JobQueueRedis extends JobQueue {
$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
if ( empty( $params['daemonized'] ) ) {
- throw new Exception(
+ throw new InvalidArgumentException(
"Non-daemonized mode is no longer supported. Please install the " .
"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
}
@@ -110,7 +111,7 @@ class JobQueueRedis extends JobQueue {
/**
* @see JobQueue::doIsEmpty()
* @return bool
- * @throws MWException
+ * @throws JobQueueError
*/
protected function doIsEmpty() {
return $this->doGetSize() == 0;
@@ -119,7 +120,7 @@ class JobQueueRedis extends JobQueue {
/**
* @see JobQueue::doGetSize()
* @return int
- * @throws MWException
+ * @throws JobQueueError
*/
protected function doGetSize() {
$conn = $this->getConnection();
@@ -205,7 +206,7 @@ class JobQueueRedis extends JobQueue {
if ( $flags & self::QOS_ATOMIC ) {
$batches = array( $items ); // all or nothing
} else {
- $batches = array_chunk( $items, 500 ); // avoid tying up the server
+ $batches = array_chunk( $items, 100 ); // avoid tying up the server
}
$failed = 0;
$pushed = 0;
@@ -222,9 +223,9 @@ class JobQueueRedis extends JobQueue {
throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
}
- JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki );
- JobQueue::incrStats( 'job-insert-duplicate', $this->type,
- count( $items ) - $failed - $pushed, $this->wiki );
+ JobQueue::incrStats( 'inserts', $this->type, count( $items ) );
+ JobQueue::incrStats( 'dupe_inserts', $this->type,
+ count( $items ) - $failed - $pushed );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
@@ -300,7 +301,7 @@ LUA;
break; // no jobs; nothing to do
}
- JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+ JobQueue::incrStats( 'job-pop', $this->type );
$item = $this->unserialize( $blob );
if ( $item === false ) {
wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
@@ -356,13 +357,15 @@ LUA;
* @see JobQueue::doAck()
* @param Job $job
* @return Job|bool
- * @throws MWException|JobQueueError
+ * @throws UnexpectedValueException
+ * @throws JobQueueError
*/
protected function doAck( Job $job ) {
if ( !isset( $job->metadata['uuid'] ) ) {
- throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
+ throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
}
+ $uuid = $job->metadata['uuid'];
$conn = $this->getConnection();
try {
static $script =
@@ -379,16 +382,18 @@ LUA;
$this->getQueueKey( 'z-claimed' ), # KEYS[1]
$this->getQueueKey( 'h-attempts' ), # KEYS[2]
$this->getQueueKey( 'h-data' ), # KEYS[3]
- $job->metadata['uuid'] # ARGV[1]
+ $uuid # ARGV[1]
),
3 # number of first argument(s) that are keys
);
if ( !$res ) {
- wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+ wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." );
return false;
}
+
+ JobQueue::incrStats( 'job-ack', $this->type );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
@@ -398,13 +403,14 @@ LUA;
/**
* @see JobQueue::doDeduplicateRootJob()
- * @param Job $job
+ * @param IJobSpecification $job
* @return bool
- * @throws MWException|JobQueueError
+ * @throws JobQueueError
+ * @throws LogicException
*/
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
if ( !$job->hasRootJobParams() ) {
- throw new MWException( "Cannot register root job; missing parameters." );
+ throw new LogicException( "Cannot register root job; missing parameters." );
}
$params = $job->getRootJobParams();
@@ -441,6 +447,7 @@ LUA;
// Get the last time this root job was enqueued
$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
} catch ( RedisException $e ) {
+ $timestamp = false;
$this->throwRedisException( $conn, $e );
}
@@ -473,70 +480,84 @@ LUA;
/**
* @see JobQueue::getAllQueuedJobs()
* @return Iterator
+ * @throws JobQueueError
*/
public function getAllQueuedJobs() {
$conn = $this->getConnection();
try {
- $that = $this;
-
- return new MappedIterator(
- $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
- function ( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) {
- return is_object( $job );
- } )
- );
+ $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
+
+ return $this->getJobIterator( $conn, $uids );
}
/**
- * @see JobQueue::getAllQueuedJobs()
+ * @see JobQueue::getAllDelayedJobs()
* @return Iterator
+ * @throws JobQueueError
*/
public function getAllDelayedJobs() {
$conn = $this->getConnection();
try {
- $that = $this;
-
- return new MappedIterator( // delayed jobs
- $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
- function ( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) {
- return is_object( $job );
- } )
- );
+ $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
+
+ return $this->getJobIterator( $conn, $uids );
+ }
+
+ /**
+ * @see JobQueue::getAllAcquiredJobs()
+ * @return Iterator
+ * @throws JobQueueError
+ */
+ public function getAllAcquiredJobs() {
+ $conn = $this->getConnection();
+ try {
+ $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $this->getJobIterator( $conn, $uids );
}
/**
* @see JobQueue::getAllAbandonedJobs()
* @return Iterator
+ * @throws JobQueueError
*/
public function getAllAbandonedJobs() {
$conn = $this->getConnection();
try {
- $that = $this;
-
- return new MappedIterator( // delayed jobs
- $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ),
- function ( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) {
- return is_object( $job );
- } )
- );
+ $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
+
+ return $this->getJobIterator( $conn, $uids );
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @param array $uids List of job UUIDs
+ * @return MappedIterator
+ */
+ protected function getJobIterator( RedisConnRef $conn, array $uids ) {
+ $that = $this;
+
+ return new MappedIterator(
+ $uids,
+ function ( $uid ) use ( $that, $conn ) {
+ return $that->getJobFromUidInternal( $uid, $conn );
+ },
+ array( 'accept' => function ( $job ) {
+ return is_object( $job );
+ } )
+ );
}
public function getCoalesceLocationInternal() {
@@ -575,7 +596,8 @@ LUA;
* @param string $uid
* @param RedisConnRef $conn
* @return Job|bool Returns false if the job does not exist
- * @throws MWException|JobQueueError
+ * @throws JobQueueError
+ * @throws UnexpectedValueException
*/
public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
try {
@@ -583,13 +605,16 @@ LUA;
if ( $data === false ) {
return false; // not found
}
- $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
+ $item = $this->unserialize( $data );
if ( !is_array( $item ) ) { // this shouldn't happen
- throw new MWException( "Could not find job with ID '$uid'." );
+ throw new UnexpectedValueException( "Could not find job with ID '$uid'." );
}
$title = Title::makeTitle( $item['namespace'], $item['title'] );
$job = Job::factory( $item['type'], $title, $item['params'] );
$job->metadata['uuid'] = $item['uuid'];
+ $job->metadata['timestamp'] = $item['timestamp'];
+ // Add in attempt count for debugging at showJobs.php
+ $job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid );
return $job;
} catch ( RedisException $e ) {
@@ -598,13 +623,6 @@ LUA;
}
/**
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array(); // managed in the runner loop
- }
-
- /**
* @param IJobSpecification $job
* @return array
*/
@@ -631,15 +649,12 @@ LUA;
* @return Job|bool
*/
protected function getJobFromFields( array $fields ) {
- $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
- if ( $title ) {
- $job = Job::factory( $fields['type'], $title, $fields['params'] );
- $job->metadata['uuid'] = $fields['uuid'];
+ $title = Title::makeTitle( $fields['namespace'], $fields['title'] );
+ $job = Job::factory( $fields['type'], $title, $fields['params'] );
+ $job->metadata['uuid'] = $fields['uuid'];
+ $job->metadata['timestamp'] = $fields['timestamp'];
- return $job;
- }
-
- return false;
+ return $job;
}
/**
diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php
index b8c5d6cf..13043629 100644
--- a/includes/jobqueue/JobRunner.php
+++ b/includes/jobqueue/JobRunner.php
@@ -36,6 +36,11 @@ class JobRunner implements LoggerAwareInterface {
protected $debug;
/**
+ * @var LoggerInterface $logger
+ */
+ protected $logger;
+
+ /**
* @param callable $debug Optional debug output handler
*/
public function setDebugHandler( $debug ) {
@@ -43,12 +48,8 @@ class JobRunner implements LoggerAwareInterface {
}
/**
- * @var LoggerInterface $logger
- */
- protected $logger;
-
- /**
* @param LoggerInterface $logger
+ * @return void
*/
public function setLogger( LoggerInterface $logger ) {
$this->logger = $logger;
@@ -88,7 +89,7 @@ class JobRunner implements LoggerAwareInterface {
* @return array Summary response that can easily be JSON serialized
*/
public function run( array $options ) {
- global $wgJobClasses;
+ global $wgJobClasses, $wgTrxProfilerLimits;
$response = array( 'jobs' => array(), 'reached' => 'none-ready' );
@@ -102,43 +103,43 @@ class JobRunner implements LoggerAwareInterface {
return $response;
}
- $group = JobQueueGroup::singleton();
- // Handle any required periodic queue maintenance
- $count = $group->executeReadyPeriodicTasks();
- if ( $count > 0 ) {
- $msg = "Executed $count periodic queue task(s).";
- $this->logger->debug( $msg );
- $this->debugCallback( $msg );
- }
-
// Bail out if in read-only mode
if ( wfReadOnly() ) {
$response['reached'] = 'read-only';
return $response;
}
- // Bail out if there is too much DB lag
- list( , $maxLag ) = wfGetLBFactory()->getMainLB( wfWikiID() )->getMaxLag();
- if ( $maxLag >= 5 ) {
+ // Catch huge single updates that lead to slave lag
+ $trxProfiler = Profiler::instance()->getTransactionProfiler();
+ $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
+ $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
+
+ // Bail out if there is too much DB lag.
+ // This check should not block as we want to try other wiki queues.
+ $maxAllowedLag = 3;
+ list( , $maxLag ) = wfGetLB( wfWikiID() )->getMaxLag();
+ if ( $maxLag >= $maxAllowedLag ) {
$response['reached'] = 'slave-lag-limit';
return $response;
}
+ $group = JobQueueGroup::singleton();
+
// Flush any pending DB writes for sanity
- wfGetLBFactory()->commitMasterChanges();
+ wfGetLBFactory()->commitAll();
// Some jobs types should not run until a certain timestamp
$backoffs = array(); // map of (type => UNIX expiry)
$backoffDeltas = array(); // map of (type => seconds)
$wait = 'wait'; // block to read backoffs the first time
- $jobsRun = 0;
+ $stats = RequestContext::getMain()->getStats();
+ $jobsPopped = 0;
$timeMsTotal = 0;
$flags = JobQueueGroup::USE_CACHE;
- $checkPeriod = 5.0; // seconds
- $checkPhase = mt_rand( 0, 1000 * $checkPeriod ) / 1000; // avoid stampedes
$startTime = microtime( true ); // time since jobs started running
- $lastTime = microtime( true ) - $checkPhase; // time since last slave check
+ $checkLagPeriod = 1.0; // check slave lag this many seconds
+ $lastCheckTime = 1; // timestamp of last slave check
do {
// Sync the persistent backoffs with concurrent runners
$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
@@ -154,6 +155,7 @@ class JobRunner implements LoggerAwareInterface {
}
if ( $job ) { // found a job
+ $popTime = time();
$jType = $job->getType();
// Back off of certain jobs for a while (for throttling and for errors)
@@ -168,25 +170,47 @@ class JobRunner implements LoggerAwareInterface {
}
$msg = $job->toString() . " STARTING";
- $this->logger->info( $msg );
+ $this->logger->debug( $msg );
$this->debugCallback( $msg );
// Run the job...
$jobStartTime = microtime( true );
try {
- ++$jobsRun;
+ ++$jobsPopped;
$status = $job->run();
$error = $job->getLastError();
- wfGetLBFactory()->commitMasterChanges();
+ $this->commitMasterChanges( $job );
+
+ DeferredUpdates::doUpdates();
+ $this->commitMasterChanges( $job );
} catch ( Exception $e ) {
MWExceptionHandler::rollbackMasterChangesAndLog( $e );
$status = false;
$error = get_class( $e ) . ': ' . $e->getMessage();
MWExceptionHandler::logException( $e );
}
+ // Commit all outstanding connections that are in a transaction
+ // to get a fresh repeatable read snapshot on every connection.
+ wfGetLBFactory()->commitAll();
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
$timeMsTotal += $timeMs;
+ // Record how long jobs wait before getting popped
+ $readyTs = $job->getReadyTimestamp();
+ if ( $readyTs ) {
+ $pickupDelay = $popTime - $readyTs;
+ $stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
+ $stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
+ }
+ // Record root job age for jobs being run
+ $root = $job->getRootJobParams();
+ if ( $root['rootJobTimestamp'] ) {
+ $age = $popTime - wfTimestamp( TS_UNIX, $root['rootJobTimestamp'] );
+ $stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
+ }
+ // Track the execution time for jobs
+ $stats->timing( "jobqueue.run.$jType", $timeMs );
+
// Mark the job as done on success or when the job cannot be retried
if ( $status !== false || !$job->allowRetries() ) {
$group->ack( $job ); // done
@@ -218,7 +242,7 @@ class JobRunner implements LoggerAwareInterface {
);
// Break out if we hit the job count or wall time limits...
- if ( $maxJobs && $jobsRun >= $maxJobs ) {
+ if ( $maxJobs && $jobsPopped >= $maxJobs ) {
$response['reached'] = 'job-limit';
break;
} elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
@@ -229,21 +253,24 @@ class JobRunner implements LoggerAwareInterface {
// Don't let any of the main DB slaves get backed up.
// This only waits for so long before exiting and letting
// other wikis in the farm (on different masters) get a chance.
- $timePassed = microtime( true ) - $lastTime;
- if ( $timePassed >= 5 || $timePassed < 0 ) {
- if ( !wfWaitForSlaves( $lastTime, false, '*', 5 ) ) {
+ $timePassed = microtime( true ) - $lastCheckTime;
+ if ( $timePassed >= $checkLagPeriod || $timePassed < 0 ) {
+ if ( !wfWaitForSlaves( $lastCheckTime, false, '*', $maxAllowedLag ) ) {
$response['reached'] = 'slave-lag-limit';
break;
}
- $lastTime = microtime( true );
+ $lastCheckTime = microtime( true );
}
// Don't let any queue slaves/backups fall behind
- if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) {
+ if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
$group->waitForBackups();
}
// Bail if near-OOM instead of in a job
- $this->assertMemoryOK();
+ if ( !$this->checkMemoryOK() ) {
+ $response['reached'] = 'memory-limit';
+ break;
+ }
}
} while ( $job ); // stop when there are no jobs
@@ -298,7 +325,6 @@ class JobRunner implements LoggerAwareInterface {
* @return array Map of (job type => backoff expiry timestamp)
*/
private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
-
$file = wfTempDir() . '/mw-runJobs-backoffs.json';
if ( is_file( $file ) ) {
$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
@@ -336,7 +362,6 @@ class JobRunner implements LoggerAwareInterface {
* @return array The new backoffs account for $backoffs and the latest file data
*/
private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
-
if ( !$deltas ) {
return $this->loadBackoffs( $backoffs, $mode );
}
@@ -374,9 +399,9 @@ class JobRunner implements LoggerAwareInterface {
/**
* Make sure that this script is not too close to the memory usage limit.
* It is better to die in between jobs than OOM right in the middle of one.
- * @throws MWException
+ * @return bool
*/
- private function assertMemoryOK() {
+ private function checkMemoryOK() {
static $maxBytes = null;
if ( $maxBytes === null ) {
$m = array();
@@ -390,8 +415,14 @@ class JobRunner implements LoggerAwareInterface {
}
$usedBytes = memory_get_usage();
if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
- throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." );
+ $msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
+ $this->debugCallback( $msg );
+ $this->logger->error( $msg );
+
+ return false;
}
+
+ return true;
}
/**
@@ -403,4 +434,68 @@ class JobRunner implements LoggerAwareInterface {
call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) );
}
}
+
+ /**
+ * Issue a commit on all masters who are currently in a transaction and have
+ * made changes to the database. It also supports sometimes waiting for the
+ * local wiki's slaves to catch up. See the documentation for
+ * $wgJobSerialCommitThreshold for more.
+ *
+ * @param Job $job
+ * @throws DBError
+ */
+ private function commitMasterChanges( Job $job ) {
+ global $wgJobSerialCommitThreshold;
+
+ $lb = wfGetLB( wfWikiID() );
+ if ( $wgJobSerialCommitThreshold !== false ) {
+ // Generally, there is one master connection to the local DB
+ $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
+ } else {
+ $dbwSerial = false;
+ }
+
+ if ( !$dbwSerial
+ || !$dbwSerial->namedLocksEnqueue()
+ || $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold
+ ) {
+ // Writes are all to foreign DBs, named locks don't form queues,
+ // or $wgJobSerialCommitThreshold is not reached; commit changes now
+ wfGetLBFactory()->commitMasterChanges();
+ return;
+ }
+
+ $ms = intval( 1000 * $dbwSerial->pendingWriteQueryDuration() );
+ $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
+ $this->logger->info( $msg );
+ $this->debugCallback( $msg );
+
+ // Wait for an exclusive lock to commit
+ if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) {
+ // This will trigger a rollback in the main loop
+ throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
+ }
+ // Wait for the generic slave to catch up
+ $pos = $lb->getMasterPos();
+ if ( $pos ) {
+ $lb->waitForOne( $pos );
+ }
+
+ $fname = __METHOD__;
+ // Re-ping all masters with transactions. This throws DBError if some
+ // connection died while waiting on locks/slaves, triggering a rollback.
+ wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) use ( $fname ) {
+ $lb->forEachOpenConnection( function( DatabaseBase $conn ) use ( $fname ) {
+ if ( $conn->writesOrCallbacksPending() ) {
+ $conn->query( "SELECT 1", $fname );
+ }
+ } );
+ } );
+
+ // Actually commit the DB master changes
+ wfGetLBFactory()->commitMasterChanges();
+
+ // Release the lock
+ $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
+ }
}
diff --git a/includes/jobqueue/JobSpecification.php b/includes/jobqueue/JobSpecification.php
index 42d2a39b..d59c09b5 100644
--- a/includes/jobqueue/JobSpecification.php
+++ b/includes/jobqueue/JobSpecification.php
@@ -59,6 +59,26 @@ interface IJobSpecification {
public function getDeduplicationInfo();
/**
+ * @see JobQueue::deduplicateRootJob()
+ * @return array
+ * @since 1.26
+ */
+ public function getRootJobParams();
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @return bool
+ * @since 1.22
+ */
+ public function hasRootJobParams();
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @return bool Whether this is job is a root job
+ */
+ public function isRootJob();
+
+ /**
* @return Title Descriptive title (this can simply be informative)
*/
public function getTitle();
@@ -68,7 +88,7 @@ interface IJobSpecification {
* Job queue task description base code
*
* Example usage:
- * <code>
+ * @code
* $job = new JobSpecification(
* 'null',
* array( 'lives' => 1, 'usleep' => 100, 'pi' => 3.141569 ),
@@ -76,7 +96,7 @@ interface IJobSpecification {
* Title::makeTitle( NS_SPECIAL, 'nullity' )
* );
* JobQueueGroup::singleton()->push( $job )
- * </code>
+ * @endcode
*
* @ingroup JobQueue
* @since 1.23
@@ -97,7 +117,7 @@ class JobSpecification implements IJobSpecification {
/**
* @param string $type
* @param array $params Map of key/values
- * @param array $opts Map of key/values
+ * @param array $opts Map of key/values; includes 'removeDuplicates'
* @param Title $title Optional descriptive title
*/
public function __construct(
@@ -108,7 +128,7 @@ class JobSpecification implements IJobSpecification {
$this->type = $type;
$this->params = $params;
- $this->title = $title ?: Title::newMainPage();
+ $this->title = $title ?: Title::makeTitle( NS_SPECIAL, 'Badtitle/' . get_class( $this ) );
$this->opts = $opts;
}
@@ -125,51 +145,28 @@ class JobSpecification implements IJobSpecification {
}
}
- /**
- * @return string
- */
public function getType() {
return $this->type;
}
- /**
- * @return Title
- */
public function getTitle() {
return $this->title;
}
- /**
- * @return array
- */
public function getParams() {
return $this->params;
}
- /**
- * @return int|null UNIX timestamp to delay running this job until, otherwise null
- */
public function getReleaseTimestamp() {
return isset( $this->params['jobReleaseTimestamp'] )
? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] )
: null;
}
- /**
- * @return bool Whether only one of each identical set of jobs should be run
- */
public function ignoreDuplicates() {
return !empty( $this->opts['removeDuplicates'] );
}
- /**
- * Subclasses may need to override this to make duplication detection work.
- * The resulting map conveys everything that makes the job unique. This is
- * only checked if ignoreDuplicates() returns true, meaning that duplicate
- * jobs are supposed to be ignored.
- *
- * @return array Map of key/values
- */
public function getDeduplicationInfo() {
$info = array(
'type' => $this->getType(),
@@ -188,6 +185,26 @@ class JobSpecification implements IJobSpecification {
return $info;
}
+ public function getRootJobParams() {
+ return array(
+ 'rootJobSignature' => isset( $this->params['rootJobSignature'] )
+ ? $this->params['rootJobSignature']
+ : null,
+ 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] )
+ ? $this->params['rootJobTimestamp']
+ : null
+ );
+ }
+
+ public function hasRootJobParams() {
+ return isset( $this->params['rootJobSignature'] )
+ && isset( $this->params['rootJobTimestamp'] );
+ }
+
+ public function isRootJob() {
+ return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] );
+ }
+
/**
* @return array Field/value map that can immediately be serialized
* @since 1.25
diff --git a/includes/jobqueue/aggregator/JobQueueAggregator.php b/includes/jobqueue/aggregator/JobQueueAggregator.php
index febc277a..aa02d1fa 100644
--- a/includes/jobqueue/aggregator/JobQueueAggregator.php
+++ b/includes/jobqueue/aggregator/JobQueueAggregator.php
@@ -169,4 +169,4 @@ class JobQueueAggregatorNull extends JobQueueAggregator {
protected function doPurge() {
return true;
}
-} \ No newline at end of file
+}
diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php
index 847dd6f4..6c49646b 100644
--- a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php
+++ b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php
@@ -77,6 +77,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
try {
$conn->multi( Redis::PIPELINE );
$conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
+ $conn->sAdd( $this->getWikiSetKey(), $wiki );
$conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
$conn->exec();
@@ -198,6 +199,13 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
}
/**
+ * @return string
+ */
+ private function getWikiSetKey() {
+ return "jobqueue:aggregator:s-wikis:v2"; // global
+ }
+
+ /**
* @param string $type
* @param string $wiki
* @return string
diff --git a/includes/jobqueue/jobs/ActivityUpdateJob.php b/includes/jobqueue/jobs/ActivityUpdateJob.php
new file mode 100644
index 00000000..f146e6e8
--- /dev/null
+++ b/includes/jobqueue/jobs/ActivityUpdateJob.php
@@ -0,0 +1,75 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job for updating user activity like "last viewed" timestamps
+ *
+ * @ingroup JobQueue
+ * @since 1.26
+ */
+class ActivityUpdateJob extends Job {
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'activityUpdateJob', $title, $params );
+
+ if ( !isset( $params['type'] ) ) {
+ throw new InvalidArgumentException( "Missing 'type' parameter." );
+ }
+
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ if ( $this->params['type'] === 'updateWatchlistNotification' ) {
+ $this->updateWatchlistNotification();
+ } else {
+ throw new Exception( "Invalid 'type' parameter '{$this->params['type']}'." );
+ }
+
+ return true;
+ }
+
+ protected function updateWatchlistNotification() {
+ $casTimestamp = ( $this->params['notifTime'] !== null )
+ ? $this->params['notifTime']
+ : $this->params['curTime'];
+
+ $dbw = wfGetDB( DB_MASTER );
+ $dbw->update( 'watchlist',
+ array(
+ 'wl_notificationtimestamp' => $dbw->timestampOrNull( $this->params['notifTime'] )
+ ),
+ array(
+ 'wl_user' => $this->params['userid'],
+ 'wl_namespace' => $this->title->getNamespace(),
+ 'wl_title' => $this->title->getDBkey(),
+ // Add a "check and set" style comparison to handle conflicts.
+ // The inequality always avoids updates when the current value
+ // is already NULL per ANSI SQL. This is desired since NULL means
+ // that the user is "caught up" on edits already. When the field
+ // is non-NULL, make sure not to set it back in time or set it to
+ // NULL when newer revisions were in fact added to the page.
+ 'wl_notificationtimestamp < ' . $dbw->addQuotes( $dbw->timestamp( $casTimestamp ) )
+ ),
+ __METHOD__
+ );
+ }
+}
diff --git a/includes/jobqueue/jobs/AssembleUploadChunksJob.php b/includes/jobqueue/jobs/AssembleUploadChunksJob.php
index b7f09e77..a1de77e6 100644
--- a/includes/jobqueue/jobs/AssembleUploadChunksJob.php
+++ b/includes/jobqueue/jobs/AssembleUploadChunksJob.php
@@ -27,7 +27,7 @@
* @ingroup Upload
*/
class AssembleUploadChunksJob extends Job {
- public function __construct( $title, $params ) {
+ public function __construct( Title $title, array $params ) {
parent::__construct( 'AssembleUploadChunks', $title, $params );
$this->removeDuplicates = true;
}
diff --git a/includes/jobqueue/jobs/DoubleRedirectJob.php b/includes/jobqueue/jobs/DoubleRedirectJob.php
index 2561f2f1..ab638967 100644
--- a/includes/jobqueue/jobs/DoubleRedirectJob.php
+++ b/includes/jobqueue/jobs/DoubleRedirectJob.php
@@ -41,6 +41,16 @@ class DoubleRedirectJob extends Job {
private static $user;
/**
+ * @param Title $title
+ * @param array $params
+ */
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'fixDoubleRedirect', $title, $params );
+ $this->reason = $params['reason'];
+ $this->redirTitle = Title::newFromText( $params['redirTitle'] );
+ }
+
+ /**
* Insert jobs into the job queue to fix redirects to the given title
* @param string $reason The reason for the fix, see message
* "double-redirect-fixed-<reason>"
@@ -82,16 +92,6 @@ class DoubleRedirectJob extends Job {
}
/**
- * @param Title $title
- * @param array|bool $params
- */
- function __construct( $title, $params = false ) {
- parent::__construct( 'fixDoubleRedirect', $title, $params );
- $this->reason = $params['reason'];
- $this->redirTitle = Title::newFromText( $params['redirTitle'] );
- }
-
- /**
* @return bool
*/
function run() {
diff --git a/includes/jobqueue/jobs/DuplicateJob.php b/includes/jobqueue/jobs/DuplicateJob.php
index c5e3a234..068d5319 100644
--- a/includes/jobqueue/jobs/DuplicateJob.php
+++ b/includes/jobqueue/jobs/DuplicateJob.php
@@ -33,7 +33,7 @@ final class DuplicateJob extends Job {
* @param Title $title
* @param array $params Job parameters
*/
- function __construct( $title, $params ) {
+ function __construct( Title $title, array $params ) {
parent::__construct( 'duplicate', $title, $params );
}
diff --git a/includes/jobqueue/jobs/EmaillingJob.php b/includes/jobqueue/jobs/EmaillingJob.php
index df8ae63e..beeb0673 100644
--- a/includes/jobqueue/jobs/EmaillingJob.php
+++ b/includes/jobqueue/jobs/EmaillingJob.php
@@ -28,7 +28,7 @@
* @ingroup JobQueue
*/
class EmaillingJob extends Job {
- function __construct( $title, $params ) {
+ function __construct( Title $title = null, array $params ) {
parent::__construct( 'sendMail', Title::newMainPage(), $params );
}
@@ -38,7 +38,7 @@ class EmaillingJob extends Job {
$this->params['from'],
$this->params['subj'],
$this->params['body'],
- $this->params['replyto']
+ array( 'replyTo' => $this->params['replyto'] )
);
return $status->isOK();
diff --git a/includes/jobqueue/jobs/EnotifNotifyJob.php b/includes/jobqueue/jobs/EnotifNotifyJob.php
index 1ed99a58..9a5c3c72 100644
--- a/includes/jobqueue/jobs/EnotifNotifyJob.php
+++ b/includes/jobqueue/jobs/EnotifNotifyJob.php
@@ -27,7 +27,7 @@
* @ingroup JobQueue
*/
class EnotifNotifyJob extends Job {
- function __construct( $title, $params ) {
+ function __construct( Title $title, array $params ) {
parent::__construct( 'enotifNotify', $title, $params );
}
diff --git a/includes/jobqueue/jobs/EnqueueJob.php b/includes/jobqueue/jobs/EnqueueJob.php
index 46fb2aa7..c7ee9b65 100644
--- a/includes/jobqueue/jobs/EnqueueJob.php
+++ b/includes/jobqueue/jobs/EnqueueJob.php
@@ -40,13 +40,13 @@ final class EnqueueJob extends Job {
* @param Title $title
* @param array $params Job parameters
*/
- function __construct( $title, $params ) {
+ function __construct( Title $title, array $params ) {
parent::__construct( 'enqueue', $title, $params );
}
/**
- * @param Job|JobSpecification|array $jobs
- * @return JobRouteJob
+ * @param JobSpecification|JobSpecification[] $jobs
+ * @return EnqueueJob
*/
public static function newFromLocalJobs( $jobs ) {
$jobs = is_array( $jobs ) ? $jobs : array( $jobs );
@@ -56,9 +56,11 @@ final class EnqueueJob extends Job {
/**
* @param array $jobsByWiki Map of (wiki => JobSpecification list)
- * @return JobRouteJob
+ * @return EnqueueJob
*/
public static function newFromJobsByWiki( array $jobsByWiki ) {
+ $deduplicate = true;
+
$jobMapsByWiki = array();
foreach ( $jobsByWiki as $wiki => $jobs ) {
$jobMapsByWiki[$wiki] = array();
@@ -68,10 +70,19 @@ final class EnqueueJob extends Job {
} else {
throw new InvalidArgumentException( "Jobs must be of type JobSpecification." );
}
+ $deduplicate = $deduplicate && $job->ignoreDuplicates();
}
}
- return new self( Title::newMainPage(), array( 'jobsByWiki' => $jobMapsByWiki ) );
+ $eJob = new self(
+ Title::makeTitle( NS_SPECIAL, 'Badtitle/' . __CLASS__ ),
+ array( 'jobsByWiki' => $jobMapsByWiki )
+ );
+ // If *all* jobs to be pushed are to be de-duplicated (a common case), then
+ // de-duplicate this whole job itself to avoid build up in high traffic cases
+ $eJob->removeDuplicates = $deduplicate;
+
+ return $eJob;
}
public function run() {
diff --git a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php
index e5e521c3..a9010c25 100644
--- a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php
+++ b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php
@@ -34,7 +34,7 @@
* @ingroup JobQueue
*/
class HTMLCacheUpdateJob extends Job {
- function __construct( $title, $params = '' ) {
+ function __construct( Title $title, array $params ) {
parent::__construct( 'htmlCacheUpdate', $title, $params );
// Base backlink purge jobs can be de-duplicated
$this->removeDuplicates = ( !isset( $params['range'] ) && !isset( $params['pages'] ) );
diff --git a/includes/jobqueue/jobs/NullJob.php b/includes/jobqueue/jobs/NullJob.php
index f94d6ebc..26d3c5c8 100644
--- a/includes/jobqueue/jobs/NullJob.php
+++ b/includes/jobqueue/jobs/NullJob.php
@@ -49,7 +49,7 @@ class NullJob extends Job {
* @param Title $title
* @param array $params Job parameters (lives, usleep)
*/
- function __construct( $title, $params ) {
+ function __construct( Title $title, array $params ) {
parent::__construct( 'null', $title, $params );
if ( !isset( $this->params['lives'] ) ) {
$this->params['lives'] = 1;
diff --git a/includes/jobqueue/jobs/PublishStashedFileJob.php b/includes/jobqueue/jobs/PublishStashedFileJob.php
index a922dd3d..8a180ec3 100644
--- a/includes/jobqueue/jobs/PublishStashedFileJob.php
+++ b/includes/jobqueue/jobs/PublishStashedFileJob.php
@@ -29,7 +29,7 @@
* @ingroup JobQueue
*/
class PublishStashedFileJob extends Job {
- public function __construct( $title, $params ) {
+ public function __construct( Title $title, array $params ) {
parent::__construct( 'PublishStashedFile', $title, $params );
$this->removeDuplicates = true;
}
diff --git a/includes/jobqueue/jobs/RecentChangesUpdateJob.php b/includes/jobqueue/jobs/RecentChangesUpdateJob.php
index cc04595d..d6fa26b8 100644
--- a/includes/jobqueue/jobs/RecentChangesUpdateJob.php
+++ b/includes/jobqueue/jobs/RecentChangesUpdateJob.php
@@ -27,7 +27,7 @@
* @since 1.25
*/
class RecentChangesUpdateJob extends Job {
- function __construct( $title, $params ) {
+ function __construct( Title $title, array $params ) {
parent::__construct( 'recentChangesUpdate', $title, $params );
if ( !isset( $params['type'] ) ) {
@@ -75,11 +75,13 @@ class RecentChangesUpdateJob extends Job {
$lockKey = wfWikiID() . ':recentchanges-prune';
$dbw = wfGetDB( DB_MASTER );
- if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
+ if ( !$dbw->lockIsFree( $lockKey, __METHOD__ )
+ || !$dbw->lock( $lockKey, __METHOD__, 1 )
+ ) {
return; // already in progress
}
- $batchSize = 100; // Avoid slave lag
+ $batchSize = 100; // avoid slave lag
$cutoff = $dbw->timestamp( time() - $wgRCMaxAge );
do {
$rcIds = $dbw->selectFieldValues( 'recentchanges',
diff --git a/includes/jobqueue/jobs/RefreshLinksJob.php b/includes/jobqueue/jobs/RefreshLinksJob.php
index 1252b0b5..935d2fb1 100644
--- a/includes/jobqueue/jobs/RefreshLinksJob.php
+++ b/includes/jobqueue/jobs/RefreshLinksJob.php
@@ -37,7 +37,9 @@
class RefreshLinksJob extends Job {
const PARSE_THRESHOLD_SEC = 1.0;
- function __construct( $title, $params = '' ) {
+ const CLOCK_FUDGE = 10;
+
+ function __construct( Title $title, array $params ) {
parent::__construct( 'refreshLinks', $title, $params );
// A separate type is used just for cascade-protected backlinks
if ( !empty( $this->params['prioritize'] ) ) {
@@ -109,9 +111,6 @@ class RefreshLinksJob extends Job {
* @return bool
*/
protected function runForTitle( Title $title = null ) {
- $linkCache = LinkCache::singleton();
- $linkCache->clear();
-
if ( is_null( $title ) ) {
$this->setLastError( "refreshLinks: Invalid title" );
return false;
@@ -124,14 +123,18 @@ class RefreshLinksJob extends Job {
wfGetLB()->waitFor( $this->params['masterPos'] );
}
- $page = WikiPage::factory( $title );
+ // Clear out title cache data from prior job transaction snapshots
+ $linkCache = LinkCache::singleton();
+ $linkCache->clear();
- // Fetch the current revision...
+ // Fetch the current page and revision...
+ $page = WikiPage::factory( $title );
$revision = Revision::newFromTitle( $title, false, Revision::READ_NORMAL );
if ( !$revision ) {
$this->setLastError( "refreshLinks: Article not found {$title->getPrefixedDBkey()}" );
return false; // XXX: what if it was just deleted?
}
+
$content = $revision->getContent( Revision::RAW );
if ( !$content ) {
// If there is no content, pretend the content is empty
@@ -140,34 +143,50 @@ class RefreshLinksJob extends Job {
$parserOutput = false;
$parserOptions = $page->makeParserOptions( 'canonical' );
- // If page_touched changed after this root job (with a good slave lag skew factor),
- // then it is likely that any views of the pages already resulted in re-parses which
- // are now in cache. This can be reused to avoid expensive parsing in some cases.
+ // If page_touched changed after this root job, then it is likely that
+ // any views of the pages already resulted in re-parses which are now in
+ // cache. The cache can be reused to avoid expensive parsing in some cases.
if ( isset( $this->params['rootJobTimestamp'] ) ) {
- $skewedTimestamp = wfTimestamp( TS_UNIX, $this->params['rootJobTimestamp'] ) + 5;
- if ( $page->getLinksTimestamp() > wfTimestamp( TS_MW, $skewedTimestamp ) ) {
+ $opportunistic = !empty( $this->params['isOpportunistic'] );
+
+ $skewedTimestamp = $this->params['rootJobTimestamp'];
+ if ( $opportunistic ) {
+ // Neither clock skew nor DB snapshot/slave lag matter much for such
+ // updates; focus on reusing the (often recently updated) cache
+ } else {
+ // For transclusion updates, the template changes must be reflected
+ $skewedTimestamp = wfTimestamp( TS_MW,
+ wfTimestamp( TS_UNIX, $skewedTimestamp ) + self::CLOCK_FUDGE
+ );
+ }
+
+ if ( $page->getLinksTimestamp() > $skewedTimestamp ) {
// Something already updated the backlinks since this job was made
return true;
}
- if ( $page->getTouched() > wfTimestamp( TS_MW, $skewedTimestamp ) ) {
+
+ if ( $page->getTouched() >= $skewedTimestamp || $opportunistic ) {
+ // Something bumped page_touched since this job was made
+ // or the cache is otherwise suspected to be up-to-date
$parserOutput = ParserCache::singleton()->getDirty( $page, $parserOptions );
- if ( $parserOutput && $parserOutput->getCacheTime() <= $skewedTimestamp ) {
+ if ( $parserOutput && $parserOutput->getCacheTime() < $skewedTimestamp ) {
$parserOutput = false; // too stale
}
}
}
+
// Fetch the current revision and parse it if necessary...
if ( $parserOutput == false ) {
$start = microtime( true );
// Revision ID must be passed to the parser output to get revision variables correct
$parserOutput = $content->getParserOutput(
$title, $revision->getId(), $parserOptions, false );
- $ellapsed = microtime( true ) - $start;
+ $elapsed = microtime( true ) - $start;
// If it took a long time to render, then save this back to the cache to avoid
// wasted CPU by other apaches or job runners. We don't want to always save to
// cache as this can cause high cache I/O and LRU churn when a template changes.
- if ( $ellapsed >= self::PARSE_THRESHOLD_SEC
- && $page->isParserCacheUsed( $parserOptions, $revision->getId() )
+ if ( $elapsed >= self::PARSE_THRESHOLD_SEC
+ && $page->shouldCheckParserCache( $parserOptions, $revision->getId() )
&& $parserOutput->isCacheable()
) {
$ctime = wfTimestamp( TS_MW, (int)$start ); // cache time
diff --git a/includes/jobqueue/jobs/ThumbnailRenderJob.php b/includes/jobqueue/jobs/ThumbnailRenderJob.php
index ab381388..f558c488 100644
--- a/includes/jobqueue/jobs/ThumbnailRenderJob.php
+++ b/includes/jobqueue/jobs/ThumbnailRenderJob.php
@@ -27,7 +27,7 @@
* @ingroup JobQueue
*/
class ThumbnailRenderJob extends Job {
- public function __construct( $title, $params ) {
+ public function __construct( Title $title, array $params ) {
parent::__construct( 'ThumbnailRender', $title, $params );
}
@@ -50,16 +50,16 @@ class ThumbnailRenderJob extends Job {
return false;
}
} elseif ( $wgUploadThumbnailRenderMethod === 'http' ) {
- $status = $this->hitThumbUrl( $file, $transformParams );
+ $thumbUrl = '';
+ $status = $this->hitThumbUrl( $file, $transformParams, $thumbUrl );
wfDebug( __METHOD__ . ": received status {$status}\n" );
- if ( $status === 200 || $status === 301 || $status === 302 ) {
+ // 400 happens when requesting a size greater or equal than the original
+ if ( $status === 200 || $status === 301 || $status === 302 || $status === 400 ) {
return true;
} elseif ( $status ) {
- // Note that this currently happens (500) when requesting sizes larger then or
- // equal to the original, which is harmless.
- $this->setLastError( __METHOD__ . ': incorrect HTTP status ' . $status );
+ $this->setLastError( __METHOD__ . ': incorrect HTTP status ' . $status . ' when hitting ' . $thumbUrl );
return false;
} else {
$this->setLastError( __METHOD__ . ': HTTP request failure' );
@@ -75,7 +75,7 @@ class ThumbnailRenderJob extends Job {
}
}
- protected function hitThumbUrl( $file, $transformParams ) {
+ protected function hitThumbUrl( $file, $transformParams, &$thumbUrl ) {
global $wgUploadThumbnailRenderHttpCustomHost, $wgUploadThumbnailRenderHttpCustomDomain;
$thumbName = $file->thumbName( $transformParams );
diff --git a/includes/jobqueue/jobs/UploadFromUrlJob.php b/includes/jobqueue/jobs/UploadFromUrlJob.php
index d15fd025..ade48106 100644
--- a/includes/jobqueue/jobs/UploadFromUrlJob.php
+++ b/includes/jobqueue/jobs/UploadFromUrlJob.php
@@ -39,7 +39,7 @@ class UploadFromUrlJob extends Job {
/** @var User */
protected $user;
- public function __construct( $title, $params ) {
+ public function __construct( Title $title, array $params ) {
parent::__construct( 'uploadFromUrl', $title, $params );
}
@@ -166,11 +166,11 @@ class UploadFromUrlJob extends Job {
}
/**
- * Initialize the session data. Sets the intial result to queued.
+ * Initialize the session data. Sets the initial result to queued.
*/
public function initializeSessionData() {
$session =& self::getSessionData( $this->params['sessionKey'] );
- $$session['result'] = 'Queued';
+ $session['result'] = 'Queued';
}
/**