diff options
author | Pierre Schmitz <pierre@archlinux.de> | 2015-06-04 07:31:04 +0200 |
---|---|---|
committer | Pierre Schmitz <pierre@archlinux.de> | 2015-06-04 07:58:39 +0200 |
commit | f6d65e533c62f6deb21342d4901ece24497b433e (patch) | |
tree | f28adf0362d14bcd448f7b65a7aaf38650f923aa /includes/jobqueue | |
parent | c27b2e832fe25651ef2410fae85b41072aae7519 (diff) |
Update to MediaWiki 1.25.1
Diffstat (limited to 'includes/jobqueue')
22 files changed, 770 insertions, 786 deletions
diff --git a/includes/jobqueue/Job.php b/includes/jobqueue/Job.php index ee3f2c2b..f8de0b5d 100644 --- a/includes/jobqueue/Job.php +++ b/includes/jobqueue/Job.php @@ -47,20 +47,12 @@ abstract class Job implements IJobSpecification { /** @var string Text for error that occurred last */ protected $error; - /*------------------------------------------------------------------------- - * Abstract functions - *------------------------------------------------------------------------*/ - /** * Run the job * @return bool Success */ abstract public function run(); - /*------------------------------------------------------------------------- - * Static functions - *------------------------------------------------------------------------*/ - /** * Create the appropriate object to handle a specific job * @@ -81,80 +73,37 @@ abstract class Job implements IJobSpecification { } /** + * @param string $command + * @param Title $title + * @param array|bool $params Can not be === true + */ + public function __construct( $command, $title, $params = false ) { + $this->command = $command; + $this->title = $title; + $this->params = $params; + + // expensive jobs may set this to true + $this->removeDuplicates = false; + } + + /** * Batch-insert a group of jobs into the queue. * This will be wrapped in a transaction with a forced commit. * * This may add duplicate at insert time, but they will be * removed later on, when the first one is popped. * - * @param array $jobs Array of Job objects + * @param Job[] $jobs Array of Job objects * @return bool * @deprecated since 1.21 */ public static function batchInsert( $jobs ) { + wfDeprecated( __METHOD__, '1.21' ); JobQueueGroup::singleton()->push( $jobs ); return true; } /** - * Insert a group of jobs into the queue. - * - * Same as batchInsert() but does not commit and can thus - * be rolled-back as part of a larger transaction. However, - * large batches of jobs can cause slave lag. - * - * @param array $jobs Array of Job objects - * @return bool - * @deprecated since 1.21 - */ - public static function safeBatchInsert( $jobs ) { - JobQueueGroup::singleton()->push( $jobs, JobQueue::QOS_ATOMIC ); - return true; - } - - /** - * Pop a job of a certain type. This tries less hard than pop() to - * actually find a job; it may be adversely affected by concurrent job - * runners. - * - * @param string $type - * @return Job|bool Returns false if there are no jobs - * @deprecated since 1.21 - */ - public static function pop_type( $type ) { - return JobQueueGroup::singleton()->get( $type )->pop(); - } - - /** - * Pop a job off the front of the queue. - * This is subject to $wgJobTypesExcludedFromDefaultQueue. - * - * @return Job|bool False if there are no jobs - * @deprecated since 1.21 - */ - public static function pop() { - return JobQueueGroup::singleton()->pop(); - } - - /*------------------------------------------------------------------------- - * Non-static functions - *------------------------------------------------------------------------*/ - - /** - * @param string $command - * @param Title $title - * @param array|bool $params - */ - public function __construct( $command, $title, $params = false ) { - $this->command = $command; - $this->title = $title; - $this->params = $params; - - // expensive jobs may set this to true - $this->removeDuplicates = false; - } - - /** * @return string */ public function getType() { @@ -186,7 +135,15 @@ abstract class Job implements IJobSpecification { } /** - * @return bool Whether only one of each identical set of jobs should be run + * Whether the queue should reject insertion of this job if a duplicate exists + * + * This can be used to avoid duplicated effort or combined with delayed jobs to + * coalesce updates into larger batches. Claimed jobs are never treated as + * duplicates of new jobs, and some queues may allow a few duplicates due to + * network partitions and fail-over. Thus, additional locking is needed to + * enforce mutual exclusion if this is really needed. + * + * @return bool */ public function ignoreDuplicates() { return $this->removeDuplicates; @@ -231,6 +188,8 @@ abstract class Job implements IJobSpecification { unset( $info['params']['rootJobTimestamp'] ); // Likewise for jobs with different delay times unset( $info['params']['jobReleaseTimestamp'] ); + // Queues pack and hash this array, so normalize the order + ksort( $info['params'] ); } return $info; @@ -315,7 +274,7 @@ abstract class Job implements IJobSpecification { break; } } - if ( $filteredValue ) { + if ( $filteredValue && count( $filteredValue ) < 10 ) { $value = FormatJson::encode( $filteredValue ); } else { $value = "array(" . count( $value ) . ")"; @@ -328,16 +287,25 @@ abstract class Job implements IJobSpecification { } } - if ( is_object( $this->title ) ) { - $s = "{$this->command} " . $this->title->getPrefixedDBkey(); - if ( $paramString !== '' ) { - $s .= ' ' . $paramString; + $metaString = ''; + foreach ( $this->metadata as $key => $value ) { + if ( is_scalar( $value ) && mb_strlen( $value ) < 1024 ) { + $metaString .= ( $metaString ? ",$key=$value" : "$key=$value" ); } + } - return $s; - } else { - return "{$this->command} $paramString"; + $s = $this->command; + if ( is_object( $this->title ) ) { + $s .= " {$this->title->getPrefixedDBkey()}"; } + if ( $paramString != '' ) { + $s .= " $paramString"; + } + if ( $metaString != '' ) { + $s .= " ($metaString)"; + } + + return $s; } protected function setLastError( $error ) { diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php index c00d22e9..91fe86cf 100644 --- a/includes/jobqueue/JobQueue.php +++ b/includes/jobqueue/JobQueue.php @@ -44,11 +44,10 @@ abstract class JobQueue { /** @var int Maximum number of times to try a job */ protected $maxTries; - /** @var bool Allow delayed jobs */ - protected $checkDelay; - /** @var BagOStuff */ protected $dupCache; + /** @var JobQueueAggregator */ + protected $aggr; const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions @@ -71,11 +70,10 @@ abstract class JobQueue { if ( !in_array( $this->order, $this->supportedOrders() ) ) { throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); } - $this->checkDelay = !empty( $params['checkDelay'] ); - if ( $this->checkDelay && !$this->supportsDelayedJobs() ) { - throw new MWException( __CLASS__ . " does not support delayed jobs." ); - } $this->dupCache = wfGetCache( CACHE_ANYTHING ); + $this->aggr = isset( $params['aggregator'] ) + ? $params['aggregator'] + : new JobQueueAggregatorNull( array() ); } /** @@ -98,10 +96,6 @@ abstract class JobQueue { * but not acknowledged as completed after this many seconds. Recycling * of jobs simple means re-inserting them into the queue. Jobs can be * attempted up to three times before being discarded. - * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions. - * This lets delayed jobs wait in a staging area until a given timestamp is - * reached, at which point they will enter the queue. If this is not enabled - * or not supported, an exception will be thrown on delayed job insertion. * * Queue classes should throw an exception if they do not support the options given. * @@ -144,14 +138,6 @@ abstract class JobQueue { } /** - * @return bool Whether delayed jobs are enabled - * @since 1.22 - */ - final public function delayedJobsEnabled() { - return $this->checkDelay; - } - - /** * Get the allowed queue orders for configuration validation * * @return array Subset of (random, timestamp, fifo, undefined) @@ -175,6 +161,14 @@ abstract class JobQueue { } /** + * @return bool Whether delayed jobs are enabled + * @since 1.22 + */ + final public function delayedJobsEnabled() { + return $this->supportsDelayedJobs(); + } + + /** * Quickly check if the queue has no available (unacquired, non-delayed) jobs. * Queue classes should use caching if they are any slower without memcached. * @@ -187,9 +181,7 @@ abstract class JobQueue { * @throws JobQueueError */ final public function isEmpty() { - wfProfileIn( __METHOD__ ); $res = $this->doIsEmpty(); - wfProfileOut( __METHOD__ ); return $res; } @@ -210,9 +202,7 @@ abstract class JobQueue { * @throws JobQueueError */ final public function getSize() { - wfProfileIn( __METHOD__ ); $res = $this->doGetSize(); - wfProfileOut( __METHOD__ ); return $res; } @@ -233,9 +223,7 @@ abstract class JobQueue { * @throws JobQueueError */ final public function getAcquiredCount() { - wfProfileIn( __METHOD__ ); $res = $this->doGetAcquiredCount(); - wfProfileOut( __METHOD__ ); return $res; } @@ -257,9 +245,7 @@ abstract class JobQueue { * @since 1.22 */ final public function getDelayedCount() { - wfProfileIn( __METHOD__ ); $res = $this->doGetDelayedCount(); - wfProfileOut( __METHOD__ ); return $res; } @@ -282,9 +268,7 @@ abstract class JobQueue { * @throws JobQueueError */ final public function getAbandonedCount() { - wfProfileIn( __METHOD__ ); $res = $this->doGetAbandonedCount(); - wfProfileOut( __METHOD__ ); return $res; } @@ -308,7 +292,8 @@ abstract class JobQueue { * @throws JobQueueError */ final public function push( $jobs, $flags = 0 ) { - $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); + $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); + $this->batchPush( $jobs, $flags ); } /** @@ -330,15 +315,14 @@ abstract class JobQueue { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); - } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) { + } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) { throw new MWException( "Got delayed '{$job->getType()}' job; delays are not supported." ); } } - wfProfileIn( __METHOD__ ); $this->doBatchPush( $jobs, $flags ); - wfProfileOut( __METHOD__ ); + $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type ); } /** @@ -366,9 +350,11 @@ abstract class JobQueue { throw new MWException( "Unrecognized job type '{$this->type}'." ); } - wfProfileIn( __METHOD__ ); $job = $this->doPop(); - wfProfileOut( __METHOD__ ); + + if ( !$job ) { + $this->aggr->notifyQueueEmpty( $this->wiki, $this->type ); + } // Flag this job as an old duplicate based on its "root" job... try { @@ -376,7 +362,7 @@ abstract class JobQueue { JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki ); $job = DuplicateJob::newFromJob( $job ); // convert to a no-op } - } catch ( MWException $e ) { + } catch ( Exception $e ) { // don't lose jobs over this } @@ -403,9 +389,7 @@ abstract class JobQueue { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } - wfProfileIn( __METHOD__ ); $this->doAck( $job ); - wfProfileOut( __METHOD__ ); } /** @@ -449,9 +433,7 @@ abstract class JobQueue { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } - wfProfileIn( __METHOD__ ); $ok = $this->doDeduplicateRootJob( $job ); - wfProfileOut( __METHOD__ ); return $ok; } @@ -494,9 +476,7 @@ abstract class JobQueue { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } - wfProfileIn( __METHOD__ ); $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); - wfProfileOut( __METHOD__ ); return $isDuplicate; } @@ -538,9 +518,7 @@ abstract class JobQueue { * @return void */ final public function delete() { - wfProfileIn( __METHOD__ ); $this->doDelete(); - wfProfileOut( __METHOD__ ); } /** @@ -560,9 +538,7 @@ abstract class JobQueue { * @throws JobQueueError */ final public function waitForBackups() { - wfProfileIn( __METHOD__ ); $this->doWaitForBackups(); - wfProfileOut( __METHOD__ ); } /** @@ -607,9 +583,7 @@ abstract class JobQueue { * @return void */ final public function flushCaches() { - wfProfileIn( __METHOD__ ); $this->doFlushCaches(); - wfProfileOut( __METHOD__ ); } /** @@ -642,6 +616,17 @@ abstract class JobQueue { } /** + * Get an iterator to traverse over all abandoned jobs in this queue + * + * @return Iterator + * @throws JobQueueError + * @since 1.25 + */ + public function getAllAbandonedJobs() { + return new ArrayIterator( array() ); // not implemented + } + + /** * Do not use this function outside of JobQueue/JobQueueGroup * * @return string @@ -661,7 +646,6 @@ abstract class JobQueue { * @since 1.22 */ final public function getSiblingQueuesWithJobs( array $types ) { - $section = new ProfileSection( __METHOD__ ); return $this->doGetSiblingQueuesWithJobs( $types ); } @@ -686,7 +670,6 @@ abstract class JobQueue { * @since 1.22 */ final public function getSiblingQueueSizes( array $types ) { - $section = new ProfileSection( __METHOD__ ); return $this->doGetSiblingQueueSizes( $types ); } diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index 08873cc1..d5f47ffd 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -221,7 +221,7 @@ class JobQueueDB extends JobQueue { } $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated - $rowList = array(); // list of jobs for jobs that are are not de-duplicated + $rowList = array(); // list of jobs for jobs that are not de-duplicated foreach ( $jobs as $job ) { $row = $this->insertFields( $job ); if ( $job->ignoreDuplicates() ) { @@ -556,7 +556,7 @@ class JobQueueDB extends JobQueue { * @return void */ protected function doWaitForBackups() { - wfWaitForSlaves(); + wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false ); } /** @@ -686,7 +686,9 @@ class JobQueueDB extends JobQueue { $affected = $dbw->affectedRows(); $count += $affected; JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki ); + // The tasks recycled jobs or release delayed jobs into the queue $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type ); } } diff --git a/includes/jobqueue/JobQueueFederated.php b/includes/jobqueue/JobQueueFederated.php index c4301eed..d985d449 100644 --- a/includes/jobqueue/JobQueueFederated.php +++ b/includes/jobqueue/JobQueueFederated.php @@ -49,20 +49,12 @@ class JobQueueFederated extends JobQueue { /** @var HashRing */ protected $partitionRing; - /** @var HashRing */ - protected $partitionPushRing; /** @var array (partition name => JobQueue) reverse sorted by weight */ protected $partitionQueues = array(); - /** @var BagOStuff */ - protected $cache; - /** @var int Maximum number of partitions to try */ protected $maxPartitionsTry; - const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating - const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date - /** * @param array $params Possible keys: * - sectionsByWiki : A map of wiki IDs to section names. @@ -72,10 +64,8 @@ class JobQueueFederated extends JobQueue { * have explicitly defined sections. * - configByPartition : Map of queue partition names to configuration arrays. * These configuration arrays are passed to JobQueue::factory(). - * The options set here are overriden by those passed to this + * The options set here are overridden by those passed to this * the federated queue itself (e.g. 'order' and 'claimTTL'). - * - partitionsNoPush : List of partition names that can handle pop() but not push(). - * This can be used to migrate away from a certain partition. * - maxPartitionsTry : Maximum number of times to attempt job insertion using * different partition queues. This improves availability * during failure, at the cost of added latency and somewhat @@ -96,17 +86,10 @@ class JobQueueFederated extends JobQueue { // Get the full partition map $partitionMap = $params['partitionsBySection'][$section]; arsort( $partitionMap, SORT_NUMERIC ); - // Get the partitions jobs can actually be pushed to - $partitionPushMap = $partitionMap; - if ( isset( $params['partitionsNoPush'] ) ) { - foreach ( $params['partitionsNoPush'] as $partition ) { - unset( $partitionPushMap[$partition] ); - } - } // Get the config to pass to merge into each partition queue config $baseConfig = $params; foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry', - 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o + 'partitionsBySection', 'configByPartition', ) as $o ) { unset( $baseConfig[$o] ); // partition queue doesn't care about this } @@ -120,14 +103,6 @@ class JobQueueFederated extends JobQueue { } // Ring of all partitions $this->partitionRing = new HashRing( $partitionMap ); - // Get the ring of partitions to push jobs into - if ( count( $partitionPushMap ) === count( $partitionMap ) ) { - $this->partitionPushRing = clone $this->partitionRing; // faster - } else { - $this->partitionPushRing = new HashRing( $partitionPushMap ); - } - // Aggregate cache some per-queue values if there are multiple partition queues - $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff(); } protected function supportedOrders() { @@ -140,19 +115,16 @@ class JobQueueFederated extends JobQueue { } protected function supportsDelayedJobs() { - return true; // defer checks to the partitions + foreach ( $this->partitionQueues as $queue ) { + if ( !$queue->supportsDelayedJobs() ) { + return false; + } + } + + return true; } protected function doIsEmpty() { - $key = $this->getCacheKey( 'empty' ); - - $isEmpty = $this->cache->get( $key ); - if ( $isEmpty === 'true' ) { - return true; - } elseif ( $isEmpty === 'false' ) { - return false; - } - $empty = true; $failed = 0; foreach ( $this->partitionQueues as $queue ) { @@ -160,12 +132,11 @@ class JobQueueFederated extends JobQueue { $empty = $empty && $queue->doIsEmpty(); } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); - $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG ); return $empty; } @@ -191,32 +162,24 @@ class JobQueueFederated extends JobQueue { * @return int */ protected function getCrossPartitionSum( $type, $method ) { - $key = $this->getCacheKey( $type ); - - $count = $this->cache->get( $key ); - if ( $count !== false ) { - return $count; - } - + $count = 0; $failed = 0; foreach ( $this->partitionQueues as $queue ) { try { $count += $queue->$method(); } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); - $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); - return $count; } protected function doBatchPush( array $jobs, $flags ) { // Local ring variable that may be changed to point to a new ring on failure - $partitionRing = $this->partitionPushRing; + $partitionRing = $this->partitionRing; // Try to insert the jobs and update $partitionsTry on any failures. // Retry to insert any remaning jobs again, ignoring the bad partitions. $jobsLeft = $jobs; @@ -277,12 +240,9 @@ class JobQueueFederated extends JobQueue { $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); } catch ( JobQueueError $e ) { $ok = false; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } - if ( $ok ) { - $key = $this->getCacheKey( 'empty' ); - $this->cache->set( $key, 'false', self::CACHE_TTL_LONG ); - } else { + if ( !$ok ) { if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist throw new JobQueueError( "Could not insert job(s), no partitions available." ); } @@ -299,12 +259,9 @@ class JobQueueFederated extends JobQueue { $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); } catch ( JobQueueError $e ) { $ok = false; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } - if ( $ok ) { - $key = $this->getCacheKey( 'empty' ); - $this->cache->set( $key, 'false', self::CACHE_TTL_LONG ); - } else { + if ( !$ok ) { if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist throw new JobQueueError( "Could not insert job(s), no partitions available." ); } @@ -331,7 +288,7 @@ class JobQueueFederated extends JobQueue { $job = $queue->pop(); } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); $job = false; } if ( $job ) { @@ -344,9 +301,6 @@ class JobQueueFederated extends JobQueue { } $this->throwErrorIfAllPartitionsDown( $failed ); - $key = $this->getCacheKey( 'empty' ); - $this->cache->set( $key, 'true', self::CACHE_TTL_LONG ); - return false; } @@ -361,12 +315,12 @@ class JobQueueFederated extends JobQueue { protected function doIsRootJobOldDuplicate( Job $job ) { $params = $job->getRootJobParams(); $sigature = $params['rootJobSignature']; - $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + $partition = $this->partitionRing->getLiveLocation( $sigature ); try { return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); } catch ( JobQueueError $e ) { - if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) { - $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) { + $partition = $this->partitionRing->getLiveLocation( $sigature ); return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); } } @@ -377,12 +331,12 @@ class JobQueueFederated extends JobQueue { protected function doDeduplicateRootJob( Job $job ) { $params = $job->getRootJobParams(); $sigature = $params['rootJobSignature']; - $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + $partition = $this->partitionRing->getLiveLocation( $sigature ); try { return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); } catch ( JobQueueError $e ) { - if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) { - $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) { + $partition = $this->partitionRing->getLiveLocation( $sigature ); return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); } } @@ -398,7 +352,7 @@ class JobQueueFederated extends JobQueue { $queue->doDelete(); } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); @@ -413,7 +367,7 @@ class JobQueueFederated extends JobQueue { $queue->waitForBackups(); } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); @@ -440,10 +394,6 @@ class JobQueueFederated extends JobQueue { 'abandonedcount' ); - foreach ( $types as $type ) { - $this->cache->delete( $this->getCacheKey( $type ) ); - } - /** @var JobQueue $queue */ foreach ( $this->partitionQueues as $queue ) { $queue->doFlushCaches(); @@ -472,6 +422,17 @@ class JobQueueFederated extends JobQueue { return $iterator; } + public function getAllAbandonedJobs() { + $iterator = new AppendIterator(); + + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllAbandonedJobs() ); + } + + return $iterator; + } + public function getCoalesceLocationInternal() { return "JobQueueFederated:wiki:{$this->wiki}" . sha1( serialize( array_keys( $this->partitionQueues ) ) ); @@ -495,7 +456,7 @@ class JobQueueFederated extends JobQueue { } } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); @@ -519,7 +480,7 @@ class JobQueueFederated extends JobQueue { } } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); @@ -527,6 +488,10 @@ class JobQueueFederated extends JobQueue { return $result; } + protected function logException( Exception $e ) { + wfDebugLog( 'JobQueueFederated', $e->getMessage() . "\n" . $e->getTraceAsString() ); + } + /** * Throw an error if no partitions available * @@ -546,14 +511,4 @@ class JobQueueFederated extends JobQueue { $queue->setTestingPrefix( $key ); } } - - /** - * @param string $property - * @return string - */ - private function getCacheKey( $property ) { - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); - } } diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php index 98a78c5e..ebd547a0 100644 --- a/includes/jobqueue/JobQueueGroup.php +++ b/includes/jobqueue/JobQueueGroup.php @@ -94,6 +94,7 @@ class JobQueueGroup { } else { $conf = $conf + $wgJobTypeConf['default']; } + $conf['aggregator'] = JobQueueAggregator::singleton(); return JobQueue::factory( $conf ); } @@ -104,7 +105,7 @@ class JobQueueGroup { * This inserts the jobs into the queue specified by $wgJobTypeConf * and updates the aggregate job queue information cache as needed. * - * @param Job|array $jobs A single Job or a list of Jobs + * @param Job|Job[] $jobs A single Job or a list of Jobs * @throws MWException * @return void */ @@ -125,7 +126,6 @@ class JobQueueGroup { foreach ( $jobsByType as $type => $jobs ) { $this->get( $type )->push( $jobs ); - JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); } if ( $this->cache->has( 'queues-ready', 'list' ) ) { @@ -153,9 +153,6 @@ class JobQueueGroup { if ( is_string( $qtype ) ) { // specific job type if ( !in_array( $qtype, $blacklist ) ) { $job = $this->get( $qtype )->pop(); - if ( !$job ) { - JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); - } } } else { // any job in the "default" jobs types if ( $flags & self::USE_CACHE ) { @@ -179,7 +176,6 @@ class JobQueueGroup { if ( $job ) { // found break; } else { // not found - JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type ); $this->cache->clear( 'queues-ready' ); } } @@ -220,12 +216,10 @@ class JobQueueGroup { public function waitForBackups() { global $wgJobTypeConf; - wfProfileIn( __METHOD__ ); // Try to avoid doing this more than once per queue storage medium foreach ( $wgJobTypeConf as $type => $conf ) { $this->get( $type )->waitForBackups(); } - wfProfileOut( __METHOD__ ); } /** @@ -383,10 +377,6 @@ class JobQueueGroup { } } } - // The tasks may have recycled jobs or release delayed jobs into the queue - if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) { - JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); - } } if ( $count === 0 ) { diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php index 3519eac8..6c823fb9 100644 --- a/includes/jobqueue/JobQueueRedis.php +++ b/includes/jobqueue/JobQueueRedis.php @@ -24,7 +24,7 @@ /** * Class to handle job queues stored in Redis * - * This is faster, less resource intensive, queue that JobQueueDB. + * This is a faster and less resource-intensive job queue than JobQueueDB. * All data for a queue using this class is placed into one redis server. * * There are eight main redis keys used to track jobs: @@ -49,7 +49,7 @@ * * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations. * Additionally, it should be noted that redis has different persistence modes, such - * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be + * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be * made on the servers based on what queues are using it and what tolerance they have. * * @ingroup JobQueue @@ -64,8 +64,6 @@ class JobQueueRedis extends JobQueue { protected $server; /** @var string Compression method to use */ protected $compression; - /** @var bool */ - protected $daemonized; const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) @@ -90,7 +88,11 @@ class JobQueueRedis extends JobQueue { $this->server = $params['redisServer']; $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); - $this->daemonized = !empty( $params['daemonized'] ); + if ( empty( $params['daemonized'] ) ) { + throw new Exception( + "Non-daemonized mode is no longer supported. Please install the " . + "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." ); + } } protected function supportedOrders() { @@ -134,9 +136,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetAcquiredCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } $conn = $this->getConnection(); try { $conn->multi( Redis::PIPELINE ); @@ -155,9 +154,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetDelayedCount() { - if ( !$this->checkDelay ) { - return 0; // no delayed jobs - } $conn = $this->getConnection(); try { return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); @@ -172,9 +168,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetAbandonedCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } $conn = $this->getConnection(); try { return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); @@ -299,24 +292,10 @@ LUA; protected function doPop() { $job = false; - // Push ready delayed jobs into the queue every 10 jobs to spread the load. - // This is also done as a periodic task, but we don't want too much done at once. - if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { - $this->recyclePruneAndUndelayJobs(); - } - $conn = $this->getConnection(); try { do { - if ( $this->claimTTL > 0 ) { - // Keep the claimed job list down for high-traffic queues - if ( mt_rand( 0, 99 ) == 0 ) { - $this->recyclePruneAndUndelayJobs(); - } - $blob = $this->popAndAcquireBlob( $conn ); - } else { - $blob = $this->popAndDeleteBlob( $conn ); - } + $blob = $this->popAndAcquireBlob( $conn ); if ( !is_string( $blob ) ) { break; // no jobs; nothing to do } @@ -328,7 +307,7 @@ LUA; continue; } - // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed + // If $item is invalid, the runner loop recyling will cleanup as needed $job = $this->getJobFromFields( $item ); // may be false } while ( !$job ); // job may be false if invalid } catch ( RedisException $e ) { @@ -343,39 +322,6 @@ LUA; * @return array Serialized string or false * @throws RedisException */ - protected function popAndDeleteBlob( RedisConnRef $conn ) { - static $script = -<<<LUA - local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS) - -- Pop an item off the queue - local id = redis.call('rpop',kUnclaimed) - if not id then return false end - -- Get the job data and remove it - local item = redis.call('hGet',kData,id) - redis.call('hDel',kData,id) - -- Allow new duplicates of this job - local sha1 = redis.call('hGet',kSha1ById,id) - if sha1 then redis.call('hDel',kIdBySha1,sha1) end - redis.call('hDel',kSha1ById,id) - -- Return the job data - return item -LUA; - return $conn->luaEval( $script, - array( - $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] - $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] - $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] - ), - 4 # number of first argument(s) that are keys - ); - } - - /** - * @param RedisConnRef $conn - * @return array Serialized string or false - * @throws RedisException - */ protected function popAndAcquireBlob( RedisConnRef $conn ) { static $script = <<<LUA @@ -416,36 +362,35 @@ LUA; if ( !isset( $job->metadata['uuid'] ) ) { throw new MWException( "Job of type '{$job->getType()}' has no UUID." ); } - if ( $this->claimTTL > 0 ) { - $conn = $this->getConnection(); - try { - static $script = + + $conn = $this->getConnection(); + try { + static $script = <<<LUA - local kClaimed, kAttempts, kData = unpack(KEYS) - -- Unmark the job as claimed - redis.call('zRem',kClaimed,ARGV[1]) - redis.call('hDel',kAttempts,ARGV[1]) - -- Delete the job data itself - return redis.call('hDel',kData,ARGV[1]) + local kClaimed, kAttempts, kData = unpack(KEYS) + -- Unmark the job as claimed + redis.call('zRem',kClaimed,ARGV[1]) + redis.call('hDel',kAttempts,ARGV[1]) + -- Delete the job data itself + return redis.call('hDel',kData,ARGV[1]) LUA; - $res = $conn->luaEval( $script, - array( - $this->getQueueKey( 'z-claimed' ), # KEYS[1] - $this->getQueueKey( 'h-attempts' ), # KEYS[2] - $this->getQueueKey( 'h-data' ), # KEYS[3] - $job->metadata['uuid'] # ARGV[1] - ), - 3 # number of first argument(s) that are keys - ); - - if ( !$res ) { - wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); - - return false; - } - } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + $res = $conn->luaEval( $script, + array( + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'h-data' ), # KEYS[3] + $job->metadata['uuid'] # ARGV[1] + ), + 3 # number of first argument(s) that are keys + ); + + if ( !$res ) { + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); + + return false; } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); } return true; @@ -571,6 +516,29 @@ LUA; } } + /** + * @see JobQueue::getAllAbandonedJobs() + * @return Iterator + */ + public function getAllAbandonedJobs() { + $conn = $this->getConnection(); + try { + $that = $this; + + return new MappedIterator( // delayed jobs + $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ), + function ( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { + return is_object( $job ); + } ) + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + public function getCoalesceLocationInternal() { return "RedisServer:" . $this->server; } @@ -630,115 +598,10 @@ LUA; } /** - * Recycle or destroy any jobs that have been claimed for too long - * and release any ready delayed jobs into the queue - * - * @return int Number of jobs recycled/deleted/undelayed - * @throws MWException|JobQueueError - */ - public function recyclePruneAndUndelayJobs() { - $count = 0; - // For each job item that can be retried, we need to add it back to the - // main queue and remove it from the list of currenty claimed job items. - // For those that cannot, they are marked as dead and kept around for - // investigation and manual job restoration but are eventually deleted. - $conn = $this->getConnection(); - try { - $now = time(); - static $script = -<<<LUA - local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS) - local released,abandoned,pruned,undelayed = 0,0,0,0 - -- Get all non-dead jobs that have an expired claim on them. - -- The score for each item is the last claim timestamp (UNIX). - local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1]) - for k,id in ipairs(staleClaims) do - local timestamp = redis.call('zScore',kClaimed,id) - local attempts = redis.call('hGet',kAttempts,id) - if attempts < ARGV[3] then - -- Claim expired and retries left: re-enqueue the job - redis.call('lPush',kUnclaimed,id) - redis.call('hIncrBy',kAttempts,id,1) - released = released + 1 - else - -- Claim expired and no retries left: mark the job as dead - redis.call('zAdd',kAbandoned,timestamp,id) - abandoned = abandoned + 1 - end - redis.call('zRem',kClaimed,id) - end - -- Get all of the dead jobs that have been marked as dead for too long. - -- The score for each item is the last claim timestamp (UNIX). - local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2]) - for k,id in ipairs(deadClaims) do - -- Stale and out of retries: remove any traces of the job - redis.call('zRem',kAbandoned,id) - redis.call('hDel',kAttempts,id) - redis.call('hDel',kData,id) - pruned = pruned + 1 - end - -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp) - local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4]) - -- Migrate the jobs from the "delayed" set to the "unclaimed" list - for k,id in ipairs(ids) do - redis.call('lPush',kUnclaimed,id) - redis.call('zRem',kDelayed,id) - end - undelayed = #ids - return {released,abandoned,pruned,undelayed} -LUA; - $res = $conn->luaEval( $script, - array( - $this->getQueueKey( 'z-claimed' ), # KEYS[1] - $this->getQueueKey( 'h-attempts' ), # KEYS[2] - $this->getQueueKey( 'l-unclaimed' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] - $this->getQueueKey( 'z-abandoned' ), # KEYS[5] - $this->getQueueKey( 'z-delayed' ), # KEYS[6] - $now - $this->claimTTL, # ARGV[1] - $now - self::MAX_AGE_PRUNE, # ARGV[2] - $this->maxTries, # ARGV[3] - $now # ARGV[4] - ), - 6 # number of first argument(s) that are keys - ); - if ( $res ) { - list( $released, $abandoned, $pruned, $undelayed ) = $res; - $count += $released + $pruned + $undelayed; - JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki ); - JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki ); - JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki ); - } - } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); - } - - return $count; - } - - /** * @return array */ protected function doGetPeriodicTasks() { - if ( $this->daemonized ) { - return array(); // managed in the runner loop - } - $periods = array( 3600 ); // standard cleanup (useful on config change) - if ( $this->claimTTL > 0 ) { - $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing - } - if ( $this->checkDelay ) { - $periods[] = 300; // 5 minutes - } - $period = min( $periods ); - $period = max( $period, 30 ); // sanity - - return array( - 'recyclePruneAndUndelayJobs' => array( - 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ), - 'period' => $period, - ) - ); + return array(); // managed in the runner loop } /** diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index 8cccedaf..b8c5d6cf 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -21,13 +21,17 @@ * @ingroup JobQueue */ +use MediaWiki\Logger\LoggerFactory; +use Psr\Log\LoggerAwareInterface; +use Psr\Log\LoggerInterface; + /** * Job queue runner utility methods * * @ingroup JobQueue * @since 1.24 */ -class JobRunner { +class JobRunner implements LoggerAwareInterface { /** @var callable|null Debug output handler */ protected $debug; @@ -39,6 +43,28 @@ class JobRunner { } /** + * @var LoggerInterface $logger + */ + protected $logger; + + /** + * @param LoggerInterface $logger + */ + public function setLogger( LoggerInterface $logger ) { + $this->logger = $logger; + } + + /** + * @param LoggerInterface $logger + */ + public function __construct( LoggerInterface $logger = null ) { + if ( $logger === null ) { + $logger = LoggerFactory::getInstance( 'runJobs' ); + } + $this->setLogger( $logger ); + } + + /** * Run jobs of the specified number/type for the specified time * * The response map has a 'job' field that lists status of each job, including: @@ -62,6 +88,8 @@ class JobRunner { * @return array Summary response that can easily be JSON serialized */ public function run( array $options ) { + global $wgJobClasses; + $response = array( 'jobs' => array(), 'reached' => 'none-ready' ); $type = isset( $options['type'] ) ? $options['type'] : false; @@ -69,11 +97,31 @@ class JobRunner { $maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false; $noThrottle = isset( $options['throttle'] ) && !$options['throttle']; + if ( $type !== false && !isset( $wgJobClasses[$type] ) ) { + $response['reached'] = 'none-possible'; + return $response; + } + $group = JobQueueGroup::singleton(); // Handle any required periodic queue maintenance $count = $group->executeReadyPeriodicTasks(); if ( $count > 0 ) { - $this->runJobsLog( "Executed $count periodic queue task(s)." ); + $msg = "Executed $count periodic queue task(s)."; + $this->logger->debug( $msg ); + $this->debugCallback( $msg ); + } + + // Bail out if in read-only mode + if ( wfReadOnly() ) { + $response['reached'] = 'read-only'; + return $response; + } + + // Bail out if there is too much DB lag + list( , $maxLag ) = wfGetLBFactory()->getMainLB( wfWikiID() )->getMaxLag(); + if ( $maxLag >= 5 ) { + $response['reached'] = 'slave-lag-limit'; + return $response; } // Flush any pending DB writes for sanity @@ -87,8 +135,10 @@ class JobRunner { $jobsRun = 0; $timeMsTotal = 0; $flags = JobQueueGroup::USE_CACHE; + $checkPeriod = 5.0; // seconds + $checkPhase = mt_rand( 0, 1000 * $checkPeriod ) / 1000; // avoid stampedes $startTime = microtime( true ); // time since jobs started running - $lastTime = microtime( true ); // time since last slave check + $lastTime = microtime( true ) - $checkPhase; // time since last slave check do { // Sync the persistent backoffs with concurrent runners $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); @@ -117,24 +167,24 @@ class JobRunner { $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); } - $this->runJobsLog( $job->toString() . " STARTING" ); + $msg = $job->toString() . " STARTING"; + $this->logger->info( $msg ); + $this->debugCallback( $msg ); // Run the job... - wfProfileIn( __METHOD__ . '-' . get_class( $job ) ); $jobStartTime = microtime( true ); try { ++$jobsRun; $status = $job->run(); $error = $job->getLastError(); wfGetLBFactory()->commitMasterChanges(); - } catch ( MWException $e ) { + } catch ( Exception $e ) { MWExceptionHandler::rollbackMasterChangesAndLog( $e ); $status = false; $error = get_class( $e ) . ': ' . $e->getMessage(); MWExceptionHandler::logException( $e ); } $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 ); - wfProfileOut( __METHOD__ . '-' . get_class( $job ) ); $timeMsTotal += $timeMs; // Mark the job as done on success or when the job cannot be retried @@ -151,9 +201,13 @@ class JobRunner { } if ( $status === false ) { - $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" ); + $msg = $job->toString() . " t=$timeMs error={$error}"; + $this->logger->error( $msg ); + $this->debugCallback( $msg ); } else { - $this->runJobsLog( $job->toString() . " t=$timeMs good" ); + $msg = $job->toString() . " t=$timeMs good"; + $this->logger->info( $msg ); + $this->debugCallback( $msg ); } $response['jobs'][] = array( @@ -172,10 +226,15 @@ class JobRunner { break; } - // Don't let any of the main DB slaves get backed up + // Don't let any of the main DB slaves get backed up. + // This only waits for so long before exiting and letting + // other wikis in the farm (on different masters) get a chance. $timePassed = microtime( true ) - $lastTime; if ( $timePassed >= 5 || $timePassed < 0 ) { - wfWaitForSlaves( $lastTime ); + if ( !wfWaitForSlaves( $lastTime, false, '*', 5 ) ) { + $response['reached'] = 'slave-lag-limit'; + break; + } $lastTime = microtime( true ); } // Don't let any queue slaves/backups fall behind @@ -239,7 +298,6 @@ class JobRunner { * @return array Map of (job type => backoff expiry timestamp) */ private function loadBackoffs( array $backoffs, $mode = 'wait' ) { - $section = new ProfileSection( __METHOD__ ); $file = wfTempDir() . '/mw-runJobs-backoffs.json'; if ( is_file( $file ) ) { @@ -278,7 +336,6 @@ class JobRunner { * @return array The new backoffs account for $backoffs and the latest file data */ private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) { - $section = new ProfileSection( __METHOD__ ); if ( !$deltas ) { return $this->loadBackoffs( $backoffs, $mode ); @@ -341,10 +398,9 @@ class JobRunner { * Log the job message * @param string $msg The message to log */ - private function runJobsLog( $msg ) { + private function debugCallback( $msg ) { if ( $this->debug ) { call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) ); } - wfDebugLog( 'runJobs', $msg ); } } diff --git a/includes/jobqueue/JobSpecification.php b/includes/jobqueue/JobSpecification.php index 9fa7747f..42d2a39b 100644 --- a/includes/jobqueue/JobSpecification.php +++ b/includes/jobqueue/JobSpecification.php @@ -91,8 +91,8 @@ class JobSpecification implements IJobSpecification { /** @var Title */ protected $title; - /** @var bool Expensive jobs may set this to true */ - protected $ignoreDuplicates; + /** @var array */ + protected $opts; /** * @param string $type @@ -104,11 +104,12 @@ class JobSpecification implements IJobSpecification { $type, array $params, array $opts = array(), Title $title = null ) { $this->validateParams( $params ); + $this->validateParams( $opts ); $this->type = $type; $this->params = $params; $this->title = $title ?: Title::newMainPage(); - $this->ignoreDuplicates = !empty( $opts['removeDuplicates'] ); + $this->opts = $opts; } /** @@ -158,7 +159,7 @@ class JobSpecification implements IJobSpecification { * @return bool Whether only one of each identical set of jobs should be run */ public function ignoreDuplicates() { - return $this->ignoreDuplicates; + return !empty( $this->opts['removeDuplicates'] ); } /** @@ -186,4 +187,31 @@ class JobSpecification implements IJobSpecification { return $info; } + + /** + * @return array Field/value map that can immediately be serialized + * @since 1.25 + */ + public function toSerializableArray() { + return array( + 'type' => $this->type, + 'params' => $this->params, + 'opts' => $this->opts, + 'title' => array( + 'ns' => $this->title->getNamespace(), + 'key' => $this->title->getDbKey() + ) + ); + } + + /** + * @param array $map Field/value map + * @return JobSpecification + * @since 1.25 + */ + public static function newFromArray( array $map ) { + $title = Title::makeTitle( $map['title']['ns'], $map['title']['key'] ); + + return new self( $map['type'], $map['params'], $map['opts'], $title ); + } } diff --git a/includes/jobqueue/aggregator/JobQueueAggregator.php b/includes/jobqueue/aggregator/JobQueueAggregator.php index 8600eed9..febc277a 100644 --- a/includes/jobqueue/aggregator/JobQueueAggregator.php +++ b/includes/jobqueue/aggregator/JobQueueAggregator.php @@ -34,7 +34,7 @@ abstract class JobQueueAggregator { /** * @param array $params */ - protected function __construct( array $params ) { + public function __construct( array $params ) { } /** @@ -73,9 +73,7 @@ abstract class JobQueueAggregator { * @return bool Success */ final public function notifyQueueEmpty( $wiki, $type ) { - wfProfileIn( __METHOD__ ); $ok = $this->doNotifyQueueEmpty( $wiki, $type ); - wfProfileOut( __METHOD__ ); return $ok; } @@ -93,9 +91,7 @@ abstract class JobQueueAggregator { * @return bool Success */ final public function notifyQueueNonEmpty( $wiki, $type ) { - wfProfileIn( __METHOD__ ); $ok = $this->doNotifyQueueNonEmpty( $wiki, $type ); - wfProfileOut( __METHOD__ ); return $ok; } @@ -111,9 +107,7 @@ abstract class JobQueueAggregator { * @return array (job type => (list of wiki IDs)) */ final public function getAllReadyWikiQueues() { - wfProfileIn( __METHOD__ ); $res = $this->doGetAllReadyWikiQueues(); - wfProfileOut( __METHOD__ ); return $res; } @@ -129,9 +123,7 @@ abstract class JobQueueAggregator { * @return bool Success */ final public function purge() { - wfProfileIn( __METHOD__ ); $res = $this->doPurge(); - wfProfileOut( __METHOD__ ); return $res; } @@ -160,3 +152,21 @@ abstract class JobQueueAggregator { return $pendingDBs; } } + +class JobQueueAggregatorNull extends JobQueueAggregator { + protected function doNotifyQueueEmpty( $wiki, $type ) { + return true; + } + + protected function doNotifyQueueNonEmpty( $wiki, $type ) { + return true; + } + + protected function doGetAllReadyWikiQueues() { + return array(); + } + + protected function doPurge() { + return true; + } +}
\ No newline at end of file diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php b/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php deleted file mode 100644 index ae266ef3..00000000 --- a/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php +++ /dev/null @@ -1,125 +0,0 @@ -<?php -/** - * Job queue aggregator code that uses BagOStuff. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - * http://www.gnu.org/copyleft/gpl.html - * - * @file - * @author Aaron Schulz - */ - -/** - * Class to handle tracking information about all queues using BagOStuff - * - * @ingroup JobQueue - * @since 1.21 - */ -class JobQueueAggregatorMemc extends JobQueueAggregator { - /** @var BagOStuff */ - protected $cache; - - protected $cacheTTL; // integer; seconds - - /** - * @param array $params Possible keys: - * - objectCache : Name of an object cache registered in $wgObjectCaches. - * This defaults to the one specified by $wgMainCacheType. - * - cacheTTL : Seconds to cache the aggregate data before regenerating. - */ - protected function __construct( array $params ) { - parent::__construct( $params ); - $this->cache = isset( $params['objectCache'] ) - ? wfGetCache( $params['objectCache'] ) - : wfGetMainCache(); - $this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min - } - - /** - * @see JobQueueAggregator::doNotifyQueueEmpty() - */ - protected function doNotifyQueueEmpty( $wiki, $type ) { - $key = $this->getReadyQueueCacheKey(); - // Delist the queue from the "ready queue" list - if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock - $curInfo = $this->cache->get( $key ); - if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) { - if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) { - $curInfo['pendingDBs'][$type] = array_diff( - $curInfo['pendingDBs'][$type], array( $wiki ) ); - $this->cache->set( $key, $curInfo ); - } - } - $this->cache->delete( "$key:lock" ); // unlock - } - - return true; - } - - /** - * @see JobQueueAggregator::doNotifyQueueNonEmpty() - */ - protected function doNotifyQueueNonEmpty( $wiki, $type ) { - return true; // updated periodically - } - - /** - * @see JobQueueAggregator::doAllGetReadyWikiQueues() - */ - protected function doGetAllReadyWikiQueues() { - $key = $this->getReadyQueueCacheKey(); - // If the cache entry wasn't present, is stale, or in .1% of cases otherwise, - // regenerate the cache. Use any available stale cache if another process is - // currently regenerating the pending DB information. - $pendingDbInfo = $this->cache->get( $key ); - if ( !is_array( $pendingDbInfo ) - || ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL - || mt_rand( 0, 999 ) == 0 - ) { - if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock - $pendingDbInfo = array( - 'pendingDBs' => $this->findPendingWikiQueues(), - 'timestamp' => time() - ); - for ( $attempts = 1; $attempts <= 25; ++$attempts ) { - if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock - $this->cache->set( $key, $pendingDbInfo ); - $this->cache->delete( "$key:lock" ); // unlock - break; - } - } - $this->cache->delete( "$key:rebuild" ); // unlock - } - } - - return is_array( $pendingDbInfo ) - ? $pendingDbInfo['pendingDBs'] - : array(); // cache is both empty and locked - } - - /** - * @see JobQueueAggregator::doPurge() - */ - protected function doPurge() { - return $this->cache->delete( $this->getReadyQueueCacheKey() ); - } - - /** - * @return string - */ - private function getReadyQueueCacheKey() { - return "jobqueue:aggregator:ready-queues:v1"; // global - } -} diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php index db9e764c..847dd6f4 100644 --- a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php +++ b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php @@ -44,7 +44,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { * If a hostname is specified but no port, the standard port number * 6379 will be used. Required. */ - protected function __construct( array $params ) { + public function __construct( array $params ) { parent::__construct( $params ); $this->servers = isset( $params['redisServers'] ) ? $params['redisServers'] diff --git a/includes/jobqueue/jobs/AssembleUploadChunksJob.php b/includes/jobqueue/jobs/AssembleUploadChunksJob.php index 9e9bda6f..b7f09e77 100644 --- a/includes/jobqueue/jobs/AssembleUploadChunksJob.php +++ b/includes/jobqueue/jobs/AssembleUploadChunksJob.php @@ -35,26 +35,16 @@ class AssembleUploadChunksJob extends Job { public function run() { $scope = RequestContext::importScopedSession( $this->params['session'] ); $context = RequestContext::getMain(); + $user = $context->getUser(); try { - $user = $context->getUser(); if ( !$user->isLoggedIn() ) { $this->setLastError( "Could not load the author user from session." ); return false; } - if ( count( $_SESSION ) === 0 ) { - // Empty session probably indicates that we didn't associate - // with the session correctly. Note that being able to load - // the user does not necessarily mean the session was loaded. - // Most likely cause by suhosin.session.encrypt = On. - $this->setLastError( "Error associating with user session. " . - "Try setting suhosin.session.encrypt = Off" ); - - return false; - } - UploadBase::setSessionStatus( + $user, $this->params['filekey'], array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() ) ); @@ -70,6 +60,7 @@ class AssembleUploadChunksJob extends Job { $status = $upload->concatenateChunks(); if ( !$status->isGood() ) { UploadBase::setSessionStatus( + $user, $this->params['filekey'], array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status ) ); @@ -93,6 +84,7 @@ class AssembleUploadChunksJob extends Job { // Cache the info so the user doesn't have to wait forever to get the final info UploadBase::setSessionStatus( + $user, $this->params['filekey'], array( 'result' => 'Success', @@ -102,8 +94,9 @@ class AssembleUploadChunksJob extends Job { 'status' => Status::newGood() ) ); - } catch ( MWException $e ) { + } catch ( Exception $e ) { UploadBase::setSessionStatus( + $user, $this->params['filekey'], array( 'result' => 'Failure', diff --git a/includes/jobqueue/jobs/DuplicateJob.php b/includes/jobqueue/jobs/DuplicateJob.php index 1fa6cefe..c5e3a234 100644 --- a/includes/jobqueue/jobs/DuplicateJob.php +++ b/includes/jobqueue/jobs/DuplicateJob.php @@ -18,7 +18,7 @@ * http://www.gnu.org/copyleft/gpl.html * * @file - * @ingroup Cache + * @ingroup JobQueue */ /** diff --git a/includes/jobqueue/jobs/EnqueueJob.php b/includes/jobqueue/jobs/EnqueueJob.php new file mode 100644 index 00000000..46fb2aa7 --- /dev/null +++ b/includes/jobqueue/jobs/EnqueueJob.php @@ -0,0 +1,88 @@ +<?php +/** + * Router job that takes jobs and enqueues them. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Router job that takes jobs and enqueues them to their proper queues + * + * This can be used for several things: + * - a) Making multi-job enqueues more robust by atomically enqueueing + * a single job that pushes the actual jobs (with retry logic) + * - b) Masking the latency of pushing jobs to different queues/wikis + * - c) Low-latency enqueues to push jobs from warm to hot datacenters + * + * @ingroup JobQueue + * @since 1.25 + */ +final class EnqueueJob extends Job { + /** + * Callers should use the factory methods instead + * + * @param Title $title + * @param array $params Job parameters + */ + function __construct( $title, $params ) { + parent::__construct( 'enqueue', $title, $params ); + } + + /** + * @param Job|JobSpecification|array $jobs + * @return JobRouteJob + */ + public static function newFromLocalJobs( $jobs ) { + $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); + + return self::newFromJobsByWiki( array( wfWikiID() => $jobs ) ); + } + + /** + * @param array $jobsByWiki Map of (wiki => JobSpecification list) + * @return JobRouteJob + */ + public static function newFromJobsByWiki( array $jobsByWiki ) { + $jobMapsByWiki = array(); + foreach ( $jobsByWiki as $wiki => $jobs ) { + $jobMapsByWiki[$wiki] = array(); + foreach ( $jobs as $job ) { + if ( $job instanceof JobSpecification ) { + $jobMapsByWiki[$wiki][] = $job->toSerializableArray(); + } else { + throw new InvalidArgumentException( "Jobs must be of type JobSpecification." ); + } + } + } + + return new self( Title::newMainPage(), array( 'jobsByWiki' => $jobMapsByWiki ) ); + } + + public function run() { + foreach ( $this->params['jobsByWiki'] as $wiki => $jobMaps ) { + $jobSpecs = array(); + foreach ( $jobMaps as $jobMap ) { + $jobSpecs[] = JobSpecification::newFromArray( $jobMap ); + } + JobQueueGroup::singleton( $wiki )->push( $jobSpecs ); + } + + return true; + } +} diff --git a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php index 4d1e72c9..e5e521c3 100644 --- a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php +++ b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php @@ -18,6 +18,7 @@ * http://www.gnu.org/copyleft/gpl.html * * @file + * @ingroup JobQueue * @ingroup Cache */ @@ -26,9 +27,9 @@ * * This job comes in a few variants: * - a) Recursive jobs to purge caches for backlink pages for a given title. - * These jobs have have (recursive:true,table:<table>) set. + * These jobs have (recursive:true,table:<table>) set. * - b) Jobs to purge caches for a set of titles (the job title is ignored). - * These jobs have have (pages:(<page ID>:(<namespace>,<title>),...) set. + * These jobs have (pages:(<page ID>:(<namespace>,<title>),...) set. * * @ingroup JobQueue */ @@ -42,17 +43,8 @@ class HTMLCacheUpdateJob extends Job { function run() { global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery; - static $expected = array( 'recursive', 'pages' ); // new jobs have one of these - - $oldRangeJob = false; - if ( !array_intersect( array_keys( $this->params ), $expected ) ) { - // B/C for older job params formats that lack these fields: - // a) base jobs with just ("table") and b) range jobs with ("table","start","end") - if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) { - $oldRangeJob = true; - } else { - $this->params['recursive'] = true; // base job - } + if ( isset( $this->params['table'] ) && !isset( $this->params['pages'] ) ) { + $this->params['recursive'] = true; // b/c; base job } // Job to purge all (or a range of) backlink pages for a page @@ -67,29 +59,15 @@ class HTMLCacheUpdateJob extends Job { array( 'params' => $this->getRootJobParams() ) ); JobQueueGroup::singleton()->push( $jobs ); - // Job to purge pages for for a set of titles + // Job to purge pages for a set of titles } elseif ( isset( $this->params['pages'] ) ) { $this->invalidateTitles( $this->params['pages'] ); - // B/C for job to purge a range of backlink pages for a given page - } elseif ( $oldRangeJob ) { - $titleArray = $this->title->getBacklinkCache()->getLinks( - $this->params['table'], $this->params['start'], $this->params['end'] ); - - $pages = array(); // same format BacklinkJobUtils uses - foreach ( $titleArray as $tl ) { - $pages[$tl->getArticleId()] = array( $tl->getNamespace(), $tl->getDbKey() ); - } - - $jobs = array(); - foreach ( array_chunk( $pages, $wgUpdateRowsPerJob ) as $pageChunk ) { - $jobs[] = new HTMLCacheUpdateJob( $this->title, - array( - 'table' => $this->params['table'], - 'pages' => $pageChunk - ) + $this->getRootJobParams() // carry over information for de-duplication - ); - } - JobQueueGroup::singleton()->push( $jobs ); + // Job to update a single title + } else { + $t = $this->title; + $this->invalidateTitles( array( + $t->getArticleID() => array( $t->getNamespace(), $t->getDBkey() ) + ) ); } return true; diff --git a/includes/jobqueue/jobs/NullJob.php b/includes/jobqueue/jobs/NullJob.php index 66291e9d..f94d6ebc 100644 --- a/includes/jobqueue/jobs/NullJob.php +++ b/includes/jobqueue/jobs/NullJob.php @@ -18,7 +18,7 @@ * http://www.gnu.org/copyleft/gpl.html * * @file - * @ingroup Cache + * @ingroup JobQueue */ /** diff --git a/includes/jobqueue/jobs/PublishStashedFileJob.php b/includes/jobqueue/jobs/PublishStashedFileJob.php index 918a392d..a922dd3d 100644 --- a/includes/jobqueue/jobs/PublishStashedFileJob.php +++ b/includes/jobqueue/jobs/PublishStashedFileJob.php @@ -19,12 +19,14 @@ * * @file * @ingroup Upload + * @ingroup JobQueue */ /** * Upload a file from the upload stash into the local file repo. * * @ingroup Upload + * @ingroup JobQueue */ class PublishStashedFileJob extends Job { public function __construct( $title, $params ) { @@ -35,26 +37,16 @@ class PublishStashedFileJob extends Job { public function run() { $scope = RequestContext::importScopedSession( $this->params['session'] ); $context = RequestContext::getMain(); + $user = $context->getUser(); try { - $user = $context->getUser(); if ( !$user->isLoggedIn() ) { $this->setLastError( "Could not load the author user from session." ); return false; } - if ( count( $_SESSION ) === 0 ) { - // Empty session probably indicates that we didn't associate - // with the session correctly. Note that being able to load - // the user does not necessarily mean the session was loaded. - // Most likely cause by suhosin.session.encrypt = On. - $this->setLastError( "Error associating with user session. " . - "Try setting suhosin.session.encrypt = Off" ); - - return false; - } - UploadBase::setSessionStatus( + $user, $this->params['filekey'], array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() ) ); @@ -72,6 +64,7 @@ class PublishStashedFileJob extends Job { $status = Status::newFatal( 'verification-error' ); $status->value = array( 'verification' => $verification ); UploadBase::setSessionStatus( + $user, $this->params['filekey'], array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) ); @@ -89,6 +82,7 @@ class PublishStashedFileJob extends Job { ); if ( !$status->isGood() ) { UploadBase::setSessionStatus( + $user, $this->params['filekey'], array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) ); @@ -106,6 +100,7 @@ class PublishStashedFileJob extends Job { // Cache the info so the user doesn't have to wait forever to get the final info UploadBase::setSessionStatus( + $user, $this->params['filekey'], array( 'result' => 'Success', @@ -115,8 +110,9 @@ class PublishStashedFileJob extends Job { 'status' => Status::newGood() ) ); - } catch ( MWException $e ) { + } catch ( Exception $e ) { UploadBase::setSessionStatus( + $user, $this->params['filekey'], array( 'result' => 'Failure', diff --git a/includes/jobqueue/jobs/RecentChangesUpdateJob.php b/includes/jobqueue/jobs/RecentChangesUpdateJob.php new file mode 100644 index 00000000..cc04595d --- /dev/null +++ b/includes/jobqueue/jobs/RecentChangesUpdateJob.php @@ -0,0 +1,223 @@ +<?php +/** + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + * @ingroup JobQueue + */ + +/** + * Job for pruning recent changes + * + * @ingroup JobQueue + * @since 1.25 + */ +class RecentChangesUpdateJob extends Job { + function __construct( $title, $params ) { + parent::__construct( 'recentChangesUpdate', $title, $params ); + + if ( !isset( $params['type'] ) ) { + throw new Exception( "Missing 'type' parameter." ); + } + + $this->removeDuplicates = true; + } + + /** + * @return RecentChangesUpdateJob + */ + final public static function newPurgeJob() { + return new self( + SpecialPage::getTitleFor( 'Recentchanges' ), array( 'type' => 'purge' ) + ); + } + + /** + * @return RecentChangesUpdateJob + * @since 1.26 + */ + final public static function newCacheUpdateJob() { + return new self( + SpecialPage::getTitleFor( 'Recentchanges' ), array( 'type' => 'cacheUpdate' ) + ); + } + + public function run() { + if ( $this->params['type'] === 'purge' ) { + $this->purgeExpiredRows(); + } elseif ( $this->params['type'] === 'cacheUpdate' ) { + $this->updateActiveUsers(); + } else { + throw new InvalidArgumentException( + "Invalid 'type' parameter '{$this->params['type']}'." ); + } + + return true; + } + + protected function purgeExpiredRows() { + global $wgRCMaxAge; + + $lockKey = wfWikiID() . ':recentchanges-prune'; + + $dbw = wfGetDB( DB_MASTER ); + if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) { + return; // already in progress + } + $batchSize = 100; // Avoid slave lag + + $cutoff = $dbw->timestamp( time() - $wgRCMaxAge ); + do { + $rcIds = $dbw->selectFieldValues( 'recentchanges', + 'rc_id', + array( 'rc_timestamp < ' . $dbw->addQuotes( $cutoff ) ), + __METHOD__, + array( 'LIMIT' => $batchSize ) + ); + if ( $rcIds ) { + $dbw->delete( 'recentchanges', array( 'rc_id' => $rcIds ), __METHOD__ ); + } + // Commit in chunks to avoid slave lag + $dbw->commit( __METHOD__, 'flush' ); + + if ( count( $rcIds ) === $batchSize ) { + // There might be more, so try waiting for slaves + if ( !wfWaitForSlaves( null, false, false, /* $timeout = */ 3 ) ) { + // Another job will continue anyway + break; + } + } + } while ( $rcIds ); + + $dbw->unlock( $lockKey, __METHOD__ ); + } + + protected function updateActiveUsers() { + global $wgActiveUserDays; + + // Users that made edits at least this many days ago are "active" + $days = $wgActiveUserDays; + // Pull in the full window of active users in this update + $window = $wgActiveUserDays * 86400; + + $dbw = wfGetDB( DB_MASTER ); + // JobRunner uses DBO_TRX, but doesn't call begin/commit itself; + // onTransactionIdle() will run immediately since there is no trx. + $dbw->onTransactionIdle( function() use ( $dbw, $days, $window ) { + // Avoid disconnect/ping() cycle that makes locks fall off + $dbw->setSessionOptions( array( 'connTimeout' => 900 ) ); + + $lockKey = wfWikiID() . '-activeusers'; + if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) { + return false; // exclusive update (avoids duplicate entries) + } + + $nowUnix = time(); + // Get the last-updated timestamp for the cache + $cTime = $dbw->selectField( 'querycache_info', + 'qci_timestamp', + array( 'qci_type' => 'activeusers' ) + ); + $cTimeUnix = $cTime ? wfTimestamp( TS_UNIX, $cTime ) : 1; + + // Pick the date range to fetch from. This is normally from the last + // update to till the present time, but has a limited window for sanity. + // If the window is limited, multiple runs are need to fully populate it. + $sTimestamp = max( $cTimeUnix, $nowUnix - $days * 86400 ); + $eTimestamp = min( $sTimestamp + $window, $nowUnix ); + + // Get all the users active since the last update + $res = $dbw->select( + array( 'recentchanges' ), + array( 'rc_user_text', 'lastedittime' => 'MAX(rc_timestamp)' ), + array( + 'rc_user > 0', // actual accounts + 'rc_type != ' . $dbw->addQuotes( RC_EXTERNAL ), // no wikidata + 'rc_log_type IS NULL OR rc_log_type != ' . $dbw->addQuotes( 'newusers' ), + 'rc_timestamp >= ' . $dbw->addQuotes( $dbw->timestamp( $sTimestamp ) ), + 'rc_timestamp <= ' . $dbw->addQuotes( $dbw->timestamp( $eTimestamp ) ) + ), + __METHOD__, + array( + 'GROUP BY' => array( 'rc_user_text' ), + 'ORDER BY' => 'NULL' // avoid filesort + ) + ); + $names = array(); + foreach ( $res as $row ) { + $names[$row->rc_user_text] = $row->lastedittime; + } + + // Rotate out users that have not edited in too long (according to old data set) + $dbw->delete( 'querycachetwo', + array( + 'qcc_type' => 'activeusers', + 'qcc_value < ' . $dbw->addQuotes( $nowUnix - $days * 86400 ) // TS_UNIX + ), + __METHOD__ + ); + + // Find which of the recently active users are already accounted for + if ( count( $names ) ) { + $res = $dbw->select( 'querycachetwo', + array( 'user_name' => 'qcc_title' ), + array( + 'qcc_type' => 'activeusers', + 'qcc_namespace' => NS_USER, + 'qcc_title' => array_keys( $names ) ), + __METHOD__ + ); + foreach ( $res as $row ) { + unset( $names[$row->user_name] ); + } + } + + // Insert the users that need to be added to the list + if ( count( $names ) ) { + $newRows = array(); + foreach ( $names as $name => $lastEditTime ) { + $newRows[] = array( + 'qcc_type' => 'activeusers', + 'qcc_namespace' => NS_USER, + 'qcc_title' => $name, + 'qcc_value' => wfTimestamp( TS_UNIX, $lastEditTime ), + 'qcc_namespacetwo' => 0, // unused + 'qcc_titletwo' => '' // unused + ); + } + foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) { + $dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ ); + wfWaitForSlaves(); + } + } + + // If a transaction was already started, it might have an old + // snapshot, so kludge the timestamp range back as needed. + $asOfTimestamp = min( $eTimestamp, (int)$dbw->trxTimestamp() ); + + // Touch the data freshness timestamp + $dbw->replace( 'querycache_info', + array( 'qci_type' ), + array( 'qci_type' => 'activeusers', + 'qci_timestamp' => $dbw->timestamp( $asOfTimestamp ) ), // not always $now + __METHOD__ + ); + + $dbw->unlock( $lockKey, __METHOD__ ); + } ); + } +} diff --git a/includes/jobqueue/jobs/RefreshLinksJob.php b/includes/jobqueue/jobs/RefreshLinksJob.php index f82af273..1252b0b5 100644 --- a/includes/jobqueue/jobs/RefreshLinksJob.php +++ b/includes/jobqueue/jobs/RefreshLinksJob.php @@ -26,9 +26,9 @@ * * This job comes in a few variants: * - a) Recursive jobs to update links for backlink pages for a given title. - * These jobs have have (recursive:true,table:<table>) set. + * These jobs have (recursive:true,table:<table>) set. * - b) Jobs to update links for a set of pages (the job title is ignored). - * These jobs have have (pages:(<page ID>:(<namespace>,<title>),...) set. + * These jobs have (pages:(<page ID>:(<namespace>,<title>),...) set. * - c) Jobs to update links for a single page (the job title) * These jobs need no extra fields set. * @@ -39,6 +39,10 @@ class RefreshLinksJob extends Job { function __construct( $title, $params = '' ) { parent::__construct( 'refreshLinks', $title, $params ); + // A separate type is used just for cascade-protected backlinks + if ( !empty( $this->params['prioritize'] ) ) { + $this->command .= 'Prioritized'; + } // Base backlink update jobs and per-title update jobs can be de-duplicated. // If template A changes twice before any jobs run, a clean queue will have: // (A base, A base) @@ -86,7 +90,7 @@ class RefreshLinksJob extends Job { array( 'params' => $extraParams ) ); JobQueueGroup::singleton()->push( $jobs ); - // Job to update link tables for for a set of titles + // Job to update link tables for a set of titles } elseif ( isset( $this->params['pages'] ) ) { foreach ( $this->params['pages'] as $pageId => $nsAndKey ) { list( $ns, $dbKey ) = $nsAndKey; @@ -100,6 +104,10 @@ class RefreshLinksJob extends Job { return true; } + /** + * @param Title $title + * @return bool + */ protected function runForTitle( Title $title = null ) { $linkCache = LinkCache::singleton(); $linkCache->clear(); @@ -157,7 +165,7 @@ class RefreshLinksJob extends Job { $ellapsed = microtime( true ) - $start; // If it took a long time to render, then save this back to the cache to avoid // wasted CPU by other apaches or job runners. We don't want to always save to - // cache as this cause cause high cache I/O and LRU churn when a template changes. + // cache as this can cause high cache I/O and LRU churn when a template changes. if ( $ellapsed >= self::PARSE_THRESHOLD_SEC && $page->isParserCacheUsed( $parserOptions, $revision->getId() ) && $parserOutput->isCacheable() diff --git a/includes/jobqueue/jobs/RefreshLinksJob2.php b/includes/jobqueue/jobs/RefreshLinksJob2.php deleted file mode 100644 index 97405aeb..00000000 --- a/includes/jobqueue/jobs/RefreshLinksJob2.php +++ /dev/null @@ -1,141 +0,0 @@ -<?php -/** - * Job to update links for a given title. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - * http://www.gnu.org/copyleft/gpl.html - * - * @file - * @ingroup JobQueue - */ - -/** - * Background job to update links for titles in certain backlink range by page ID. - * Newer version for high use templates. This is deprecated by RefreshLinksPartitionJob. - * - * @ingroup JobQueue - * @deprecated since 1.23 - */ -class RefreshLinksJob2 extends Job { - function __construct( $title, $params ) { - parent::__construct( 'refreshLinks2', $title, $params ); - // Base jobs for large templates can easily be de-duplicated - $this->removeDuplicates = !isset( $params['start'] ) && !isset( $params['end'] ); - } - - /** - * Run a refreshLinks2 job - * @return bool Success - */ - function run() { - global $wgUpdateRowsPerJob; - - $linkCache = LinkCache::singleton(); - $linkCache->clear(); - - if ( is_null( $this->title ) ) { - $this->error = "refreshLinks2: Invalid title"; - return false; - } - - // Back compat for pre-r94435 jobs - $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks'; - - // Avoid slave lag when fetching templates. - // When the outermost job is run, we know that the caller that enqueued it must have - // committed the relevant changes to the DB by now. At that point, record the master - // position and pass it along as the job recursively breaks into smaller range jobs. - // Hopefully, when leaf jobs are popped, the slaves will have reached that position. - if ( isset( $this->params['masterPos'] ) ) { - $masterPos = $this->params['masterPos']; - } elseif ( wfGetLB()->getServerCount() > 1 ) { - $masterPos = wfGetLB()->getMasterPos(); - } else { - $masterPos = false; - } - - $tbc = $this->title->getBacklinkCache(); - - $jobs = array(); // jobs to insert - if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) { - # This is a partition job to trigger the insertion of leaf jobs... - $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); - } else { - # This is a base job to trigger the insertion of partitioned jobs... - if ( $tbc->getNumLinks( $table, $wgUpdateRowsPerJob + 1 ) <= $wgUpdateRowsPerJob ) { - # Just directly insert the single per-title jobs - $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); - } else { - # Insert the partition jobs to make per-title jobs - foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) { - list( $start, $end ) = $batch; - $jobs[] = new RefreshLinksJob2( $this->title, - array( - 'table' => $table, - 'start' => $start, - 'end' => $end, - 'masterPos' => $masterPos, - ) + $this->getRootJobParams() // carry over information for de-duplication - ); - } - } - } - - if ( count( $jobs ) ) { - JobQueueGroup::singleton()->push( $jobs ); - } - - return true; - } - - /** - * @param string $table - * @param mixed $masterPos - * @return array - */ - protected function getSingleTitleJobs( $table, $masterPos ) { - # The "start"/"end" fields are not set for the base jobs - $start = isset( $this->params['start'] ) ? $this->params['start'] : false; - $end = isset( $this->params['end'] ) ? $this->params['end'] : false; - $titles = $this->title->getBacklinkCache()->getLinks( $table, $start, $end ); - # Convert into single page refresh links jobs. - # This handles well when in sapi mode and is useful in any case for job - # de-duplication. If many pages use template A, and that template itself - # uses template B, then an edit to both will create many duplicate jobs. - # Roughly speaking, for each page, one of the "RefreshLinksJob" jobs will - # get run first, and when it does, it will remove the duplicates. Of course, - # one page could have its job popped when the other page's job is still - # buried within the logic of a refreshLinks2 job. - $jobs = array(); - foreach ( $titles as $title ) { - $jobs[] = new RefreshLinksJob( $title, - array( 'masterPos' => $masterPos ) + $this->getRootJobParams() - ); // carry over information for de-duplication - } - return $jobs; - } - - /** - * @return array - */ - public function getDeduplicationInfo() { - $info = parent::getDeduplicationInfo(); - // Don't let highly unique "masterPos" values ruin duplicate detection - if ( is_array( $info['params'] ) ) { - unset( $info['params']['masterPos'] ); - } - return $info; - } -} diff --git a/includes/jobqueue/jobs/ThumbnailRenderJob.php b/includes/jobqueue/jobs/ThumbnailRenderJob.php new file mode 100644 index 00000000..ab381388 --- /dev/null +++ b/includes/jobqueue/jobs/ThumbnailRenderJob.php @@ -0,0 +1,109 @@ +<?php +/** + * Job for asynchronous rendering of thumbnails. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job for asynchronous rendering of thumbnails. + * + * @ingroup JobQueue + */ +class ThumbnailRenderJob extends Job { + public function __construct( $title, $params ) { + parent::__construct( 'ThumbnailRender', $title, $params ); + } + + public function run() { + global $wgUploadThumbnailRenderMethod; + + $transformParams = $this->params['transformParams']; + + $file = wfLocalFile( $this->title ); + $file->load( File::READ_LATEST ); + + if ( $file && $file->exists() ) { + if ( $wgUploadThumbnailRenderMethod === 'jobqueue' ) { + $thumb = $file->transform( $transformParams, File::RENDER_NOW ); + + if ( $thumb && !$thumb->isError() ) { + return true; + } else { + $this->setLastError( __METHOD__ . ': thumbnail couln\'t be generated' ); + return false; + } + } elseif ( $wgUploadThumbnailRenderMethod === 'http' ) { + $status = $this->hitThumbUrl( $file, $transformParams ); + + wfDebug( __METHOD__ . ": received status {$status}\n" ); + + if ( $status === 200 || $status === 301 || $status === 302 ) { + return true; + } elseif ( $status ) { + // Note that this currently happens (500) when requesting sizes larger then or + // equal to the original, which is harmless. + $this->setLastError( __METHOD__ . ': incorrect HTTP status ' . $status ); + return false; + } else { + $this->setLastError( __METHOD__ . ': HTTP request failure' ); + return false; + } + } else { + $this->setLastError( __METHOD__ . ': unknown thumbnail render method ' . $wgUploadThumbnailRenderMethod ); + return false; + } + } else { + $this->setLastError( __METHOD__ . ': file doesn\'t exist' ); + return false; + } + } + + protected function hitThumbUrl( $file, $transformParams ) { + global $wgUploadThumbnailRenderHttpCustomHost, $wgUploadThumbnailRenderHttpCustomDomain; + + $thumbName = $file->thumbName( $transformParams ); + $thumbUrl = $file->getThumbUrl( $thumbName ); + + if ( $wgUploadThumbnailRenderHttpCustomDomain ) { + $parsedUrl = wfParseUrl( $thumbUrl ); + + if ( !$parsedUrl || !isset( $parsedUrl['path'] ) || !strlen( $parsedUrl['path'] ) ) { + return false; + } + + $thumbUrl = '//' . $wgUploadThumbnailRenderHttpCustomDomain . $parsedUrl['path']; + } + + wfDebug( __METHOD__ . ": hitting url {$thumbUrl}\n" ); + + $request = MWHttpRequest::factory( $thumbUrl, + array( 'method' => 'HEAD', 'followRedirects' => true ), + __METHOD__ + ); + + if ( $wgUploadThumbnailRenderHttpCustomHost ) { + $request->setHeader( 'Host', $wgUploadThumbnailRenderHttpCustomHost ); + } + + $status = $request->execute(); + + return $request->getStatus(); + } +} diff --git a/includes/jobqueue/jobs/UploadFromUrlJob.php b/includes/jobqueue/jobs/UploadFromUrlJob.php index a09db15a..d15fd025 100644 --- a/includes/jobqueue/jobs/UploadFromUrlJob.php +++ b/includes/jobqueue/jobs/UploadFromUrlJob.php @@ -81,7 +81,7 @@ class UploadFromUrlJob extends Job { if ( $warnings ) { # Stash the upload - $key = $this->upload->stashFile(); + $key = $this->upload->stashFile( $this->user ); // @todo FIXME: This has been broken for a while. // User::leaveUserMessage() does not exist. |