diff options
Diffstat (limited to 'includes/job')
18 files changed, 2316 insertions, 408 deletions
diff --git a/includes/job/Job.php b/includes/job/Job.php index bcf582e7..ab7df5d2 100644 --- a/includes/job/Job.php +++ b/includes/job/Job.php @@ -68,7 +68,7 @@ abstract class Job { */ public static function factory( $command, Title $title, $params = false, $id = 0 ) { global $wgJobClasses; - if( isset( $wgJobClasses[$command] ) ) { + if ( isset( $wgJobClasses[$command] ) ) { $class = $wgJobClasses[$command]; return new $class( $title, $params, $id ); } @@ -84,7 +84,7 @@ abstract class Job { * * @param array $jobs of Job objects * @return bool - * @deprecated 1.21 + * @deprecated since 1.21 */ public static function batchInsert( $jobs ) { return JobQueueGroup::singleton()->push( $jobs ); @@ -99,10 +99,10 @@ abstract class Job { * * @param array $jobs of Job objects * @return bool - * @deprecated 1.21 + * @deprecated since 1.21 */ public static function safeBatchInsert( $jobs ) { - return JobQueueGroup::singleton()->push( $jobs, JobQueue::QoS_Atomic ); + return JobQueueGroup::singleton()->push( $jobs, JobQueue::QOS_ATOMIC ); } /** @@ -112,7 +112,7 @@ abstract class Job { * * @param $type string * @return Job|bool Returns false if there are no jobs - * @deprecated 1.21 + * @deprecated since 1.21 */ public static function pop_type( $type ) { return JobQueueGroup::singleton()->get( $type )->pop(); @@ -123,7 +123,7 @@ abstract class Job { * This is subject to $wgJobTypesExcludedFromDefaultQueue. * * @return Job or false if there's no jobs - * @deprecated 1.21 + * @deprecated since 1.21 */ public static function pop() { return JobQueueGroup::singleton()->pop(); @@ -150,6 +150,7 @@ abstract class Job { /** * @return integer May be 0 for jobs stored outside the DB + * @deprecated since 1.22 */ public function getId() { return $this->id; @@ -177,6 +178,16 @@ abstract class Job { } /** + * @return integer|null UNIX timestamp to delay running this job until, otherwise null + * @since 1.22 + */ + public function getReleaseTimestamp() { + return isset( $this->params['jobReleaseTimestamp'] ) + ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] ) + : null; + } + + /** * @return bool Whether only one of each identical set of jobs should be run */ public function ignoreDuplicates() { @@ -185,34 +196,43 @@ abstract class Job { /** * @return bool Whether this job can be retried on failure by job runners + * @since 1.21 */ public function allowRetries() { return true; } /** - * Subclasses may need to override this to make duplication detection work + * Subclasses may need to override this to make duplication detection work. + * The resulting map conveys everything that makes the job unique. This is + * only checked if ignoreDuplicates() returns true, meaning that duplicate + * jobs are supposed to be ignored. * * @return Array Map of key/values + * @since 1.21 */ public function getDeduplicationInfo() { $info = array( - 'type' => $this->getType(), + 'type' => $this->getType(), 'namespace' => $this->getTitle()->getNamespace(), - 'title' => $this->getTitle()->getDBkey(), - 'params' => $this->getParams() + 'title' => $this->getTitle()->getDBkey(), + 'params' => $this->getParams() ); - // Identical jobs with different "root" jobs should count as duplicates if ( is_array( $info['params'] ) ) { + // Identical jobs with different "root" jobs should count as duplicates unset( $info['params']['rootJobSignature'] ); unset( $info['params']['rootJobTimestamp'] ); + // Likewise for jobs with different delay times + unset( $info['params']['jobReleaseTimestamp'] ); } return $info; } /** + * @see JobQueue::deduplicateRootJob() * @param string $key A key that identifies the task * @return Array + * @since 1.21 */ public static function newRootJobParams( $key ) { return array( @@ -222,7 +242,9 @@ abstract class Job { } /** + * @see JobQueue::deduplicateRootJob() * @return Array + * @since 1.21 */ public function getRootJobParams() { return array( @@ -236,9 +258,19 @@ abstract class Job { } /** + * @see JobQueue::deduplicateRootJob() + * @return bool + * @since 1.22 + */ + public function hasRootJobParams() { + return isset( $this->params['rootJobSignature'] ) + && isset( $this->params['rootJobTimestamp'] ); + } + + /** * Insert a single job into the queue. * @return bool true on success - * @deprecated 1.21 + * @deprecated since 1.21 */ public function insert() { return JobQueueGroup::singleton()->push( $this ); diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index b0dd9258..6556ee85 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -34,8 +34,14 @@ abstract class JobQueue { protected $order; // string; job priority for pop() protected $claimTTL; // integer; seconds protected $maxTries; // integer; maximum number of times to try a job + protected $checkDelay; // boolean; allow delayed jobs - const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions + /** @var BagOStuff */ + protected $dupCache; + + const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions + + const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days) /** * @param $params array @@ -53,28 +59,37 @@ abstract class JobQueue { if ( !in_array( $this->order, $this->supportedOrders() ) ) { throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); } + $this->checkDelay = !empty( $params['checkDelay'] ); + if ( $this->checkDelay && !$this->supportsDelayedJobs() ) { + throw new MWException( __CLASS__ . " does not support delayed jobs." ); + } + $this->dupCache = wfGetCache( CACHE_ANYTHING ); } /** * Get a job queue object of the specified type. * $params includes: - * - class : What job class to use (determines job type) - * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) - * - type : The name of the job types this queue handles - * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". - * If "fifo" is used, the queue will effectively be FIFO. Note that - * job completion will not appear to be exactly FIFO if there are multiple - * job runners since jobs can take different times to finish once popped. - * If "timestamp" is used, the queue will at least be loosely ordered - * by timestamp, allowing for some jobs to be popped off out of order. - * If "random" is used, pop() will pick jobs in random order. - * Note that it may only be weakly random (e.g. a lottery of the oldest X). - * If "any" is choosen, the queue will use whatever order is the fastest. - * This might be useful for improving concurrency for job acquisition. - * - claimTTL : If supported, the queue will recycle jobs that have been popped - * but not acknowledged as completed after this many seconds. Recycling - * of jobs simple means re-inserting them into the queue. Jobs can be - * attempted up to three times before being discarded. + * - class : What job class to use (determines job type) + * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * - type : The name of the job types this queue handles + * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". + * If "fifo" is used, the queue will effectively be FIFO. Note that job + * completion will not appear to be exactly FIFO if there are multiple + * job runners since jobs can take different times to finish once popped. + * If "timestamp" is used, the queue will at least be loosely ordered + * by timestamp, allowing for some jobs to be popped off out of order. + * If "random" is used, pop() will pick jobs in random order. + * Note that it may only be weakly random (e.g. a lottery of the oldest X). + * If "any" is choosen, the queue will use whatever order is the fastest. + * This might be useful for improving concurrency for job acquisition. + * - claimTTL : If supported, the queue will recycle jobs that have been popped + * but not acknowledged as completed after this many seconds. Recycling + * of jobs simple means re-inserting them into the queue. Jobs can be + * attempted up to three times before being discarded. + * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions. + * This lets delayed jobs wait in a staging area until a given timestamp is + * reached, at which point they will enter the queue. If this is not enabled + * or not supported, an exception will be thrown on delayed job insertion. * * Queue classes should throw an exception if they do not support the options given. * @@ -84,7 +99,7 @@ abstract class JobQueue { */ final public static function factory( array $params ) { $class = $params['class']; - if ( !MWInit::classExists( $class ) ) { + if ( !class_exists( $class ) ) { throw new MWException( "Invalid job queue class '$class'." ); } $obj = new $class( $params ); @@ -109,24 +124,45 @@ abstract class JobQueue { } /** - * @return string One of (random, timestamp, fifo) + * @return string One of (random, timestamp, fifo, undefined) */ final public function getOrder() { return $this->order; } /** - * @return Array Subset of (random, timestamp, fifo) + * @return bool Whether delayed jobs are enabled + * @since 1.22 + */ + final public function delayedJobsEnabled() { + return $this->checkDelay; + } + + /** + * Get the allowed queue orders for configuration validation + * + * @return Array Subset of (random, timestamp, fifo, undefined) */ abstract protected function supportedOrders(); /** - * @return string One of (random, timestamp, fifo) + * Get the default queue order to use if configuration does not specify one + * + * @return string One of (random, timestamp, fifo, undefined) */ abstract protected function optimalOrder(); /** - * Quickly check if the queue is empty (has no available jobs). + * Find out if delayed jobs are supported for configuration validation + * + * @return boolean Whether delayed jobs are supported + */ + protected function supportsDelayedJobs() { + return false; // not implemented + } + + /** + * Quickly check if the queue has no available (unacquired, non-delayed) jobs. * Queue classes should use caching if they are any slower without memcached. * * If caching is used, this might return false when there are actually no jobs. @@ -135,7 +171,7 @@ abstract class JobQueue { * not distinguishable from the race condition between isEmpty() and pop(). * * @return bool - * @throws MWException + * @throws JobQueueError */ final public function isEmpty() { wfProfileIn( __METHOD__ ); @@ -151,13 +187,13 @@ abstract class JobQueue { abstract protected function doIsEmpty(); /** - * Get the number of available (unacquired) jobs in the queue. + * Get the number of available (unacquired, non-delayed) jobs in the queue. * Queue classes should use caching if they are any slower without memcached. * * If caching is used, this number might be out of date for a minute. * * @return integer - * @throws MWException + * @throws JobQueueError */ final public function getSize() { wfProfileIn( __METHOD__ ); @@ -179,7 +215,7 @@ abstract class JobQueue { * If caching is used, this number might be out of date for a minute. * * @return integer - * @throws MWException + * @throws JobQueueError */ final public function getAcquiredCount() { wfProfileIn( __METHOD__ ); @@ -195,14 +231,63 @@ abstract class JobQueue { abstract protected function doGetAcquiredCount(); /** + * Get the number of delayed jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws JobQueueError + * @since 1.22 + */ + final public function getDelayedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetDelayedCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getDelayedCount() + * @return integer + */ + protected function doGetDelayedCount() { + return 0; // not implemented + } + + /** + * Get the number of acquired jobs that can no longer be attempted. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws JobQueueError + */ + final public function getAbandonedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAbandonedCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getAbandonedCount() + * @return integer + */ + protected function doGetAbandonedCount() { + return 0; // not implemented + } + + /** * Push a single jobs into the queue. * This does not require $wgJobClasses to be set for the given job type. * Outside callers should use JobQueueGroup::push() instead of this function. * * @param $jobs Job|Array - * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) + * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) * @return bool Returns false on failure - * @throws MWException + * @throws JobQueueError */ final public function push( $jobs, $flags = 0 ) { return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); @@ -214,9 +299,9 @@ abstract class JobQueue { * Outside callers should use JobQueueGroup::push() instead of this function. * * @param array $jobs List of Jobs - * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) + * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) * @return bool Returns false on failure - * @throws MWException + * @throws JobQueueError */ final public function batchPush( array $jobs, $flags = 0 ) { if ( !count( $jobs ) ) { @@ -225,7 +310,11 @@ abstract class JobQueue { foreach ( $jobs as $job ) { if ( $job->getType() !== $this->type ) { - throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + throw new MWException( + "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); + } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) { + throw new MWException( + "Got delayed '{$job->getType()}' job; delays are not supported." ); } } @@ -247,7 +336,7 @@ abstract class JobQueue { * Outside callers should use JobQueueGroup::pop() instead of this function. * * @return Job|bool Returns false if there are no jobs - * @throws MWException + * @throws JobQueueError */ final public function pop() { global $wgJobClasses; @@ -262,6 +351,15 @@ abstract class JobQueue { wfProfileIn( __METHOD__ ); $job = $this->doPop(); wfProfileOut( __METHOD__ ); + + // Flag this job as an old duplicate based on its "root" job... + try { + if ( $job && $this->isRootJobOldDuplicate( $job ) ) { + JobQueue::incrStats( 'job-pop-duplicate', $this->type ); + $job = DuplicateJob::newFromJob( $job ); // convert to a no-op + } + } catch ( MWException $e ) {} // don't lose jobs over this + return $job; } @@ -279,7 +377,7 @@ abstract class JobQueue { * * @param $job Job * @return bool - * @throws MWException + * @throws JobQueueError */ final public function ack( Job $job ) { if ( $job->getType() !== $this->type ) { @@ -326,7 +424,7 @@ abstract class JobQueue { * * @param $job Job * @return bool - * @throws MWException + * @throws JobQueueError */ final public function deduplicateRootJob( Job $job ) { if ( $job->getType() !== $this->type ) { @@ -344,7 +442,91 @@ abstract class JobQueue { * @return bool */ protected function doDeduplicateRootJob( Job $job ) { - return true; + if ( !$job->hasRootJobParams() ) { + throw new MWException( "Cannot register root job; missing parameters." ); + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } + + /** + * Check if the "root" job of a given job has been superseded by a newer one + * + * @param $job Job + * @return bool + * @throws JobQueueError + */ + final protected function isRootJobOldDuplicate( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); + wfProfileOut( __METHOD__ ); + return $isDuplicate; + } + + /** + * @see JobQueue::isRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + if ( !$job->hasRootJobParams() ) { + return false; // job has no de-deplication info + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Get the last time this root job was enqueued + $timestamp = $this->dupCache->get( $key ); + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + protected function getRootJobCacheKey( $signature ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); + } + + /** + * Deleted all unclaimed and delayed jobs from the queue + * + * @return bool Success + * @throws JobQueueError + * @since 1.22 + */ + final public function delete() { + wfProfileIn( __METHOD__ ); + $res = $this->doDelete(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::delete() + * @return bool Success + */ + protected function doDelete() { + throw new MWException( "This method is not implemented." ); } /** @@ -353,7 +535,7 @@ abstract class JobQueue { * This does nothing for certain queue classes. * * @return void - * @throws MWException + * @throws JobQueueError */ final public function waitForBackups() { wfProfileIn( __METHOD__ ); @@ -413,16 +595,98 @@ abstract class JobQueue { protected function doFlushCaches() {} /** - * Get an iterator to traverse over all of the jobs in this queue. - * This does not include jobs that are current acquired. In general, - * this should only be called on a queue that is no longer being popped. + * Get an iterator to traverse over all available jobs in this queue. + * This does not include jobs that are currently acquired or delayed. + * Note: results may be stale if the queue is concurrently modified. * - * @return Iterator|Traversable|Array - * @throws MWException + * @return Iterator + * @throws JobQueueError */ abstract public function getAllQueuedJobs(); /** + * Get an iterator to traverse over all delayed jobs in this queue. + * Note: results may be stale if the queue is concurrently modified. + * + * @return Iterator + * @throws JobQueueError + * @since 1.22 + */ + public function getAllDelayedJobs() { + return new ArrayIterator( array() ); // not implemented + } + + /** + * Do not use this function outside of JobQueue/JobQueueGroup + * + * @return string + * @since 1.22 + */ + public function getCoalesceLocationInternal() { + return null; + } + + /** + * Check whether each of the given queues are empty. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (list of non-empty queue types) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueuesWithJobs( array $types ) { + $section = new ProfileSection( __METHOD__ ); + return $this->doGetSiblingQueuesWithJobs( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesWithJobs() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueuesWithJobs( array $types ) { + return null; // not supported + } + + /** + * Check the size of each of the given queues. + * For queues not served by the same store as this one, 0 is returned. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (job type => whether queue is empty) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueueSizes( array $types ) { + $section = new ProfileSection( __METHOD__ ); + return $this->doGetSiblingQueueSizes( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesSize() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueueSizes( array $types ) { + return null; // not supported + } + + /** + * Call wfIncrStats() for the queue overall and for the queue type + * + * @param string $key Event type + * @param string $type Job type + * @param integer $delta + * @since 1.22 + */ + public static function incrStats( $key, $type, $delta = 1 ) { + wfIncrStats( $key, $delta ); + wfIncrStats( "{$key}-{$type}", $delta ); + } + + /** * Namespace the queue with a key to isolate it for testing * * @param $key string @@ -433,3 +697,10 @@ abstract class JobQueue { throw new MWException( "Queue namespacing not supported for this queue type." ); } } + +/** + * @ingroup JobQueue + * @since 1.22 + */ +class JobQueueError extends MWException {} +class JobQueueConnectionError extends JobQueueError {} diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index ff7f7abc..c39083df 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -28,13 +28,15 @@ * @since 1.21 */ class JobQueueDB extends JobQueue { - const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days) const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random const MAX_OFFSET = 255; // integer; maximum number of rows to skip + /** @var BagOStuff */ + protected $cache; + protected $cluster = false; // string; name of an external DB cluster /** @@ -46,8 +48,13 @@ class JobQueueDB extends JobQueue { * @param $params array */ protected function __construct( array $params ) { + global $wgMemc; + parent::__construct( $params ); + $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false; + // Make sure that we don't use the SQL cache, which would be harmful + $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc; } protected function supportedOrders() { @@ -63,22 +70,24 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doIsEmpty() { - global $wgMemc; - $key = $this->getCacheKey( 'empty' ); - $isEmpty = $wgMemc->get( $key ); + $isEmpty = $this->cache->get( $key ); if ( $isEmpty === 'true' ) { return true; } elseif ( $isEmpty === 'false' ) { return false; } - list( $dbr, $scope ) = $this->getSlaveDB(); - $found = $dbr->selectField( // unclaimed job - 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ - ); - $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); + $dbr = $this->getSlaveDB(); + try { + $found = $dbr->selectField( // unclaimed job + 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); return !$found; } @@ -88,21 +97,23 @@ class JobQueueDB extends JobQueue { * @return integer */ protected function doGetSize() { - global $wgMemc; - $key = $this->getCacheKey( 'size' ); - $size = $wgMemc->get( $key ); + $size = $this->cache->get( $key ); if ( is_int( $size ) ) { return $size; } - list( $dbr, $scope ) = $this->getSlaveDB(); - $size = (int)$dbr->selectField( 'job', 'COUNT(*)', - array( 'job_cmd' => $this->type, 'job_token' => '' ), - __METHOD__ - ); - $wgMemc->set( $key, $size, self::CACHE_TTL_SHORT ); + try { + $dbr = $this->getSlaveDB(); + $size = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, 'job_token' => '' ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->set( $key, $size, self::CACHE_TTL_SHORT ); return $size; } @@ -112,24 +123,63 @@ class JobQueueDB extends JobQueue { * @return integer */ protected function doGetAcquiredCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + + $key = $this->getCacheKey( 'acquiredcount' ); + + $count = $this->cache->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $dbr = $this->getSlaveDB(); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + + /** + * @see JobQueue::doGetAbandonedCount() + * @return integer + * @throws MWException + */ + protected function doGetAbandonedCount() { global $wgMemc; if ( $this->claimTTL <= 0 ) { return 0; // no acknowledgements } - $key = $this->getCacheKey( 'acquiredcount' ); + $key = $this->getCacheKey( 'abandonedcount' ); $count = $wgMemc->get( $key ); if ( is_int( $count ) ) { return $count; } - list( $dbr, $scope ) = $this->getSlaveDB(); - $count = (int)$dbr->selectField( 'job', 'COUNT(*)', - array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), - __METHOD__ - ); + $dbr = $this->getSlaveDB(); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( + 'job_cmd' => $this->type, + "job_token != {$dbr->addQuotes( '' )}", + "job_attempts >= " . $dbr->addQuotes( $this->maxTries ) + ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT ); return $count; @@ -143,70 +193,84 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doBatchPush( array $jobs, $flags ) { - if ( count( $jobs ) ) { - list( $dbw, $scope ) = $this->getMasterDB(); - - $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated - $rowList = array(); // list of jobs for jobs that are are not de-duplicated - - foreach ( $jobs as $job ) { - $row = $this->insertFields( $job ); - if ( $job->ignoreDuplicates() ) { - $rowSet[$row['job_sha1']] = $row; - } else { - $rowList[] = $row; - } + $dbw = $this->getMasterDB(); + + $that = $this; + $method = __METHOD__; + $dbw->onTransactionIdle( + function() use ( $dbw, $that, $jobs, $flags, $method ) { + $that->doBatchPushInternal( $dbw, $jobs, $flags, $method ); } + ); - $key = $this->getCacheKey( 'empty' ); - $atomic = ( $flags & self::QoS_Atomic ); + return true; + } - $dbw->onTransactionIdle( - function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $scope - ) { - global $wgMemc; + /** + * This function should *not* be called outside of JobQueueDB + * + * @param DatabaseBase $dbw + * @param array $jobs + * @param int $flags + * @param string $method + * @return boolean + * @throws type + */ + public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { + if ( !count( $jobs ) ) { + return true; + } - if ( $atomic ) { - $dbw->begin( __METHOD__ ); // wrap all the job additions in one transaction - } - try { - // Strip out any duplicate jobs that are already in the queue... - if ( count( $rowSet ) ) { - $res = $dbw->select( 'job', 'job_sha1', - array( - // No job_type condition since it's part of the job_sha1 hash - 'job_sha1' => array_keys( $rowSet ), - 'job_token' => '' // unclaimed - ), - __METHOD__ - ); - foreach ( $res as $row ) { - wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." ); - unset( $rowSet[$row->job_sha1] ); // already enqueued - } - } - // Build the full list of job rows to insert - $rows = array_merge( $rowList, array_values( $rowSet ) ); - // Insert the job rows in chunks to avoid slave lag... - foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { - $dbw->insert( 'job', $rowBatch, __METHOD__ ); - } - wfIncrStats( 'job-insert', count( $rows ) ); - wfIncrStats( 'job-insert-duplicate', - count( $rowSet ) + count( $rowList ) - count( $rows ) ); - } catch ( DBError $e ) { - if ( $atomic ) { - $dbw->rollback( __METHOD__ ); - } - throw $e; - } - if ( $atomic ) { - $dbw->commit( __METHOD__ ); - } + $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated + $rowList = array(); // list of jobs for jobs that are are not de-duplicated + foreach ( $jobs as $job ) { + $row = $this->insertFields( $job ); + if ( $job->ignoreDuplicates() ) { + $rowSet[$row['job_sha1']] = $row; + } else { + $rowList[] = $row; + } + } - $wgMemc->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); - } ); + if ( $flags & self::QOS_ATOMIC ) { + $dbw->begin( $method ); // wrap all the job additions in one transaction + } + try { + // Strip out any duplicate jobs that are already in the queue... + if ( count( $rowSet ) ) { + $res = $dbw->select( 'job', 'job_sha1', + array( + // No job_type condition since it's part of the job_sha1 hash + 'job_sha1' => array_keys( $rowSet ), + 'job_token' => '' // unclaimed + ), + $method + ); + foreach ( $res as $row ) { + wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" ); + unset( $rowSet[$row->job_sha1] ); // already enqueued + } + } + // Build the full list of job rows to insert + $rows = array_merge( $rowList, array_values( $rowSet ) ); + // Insert the job rows in chunks to avoid slave lag... + foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { + $dbw->insert( 'job', $rowBatch, $method ); + } + JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) ); + JobQueue::incrStats( 'job-insert-duplicate', $this->type, + count( $rowSet ) + count( $rowList ) - count( $rows ) ); + } catch ( DBError $e ) { + if ( $flags & self::QOS_ATOMIC ) { + $dbw->rollback( $method ); + } + throw $e; } + if ( $flags & self::QOS_ATOMIC ) { + $dbw->commit( $method ); + } + + $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG ); return true; } @@ -216,54 +280,52 @@ class JobQueueDB extends JobQueue { * @return Job|bool */ protected function doPop() { - global $wgMemc; - - if ( $wgMemc->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { + if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { return false; // queue is empty } - list( $dbw, $scope ) = $this->getMasterDB(); - $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); + $dbw = $this->getMasterDB(); + try { + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); - $uuid = wfRandomString( 32 ); // pop attempt - $job = false; // job popped off - do { // retry when our row is invalid or deleted as a duplicate - // Try to reserve a row in the DB... - if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { - $row = $this->claimOldest( $uuid ); - } else { // random first - $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs - $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand - $row = $this->claimRandom( $uuid, $rand, $gte ); - } - // Check if we found a row to reserve... - if ( !$row ) { - $wgMemc->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); - break; // nothing to do - } - wfIncrStats( 'job-pop' ); - // Get the job object from the row... - $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); - if ( !$title ) { - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." ); - continue; // try again - } - $job = Job::factory( $row->job_cmd, $title, - self::extractBlob( $row->job_params ), $row->job_id ); - $job->id = $row->job_id; // XXX: work around broken subclasses - // Flag this job as an old duplicate based on its "root" job... - if ( $this->isRootJobOldDuplicate( $job ) ) { - wfIncrStats( 'job-pop-duplicate' ); - $job = DuplicateJob::newFromJob( $job ); // convert to a no-op - } - break; // done - } while( true ); + $uuid = wfRandomString( 32 ); // pop attempt + $job = false; // job popped off + do { // retry when our row is invalid or deleted as a duplicate + // Try to reserve a row in the DB... + if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { + $row = $this->claimOldest( $uuid ); + } else { // random first + $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs + $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand + $row = $this->claimRandom( $uuid, $rand, $gte ); + } + // Check if we found a row to reserve... + if ( !$row ) { + $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); + break; // nothing to do + } + JobQueue::incrStats( 'job-pop', $this->type ); + // Get the job object from the row... + $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); + if ( !$title ) { + $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); + wfDebug( "Row has invalid title '{$row->job_title}'." ); + continue; // try again + } + $job = Job::factory( $row->job_cmd, $title, + self::extractBlob( $row->job_params ), $row->job_id ); + $job->metadata['id'] = $row->job_id; + $job->id = $row->job_id; // XXX: work around broken subclasses + break; // done + } while ( true ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } return $job; } @@ -277,11 +339,9 @@ class JobQueueDB extends JobQueue { * @return Row|false */ protected function claimRandom( $uuid, $rand, $gte ) { - global $wgMemc; - - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); // Check cache to see if the queue has <= OFFSET items - $tinyQueue = $wgMemc->get( $this->getCacheKey( 'small' ) ); + $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); $row = false; // the row acquired $invertedDirection = false; // whether one job_random direction was already scanned @@ -297,7 +357,7 @@ class JobQueueDB extends JobQueue { $dir = $gte ? 'ASC' : 'DESC'; $row = $dbw->selectRow( 'job', '*', // find a random job array( - 'job_cmd' => $this->type, + 'job_cmd' => $this->type, 'job_token' => '', // unclaimed "job_random {$ineq} {$dbw->addQuotes( $rand )}" ), __METHOD__, @@ -314,7 +374,7 @@ class JobQueueDB extends JobQueue { // instead of job_random for reducing excess claim retries. $row = $dbw->selectRow( 'job', '*', // find a random job array( - 'job_cmd' => $this->type, + 'job_cmd' => $this->type, 'job_token' => '', // unclaimed ), __METHOD__, @@ -322,14 +382,14 @@ class JobQueueDB extends JobQueue { ); if ( !$row ) { $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows - $wgMemc->set( $this->getCacheKey( 'small' ), 1, 30 ); + $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 ); continue; // use job_random } } if ( $row ) { // claim the job $dbw->update( 'job', // update by PK array( - 'job_token' => $uuid, + 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp(), 'job_attempts = job_attempts+1' ), array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ), @@ -355,7 +415,7 @@ class JobQueueDB extends JobQueue { * @return Row|false */ protected function claimOldest( $uuid ) { - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); $row = false; // the row acquired do { @@ -380,7 +440,7 @@ class JobQueueDB extends JobQueue { // This uses as much of the DB wrapper functions as possible. $dbw->update( 'job', array( - 'job_token' => $uuid, + 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp(), 'job_attempts = job_attempts+1' ), array( 'job_id = (' . @@ -399,7 +459,7 @@ class JobQueueDB extends JobQueue { array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ ); if ( !$row ) { // raced out by duplicate job removal - wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." ); + wfDebug( "Row deleted as duplicate by another process." ); } } else { break; // nothing to do @@ -410,100 +470,31 @@ class JobQueueDB extends JobQueue { } /** - * Recycle or destroy any jobs that have been claimed for too long - * - * @return integer Number of jobs recycled/deleted - */ - public function recycleAndDeleteStaleJobs() { - global $wgMemc; - - $now = time(); - list( $dbw, $scope ) = $this->getMasterDB(); - $count = 0; // affected rows - - if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { - return $count; // already in progress - } - - // Remove claims on jobs acquired for too long if enabled... - if ( $this->claimTTL > 0 ) { - $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); - // Get the IDs of jobs that have be claimed but not finished after too long. - // These jobs can be recycled into the queue by expiring the claim. Selecting - // the IDs first means that the UPDATE can be done by primary key (less deadlocks). - $res = $dbw->select( 'job', 'job_id', - array( - 'job_cmd' => $this->type, - "job_token != {$dbw->addQuotes( '' )}", // was acquired - "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale - "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left - __METHOD__ - ); - $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); - if ( count( $ids ) ) { - // Reset job_token for these jobs so that other runners will pick them up. - // Set the timestamp to the current time, as it is useful to now that the job - // was already tried before (the timestamp becomes the "released" time). - $dbw->update( 'job', - array( - 'job_token' => '', - 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release - array( - 'job_id' => $ids ), - __METHOD__ - ); - $count += $dbw->affectedRows(); - wfIncrStats( 'job-recycle', $dbw->affectedRows() ); - $wgMemc->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); - } - } - - // Just destroy any stale jobs... - $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); - $conds = array( - 'job_cmd' => $this->type, - "job_token != {$dbw->addQuotes( '' )}", // was acquired - "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale - ); - if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... - $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; - } - // Get the IDs of jobs that are considered stale and should be removed. Selecting - // the IDs first means that the UPDATE can be done by primary key (less deadlocks). - $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); - $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); - if ( count( $ids ) ) { - $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); - $count += $dbw->affectedRows(); - } - - $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); - - return $count; - } - - /** * @see JobQueue::doAck() * @param Job $job * @throws MWException * @return Job|bool */ protected function doAck( Job $job ) { - if ( !$job->getId() ) { + if ( !isset( $job->metadata['id'] ) ) { throw new MWException( "Job of type '{$job->getType()}' has no ID." ); } - list( $dbw, $scope ) = $this->getMasterDB(); - $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); + $dbw = $this->getMasterDB(); + try { + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); - // Delete a row with a single DELETE without holding row locks over RTTs... - $dbw->delete( 'job', - array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ), __METHOD__ ); + // Delete a row with a single DELETE without holding row locks over RTTs... + $dbw->delete( 'job', + array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } return true; } @@ -527,44 +518,33 @@ class JobQueueDB extends JobQueue { // deferred till "transaction idle", do the same here, so that the ordering is // maintained. Having only the de-duplication registration succeed would cause // jobs to become no-ops without any actual jobs that made them redundant. - list( $dbw, $scope ) = $this->getMasterDB(); - $dbw->onTransactionIdle( function() use ( $params, $key, $scope ) { - global $wgMemc; - - $timestamp = $wgMemc->get( $key ); // current last timestamp of this job + $dbw = $this->getMasterDB(); + $cache = $this->dupCache; + $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) { + $timestamp = $cache->get( $key ); // current last timestamp of this job if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { return true; // a newer version of this root job was enqueued } // Update the timestamp of the last root job started at the location... - return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); } ); return true; } /** - * Check if the "root" job of a given job has been superseded by a newer one - * - * @param $job Job + * @see JobQueue::doDelete() * @return bool */ - protected function isRootJobOldDuplicate( Job $job ) { - global $wgMemc; - - $params = $job->getParams(); - if ( !isset( $params['rootJobSignature'] ) ) { - return false; // job has no de-deplication info - } elseif ( !isset( $params['rootJobTimestamp'] ) ) { - trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." ); - return false; + protected function doDelete() { + $dbw = $this->getMasterDB(); + try { + $dbw->delete( 'job', array( 'job_cmd' => $this->type ) ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); } - - // Get the last time this root job was enqueued - $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); - - // Check if a new root job was started at the location after this one's... - return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + return true; } /** @@ -582,7 +562,7 @@ class JobQueueDB extends JobQueue { return array( 'recycleAndDeleteStaleJobs' => array( 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), - 'period' => ceil( $this->claimTTL / 2 ) + 'period' => ceil( $this->claimTTL / 2 ) ) ); } @@ -591,10 +571,8 @@ class JobQueueDB extends JobQueue { * @return void */ protected function doFlushCaches() { - global $wgMemc; - foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) { - $wgMemc->delete( $this->getCacheKey( $type ) ); + $this->cache->delete( $this->getCacheKey( $type ) ); } } @@ -603,51 +581,140 @@ class JobQueueDB extends JobQueue { * @return Iterator */ public function getAllQueuedJobs() { - list( $dbr, $scope ) = $this->getSlaveDB(); - return new MappedIterator( - $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), - function( $row ) use ( $scope ) { - $job = Job::factory( - $row->job_cmd, - Title::makeTitle( $row->job_namespace, $row->job_title ), - strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, - $row->job_id - ); - $job->id = $row->job_id; // XXX: work around broken subclasses - return $job; - } - ); + $dbr = $this->getSlaveDB(); + try { + return new MappedIterator( + $dbr->select( 'job', '*', + array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), + function( $row ) use ( $dbr ) { + $job = Job::factory( + $row->job_cmd, + Title::makeTitle( $row->job_namespace, $row->job_title ), + strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, + $row->job_id + ); + $job->metadata['id'] = $row->job_id; + $job->id = $row->job_id; // XXX: work around broken subclasses + return $job; + } + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } } - /** - * @return Array (DatabaseBase, ScopedCallback) - */ - protected function getSlaveDB() { - return $this->getDB( DB_SLAVE ); + public function getCoalesceLocationInternal() { + return $this->cluster + ? "DBCluster:{$this->cluster}:{$this->wiki}" + : "LBFactory:{$this->wiki}"; } - /** - * @return Array (DatabaseBase, ScopedCallback) - */ - protected function getMasterDB() { - return $this->getDB( DB_MASTER ); + protected function doGetSiblingQueuesWithJobs( array $types ) { + $dbr = $this->getSlaveDB(); + $res = $dbr->select( 'job', 'DISTINCT job_cmd', + array( 'job_cmd' => $types ), __METHOD__ ); + + $types = array(); + foreach ( $res as $row ) { + $types[] = $row->job_cmd; + } + return $types; + } + + protected function doGetSiblingQueueSizes( array $types ) { + $dbr = $this->getSlaveDB(); + $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ), + array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) ); + + $sizes = array(); + foreach ( $res as $row ) { + $sizes[$row->job_cmd] = (int)$row->count; + } + return $sizes; } /** - * @param $index integer (DB_SLAVE/DB_MASTER) - * @return Array (DatabaseBase, ScopedCallback) + * Recycle or destroy any jobs that have been claimed for too long + * + * @return integer Number of jobs recycled/deleted */ - protected function getDB( $index ) { - $lb = ( $this->cluster !== false ) - ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) - : wfGetLB( $this->wiki ); - $conn = $lb->getConnection( $index, array(), $this->wiki ); - return array( - $conn, - new ScopedCallback( function() use ( $lb, $conn ) { - $lb->reuseConnection( $conn ); - } ) - ); + public function recycleAndDeleteStaleJobs() { + $now = time(); + $count = 0; // affected rows + $dbw = $this->getMasterDB(); + + try { + if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { + return $count; // already in progress + } + + // Remove claims on jobs acquired for too long if enabled... + if ( $this->claimTTL > 0 ) { + $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); + // Get the IDs of jobs that have be claimed but not finished after too long. + // These jobs can be recycled into the queue by expiring the claim. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', + array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale + "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left + __METHOD__ + ); + $ids = array_map( + function( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + // Reset job_token for these jobs so that other runners will pick them up. + // Set the timestamp to the current time, as it is useful to now that the job + // was already tried before (the timestamp becomes the "released" time). + $dbw->update( 'job', + array( + 'job_token' => '', + 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release + array( + 'job_id' => $ids ), + __METHOD__ + ); + $count += $dbw->affectedRows(); + JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() ); + $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + } + } + + // Just destroy any stale jobs... + $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); + $conds = array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale + ); + if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... + $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; + } + // Get the IDs of jobs that are considered stale and should be removed. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); + $ids = array_map( + function( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); + $count += $dbw->affectedRows(); + JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() ); + } + + $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return $count; } /** @@ -655,7 +722,7 @@ class JobQueueDB extends JobQueue { * @return array */ protected function insertFields( Job $job ) { - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); return array( // Fields that describe the nature of the job 'job_cmd' => $job->getType(), @@ -674,20 +741,45 @@ class JobQueueDB extends JobQueue { } /** - * @return string + * @return DBConnRef */ - private function getCacheKey( $property ) { - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); + protected function getSlaveDB() { + try { + return $this->getDB( DB_SLAVE ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } + } + + /** + * @return DBConnRef + */ + protected function getMasterDB() { + try { + return $this->getDB( DB_MASTER ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } + } + + /** + * @param $index integer (DB_SLAVE/DB_MASTER) + * @return DBConnRef + */ + protected function getDB( $index ) { + $lb = ( $this->cluster !== false ) + ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) + : wfGetLB( $this->wiki ); + return $lb->getConnectionRef( $index, array(), $this->wiki ); } /** - * @param string $signature Hash identifier of the root job * @return string */ - private function getRootJobCacheKey( $signature ) { + private function getCacheKey( $property ) { list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); + $cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property ); } /** @@ -713,4 +805,12 @@ class JobQueueDB extends JobQueue { return false; } } + + /** + * @param DBError $e + * @throws JobQueueError + */ + protected function throwDBException( DBError $e ) { + throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() ); + } } diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php new file mode 100644 index 00000000..d3ce164a --- /dev/null +++ b/includes/job/JobQueueFederated.php @@ -0,0 +1,473 @@ +<?php +/** + * Job queue code for federated queues. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle enqueueing and running of background jobs for federated queues + * + * This class allows for queues to be partitioned into smaller queues. + * A partition is defined by the configuration for a JobQueue instance. + * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a + * JobQueueFederated instance, which itself would consist of three JobQueueRedis + * instances, each using their own redis server. This would allow for the jobs + * to be split (evenly or based on weights) accross multiple servers if a single + * server becomes impractical or expensive. Different JobQueue classes can be mixed. + * + * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue + * is inherited by the partition queues. Additional configuration defines what + * section each wiki is in, what partition queues each section uses (and their weight), + * and the JobQueue configuration for each partition. Some sections might only need a + * single queue partition, like the sections for groups of small wikis. + * + * If used for performance, then $wgMainCacheType should be set to memcached/redis. + * Note that "fifo" cannot be used for the ordering, since the data is distributed. + * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also, + * queue classes used by this should ignore down servers (with TTL) to avoid slowness. + * + * @ingroup JobQueue + * @since 1.22 + */ +class JobQueueFederated extends JobQueue { + /** @var Array (partition name => weight) reverse sorted by weight */ + protected $partitionMap = array(); + /** @var Array (partition name => JobQueue) reverse sorted by weight */ + protected $partitionQueues = array(); + /** @var HashRing */ + protected $partitionPushRing; + /** @var BagOStuff */ + protected $cache; + + const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating + const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date + + /** + * @params include: + * - sectionsByWiki : A map of wiki IDs to section names. + * Wikis will default to using the section "default". + * - partitionsBySection : Map of section names to maps of (partition name => weight). + * A section called 'default' must be defined if not all wikis + * have explicitly defined sections. + * - configByPartition : Map of queue partition names to configuration arrays. + * These configuration arrays are passed to JobQueue::factory(). + * The options set here are overriden by those passed to this + * the federated queue itself (e.g. 'order' and 'claimTTL'). + * - partitionsNoPush : List of partition names that can handle pop() but not push(). + * This can be used to migrate away from a certain partition. + * @param array $params + */ + protected function __construct( array $params ) { + parent::__construct( $params ); + $section = isset( $params['sectionsByWiki'][$this->wiki] ) + ? $params['sectionsByWiki'][$this->wiki] + : 'default'; + if ( !isset( $params['partitionsBySection'][$section] ) ) { + throw new MWException( "No configuration for section '$section'." ); + } + // Get the full partition map + $this->partitionMap = $params['partitionsBySection'][$section]; + arsort( $this->partitionMap, SORT_NUMERIC ); + // Get the partitions jobs can actually be pushed to + $partitionPushMap = $this->partitionMap; + if ( isset( $params['partitionsNoPush'] ) ) { + foreach ( $params['partitionsNoPush'] as $partition ) { + unset( $partitionPushMap[$partition] ); + } + } + // Get the config to pass to merge into each partition queue config + $baseConfig = $params; + foreach ( array( 'class', 'sectionsByWiki', + 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o ) + { + unset( $baseConfig[$o] ); + } + // Get the partition queue objects + foreach ( $this->partitionMap as $partition => $w ) { + if ( !isset( $params['configByPartition'][$partition] ) ) { + throw new MWException( "No configuration for partition '$partition'." ); + } + $this->partitionQueues[$partition] = JobQueue::factory( + $baseConfig + $params['configByPartition'][$partition] ); + } + // Get the ring of partitions to push jobs into + $this->partitionPushRing = new HashRing( $partitionPushMap ); + // Aggregate cache some per-queue values if there are multiple partition queues + $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff(); + } + + protected function supportedOrders() { + // No FIFO due to partitioning, though "rough timestamp order" is supported + return array( 'undefined', 'random', 'timestamp' ); + } + + protected function optimalOrder() { + return 'undefined'; // defer to the partitions + } + + protected function supportsDelayedJobs() { + return true; // defer checks to the partitions + } + + protected function doIsEmpty() { + $key = $this->getCacheKey( 'empty' ); + + $isEmpty = $this->cache->get( $key ); + if ( $isEmpty === 'true' ) { + return true; + } elseif ( $isEmpty === 'false' ) { + return false; + } + + foreach ( $this->partitionQueues as $queue ) { + try { + if ( !$queue->doIsEmpty() ) { + $this->cache->add( $key, 'false', self::CACHE_TTL_LONG ); + return false; + } + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + + $this->cache->add( $key, 'true', self::CACHE_TTL_LONG ); + return true; + } + + protected function doGetSize() { + return $this->getCrossPartitionSum( 'size', 'doGetSize' ); + } + + protected function doGetAcquiredCount() { + return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' ); + } + + protected function doGetDelayedCount() { + return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' ); + } + + protected function doGetAbandonedCount() { + return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' ); + } + + /** + * @param string $type + * @param string $method + * @return integer + */ + protected function getCrossPartitionSum( $type, $method ) { + $key = $this->getCacheKey( $type ); + + $count = $this->cache->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $count = 0; + foreach ( $this->partitionQueues as $queue ) { + try { + $count += $queue->$method(); + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + + $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + return $count; + } + + protected function doBatchPush( array $jobs, $flags ) { + if ( !count( $jobs ) ) { + return true; // nothing to do + } + // Local ring variable that may be changed to point to a new ring on failure + $partitionRing = $this->partitionPushRing; + // Try to insert the jobs and update $partitionsTry on any failures + $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags ); + if ( count( $jobsLeft ) ) { // some jobs failed to insert? + // Try to insert the remaning jobs once more, ignoring the bad partitions + return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) ); + } + return true; + } + + /** + * @param array $jobs + * @param HashRing $partitionRing + * @param integer $flags + * @return array List of Job object that could not be inserted + */ + protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) { + $jobsLeft = array(); + + // Because jobs are spread across partitions, per-job de-duplication needs + // to use a consistent hash to avoid allowing duplicate jobs per partition. + // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded. + $uJobsByPartition = array(); // (partition name => job list) + foreach ( $jobs as $key => $job ) { + if ( $job->ignoreDuplicates() ) { + $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) ); + $uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job; + unset( $jobs[$key] ); + } + } + // Get the batches of jobs that are not de-duplicated + if ( $flags & self::QOS_ATOMIC ) { + $nuJobBatches = array( $jobs ); // all or nothing + } else { + // Split the jobs into batches and spread them out over servers if there + // are many jobs. This helps keep the partitions even. Otherwise, send all + // the jobs to a single partition queue to avoids the extra connections. + $nuJobBatches = array_chunk( $jobs, 300 ); + } + + // Insert the de-duplicated jobs into the queues... + foreach ( $uJobsByPartition as $partition => $jobBatch ) { + $queue = $this->partitionQueues[$partition]; + try { + $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); + } catch ( JobQueueError $e ) { + $ok = false; + MWExceptionHandler::logException( $e ); + } + if ( $ok ) { + $key = $this->getCacheKey( 'empty' ); + $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); + } else { + $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist + if ( !$partitionRing ) { + throw new JobQueueError( "Could not insert job(s), all partitions are down." ); + } + $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted + } + } + + // Insert the jobs that are not de-duplicated into the queues... + foreach ( $nuJobBatches as $jobBatch ) { + $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() ); + $queue = $this->partitionQueues[$partition]; + try { + $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); + } catch ( JobQueueError $e ) { + $ok = false; + MWExceptionHandler::logException( $e ); + } + if ( $ok ) { + $key = $this->getCacheKey( 'empty' ); + $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); + } else { + $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist + if ( !$partitionRing ) { + throw new JobQueueError( "Could not insert job(s), all partitions are down." ); + } + $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted + } + } + + return $jobsLeft; + } + + protected function doPop() { + $key = $this->getCacheKey( 'empty' ); + + $isEmpty = $this->cache->get( $key ); + if ( $isEmpty === 'true' ) { + return false; + } + + $partitionsTry = $this->partitionMap; // (partition => weight) + + while ( count( $partitionsTry ) ) { + $partition = ArrayUtils::pickRandom( $partitionsTry ); + if ( $partition === false ) { + break; // all partitions at 0 weight + } + $queue = $this->partitionQueues[$partition]; + try { + $job = $queue->pop(); + } catch ( JobQueueError $e ) { + $job = false; + MWExceptionHandler::logException( $e ); + } + if ( $job ) { + $job->metadata['QueuePartition'] = $partition; + return $job; + } else { + unset( $partitionsTry[$partition] ); // blacklist partition + } + } + + $this->cache->set( $key, 'true', JobQueueDB::CACHE_TTL_LONG ); + return false; + } + + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['QueuePartition'] ) ) { + throw new MWException( "The given job has no defined partition name." ); + } + return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job ); + } + + protected function doIsRootJobOldDuplicate( Job $job ) { + $params = $job->getRootJobParams(); + $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 ); + try { + return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job ); + } catch ( JobQueueError $e ) { + if ( isset( $partitions[1] ) ) { // check fallback partition + return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job ); + } + } + return false; + } + + protected function doDeduplicateRootJob( Job $job ) { + $params = $job->getRootJobParams(); + $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 ); + try { + return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job ); + } catch ( JobQueueError $e ) { + if ( isset( $partitions[1] ) ) { // check fallback partition + return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job ); + } + } + return false; + } + + protected function doDelete() { + foreach ( $this->partitionQueues as $queue ) { + try { + $queue->doDelete(); + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + } + + protected function doWaitForBackups() { + foreach ( $this->partitionQueues as $queue ) { + try { + $queue->waitForBackups(); + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + } + + protected function doGetPeriodicTasks() { + $tasks = array(); + foreach ( $this->partitionQueues as $partition => $queue ) { + foreach ( $queue->getPeriodicTasks() as $task => $def ) { + $tasks["{$partition}:{$task}"] = $def; + } + } + return $tasks; + } + + protected function doFlushCaches() { + static $types = array( + 'empty', + 'size', + 'acquiredcount', + 'delayedcount', + 'abandonedcount' + ); + foreach ( $types as $type ) { + $this->cache->delete( $this->getCacheKey( $type ) ); + } + foreach ( $this->partitionQueues as $queue ) { + $queue->doFlushCaches(); + } + } + + public function getAllQueuedJobs() { + $iterator = new AppendIterator(); + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllQueuedJobs() ); + } + return $iterator; + } + + public function getAllDelayedJobs() { + $iterator = new AppendIterator(); + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllDelayedJobs() ); + } + return $iterator; + } + + public function getCoalesceLocationInternal() { + return "JobQueueFederated:wiki:{$this->wiki}" . + sha1( serialize( array_keys( $this->partitionMap ) ) ); + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + $result = array(); + foreach ( $this->partitionQueues as $queue ) { + try { + $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types ); + if ( is_array( $nonEmpty ) ) { + $result = array_unique( array_merge( $result, $nonEmpty ) ); + } else { + return null; // not supported on all partitions; bail + } + if ( count( $result ) == count( $types ) ) { + break; // short-circuit + } + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + return array_values( $result ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $result = array(); + foreach ( $this->partitionQueues as $queue ) { + try { + $sizes = $queue->doGetSiblingQueueSizes( $types ); + if ( is_array( $sizes ) ) { + foreach ( $sizes as $type => $size ) { + $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size; + } + } else { + return null; // not supported on all partitions; bail + } + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + return $result; + } + + public function setTestingPrefix( $key ) { + foreach ( $this->partitionQueues as $queue ) { + $queue->setTestingPrefix( $key ); + } + } + + /** + * @return string + */ + private function getCacheKey( $property ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); + } +} diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index 351c71a3..fa7fee5f 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -36,10 +36,14 @@ class JobQueueGroup { protected $wiki; // string; wiki ID + /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */ + protected $coalescedQueues; + const TYPE_DEFAULT = 1; // integer; jobs popped by default const TYPE_ANY = 2; // integer; any job const USE_CACHE = 1; // integer; use process or persistent cache + const USE_PRIORITY = 2; // integer; respect deprioritization const PROC_CACHE_TTL = 15; // integer; seconds @@ -146,6 +150,9 @@ class JobQueueGroup { */ public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) { if ( is_string( $qtype ) ) { // specific job type + if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) { + return false; // back off + } $job = $this->get( $qtype )->pop(); if ( !$job ) { JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); @@ -167,6 +174,9 @@ class JobQueueGroup { shuffle( $types ); // avoid starvation foreach ( $types as $type ) { // for each queue... + if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) { + continue; // back off + } $job = $this->get( $type )->pop(); if ( $job ) { // found return $job; @@ -247,15 +257,72 @@ class JobQueueGroup { */ public function getQueuesWithJobs() { $types = array(); - foreach ( $this->getQueueTypes() as $type ) { - if ( !$this->get( $type )->isEmpty() ) { - $types[] = $type; + foreach ( $this->getCoalescedQueues() as $info ) { + $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() ); + if ( is_array( $nonEmpty ) ) { // batching features supported + $types = array_merge( $types, $nonEmpty ); + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } } } return $types; } /** + * Get the size of the queus for a list of job types + * + * @return Array Map of (job type => size) + */ + public function getQueueSizes() { + $sizeMap = array(); + foreach ( $this->getCoalescedQueues() as $info ) { + $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() ); + if ( is_array( $sizes ) ) { // batching features supported + $sizeMap = $sizeMap + $sizes; + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + $sizeMap[$type] = $this->get( $type )->getSize(); + } + } + } + return $sizeMap; + } + + /** + * @return array + */ + protected function getCoalescedQueues() { + global $wgJobTypeConf; + + if ( $this->coalescedQueues === null ) { + $this->coalescedQueues = array(); + foreach ( $wgJobTypeConf as $type => $conf ) { + $queue = JobQueue::factory( + array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf ); + $loc = $queue->getCoalesceLocationInternal(); + if ( !isset( $this->coalescedQueues[$loc] ) ) { + $this->coalescedQueues[$loc]['queue'] = $queue; + $this->coalescedQueues[$loc]['types'] = array(); + } + if ( $type === 'default' ) { + $this->coalescedQueues[$loc]['types'] = array_merge( + $this->coalescedQueues[$loc]['types'], + array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) ) + ); + } else { + $this->coalescedQueues[$loc]['types'][] = $type; + } + } + } + + return $this->coalescedQueues; + } + + /** * Check if jobs should not be popped of a queue right now. * This is only used for performance, such as to avoid spamming * the queue with many sub-jobs before they actually get run. @@ -264,10 +331,15 @@ class JobQueueGroup { * @return bool */ public function isQueueDeprioritized( $type ) { + if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) { + return $this->cache->get( 'isDeprioritized', $type ); + } if ( $type === 'refreshLinks2' ) { // Don't keep converting refreshLinks2 => refreshLinks jobs if the // later jobs have not been done yet. This helps throttle queue spam. - return !$this->get( 'refreshLinks' )->isEmpty(); + $deprioritized = !$this->get( 'refreshLinks' )->isEmpty(); + $this->cache->set( 'isDeprioritized', $type, $deprioritized ); + return $deprioritized; } return false; } @@ -298,9 +370,13 @@ class JobQueueGroup { } elseif ( !isset( $lastRuns[$type][$task] ) || $lastRuns[$type][$task] < ( time() - $definition['period'] ) ) { - if ( call_user_func( $definition['callback'] ) !== null ) { - $tasksRun[$type][$task] = time(); - ++$count; + try { + if ( call_user_func( $definition['callback'] ) !== null ) { + $tasksRun[$type][$task] = time(); + ++$count; + } + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); } } } diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php new file mode 100644 index 00000000..378e1755 --- /dev/null +++ b/includes/job/JobQueueRedis.php @@ -0,0 +1,856 @@ +<?php +/** + * Redis-backed job queue code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle job queues stored in Redis + * + * This is faster, less resource intensive, queue that JobQueueDB. + * All data for a queue using this class is placed into one redis server. + * + * There are eight main redis keys used to track jobs: + * - l-unclaimed : A list of job IDs used for ready unclaimed jobs + * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries + * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs + * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs + * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication + * - h-sha1ById : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication + * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries + * - h-data : A hash of (job ID => serialized blobs) for job storage + * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned. + * If an ID appears in any of those lists, it should have a h-data entry for its ID. + * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then + * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById + * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its + * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID. + * + * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication. + * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run. + * All the keys are prefixed with the relevant wiki ID information. + * + * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations. + * Additionally, it should be noted that redis has different persistence modes, such + * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be + * made on the servers based on what queues are using it and what tolerance they have. + * + * @ingroup JobQueue + * @ingroup Redis + * @since 1.22 + */ +class JobQueueRedis extends JobQueue { + /** @var RedisConnectionPool */ + protected $redisPool; + + protected $server; // string; server address + protected $compression; // string; compression method to use + + const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) + + protected $key; // string; key to prefix the queue keys with (used for testing) + + /** + * @params include: + * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). + * Note that the serializer option is ignored "none" is always used. + * - redisServer : A hostname/port combination or the absolute path of a UNIX socket. + * If a hostname is specified but no port, the standard port number + * 6379 will be used. Required. + * - compression : The type of compression to use; one of (none,gzip). + * @param array $params + */ + public function __construct( array $params ) { + parent::__construct( $params ); + $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua + $this->server = $params['redisServer']; + $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; + $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); + } + + protected function supportedOrders() { + return array( 'timestamp', 'fifo' ); + } + + protected function optimalOrder() { + return 'fifo'; + } + + protected function supportsDelayedJobs() { + return true; + } + + /** + * @see JobQueue::doIsEmpty() + * @return bool + * @throws MWException + */ + protected function doIsEmpty() { + return $this->doGetSize() == 0; + } + + /** + * @see JobQueue::doGetSize() + * @return integer + * @throws MWException + */ + protected function doGetSize() { + $conn = $this->getConnection(); + try { + return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doGetAcquiredCount() + * @return integer + * @throws MWException + */ + protected function doGetAcquiredCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + $conn = $this->getConnection(); + try { + $conn->multi( Redis::PIPELINE ); + $conn->zSize( $this->getQueueKey( 'z-claimed' ) ); + $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); + return array_sum( $conn->exec() ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doGetDelayedCount() + * @return integer + * @throws MWException + */ + protected function doGetDelayedCount() { + if ( !$this->checkDelay ) { + return 0; // no delayed jobs + } + $conn = $this->getConnection(); + try { + return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doGetAbandonedCount() + * @return integer + * @throws MWException + */ + protected function doGetAbandonedCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + $conn = $this->getConnection(); + try { + return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doBatchPush() + * @param array $jobs + * @param $flags + * @return bool + * @throws MWException + */ + protected function doBatchPush( array $jobs, $flags ) { + // Convert the jobs into field maps (de-duplicated against each other) + $items = array(); // (job ID => job fields map) + foreach ( $jobs as $job ) { + $item = $this->getNewJobFields( $job ); + if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate + $items[$item['sha1']] = $item; + } else { + $items[$item['uuid']] = $item; + } + } + + if ( !count( $items ) ) { + return true; // nothing to do + } + + $conn = $this->getConnection(); + try { + // Actually push the non-duplicate jobs into the queue... + if ( $flags & self::QOS_ATOMIC ) { + $batches = array( $items ); // all or nothing + } else { + $batches = array_chunk( $items, 500 ); // avoid tying up the server + } + $failed = 0; + $pushed = 0; + foreach ( $batches as $itemBatch ) { + $added = $this->pushBlobs( $conn, $itemBatch ); + if ( is_int( $added ) ) { + $pushed += $added; + } else { + $failed += count( $itemBatch ); + } + } + if ( $failed > 0 ) { + wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." ); + return false; + } + JobQueue::incrStats( 'job-insert', $this->type, count( $items ) ); + JobQueue::incrStats( 'job-insert-duplicate', $this->type, + count( $items ) - $failed - $pushed ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return true; + } + + /** + * @param RedisConnRef $conn + * @param array $items List of results from JobQueueRedis::getNewJobFields() + * @return integer Number of jobs inserted (duplicates are ignored) + * @throws RedisException + */ + protected function pushBlobs( RedisConnRef $conn, array $items ) { + $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] ) + foreach ( $items as $item ) { + $args[] = (string)$item['uuid']; + $args[] = (string)$item['sha1']; + $args[] = (string)$item['rtimestamp']; + $args[] = (string)$this->serialize( $item ); + } + static $script = +<<<LUA + if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end + local pushed = 0 + for i = 1,#ARGV,4 do + local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3] + if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then + if 1*rtimestamp > 0 then + -- Insert into delayed queue (release time as score) + redis.call('zAdd',KEYS[4],rtimestamp,id) + else + -- Insert into unclaimed queue + redis.call('lPush',KEYS[1],id) + end + if sha1 ~= '' then + redis.call('hSet',KEYS[2],id,sha1) + redis.call('hSet',KEYS[3],sha1,id) + end + redis.call('hSet',KEYS[5],id,blob) + pushed = pushed + 1 + end + end + return pushed +LUA; + return $conn->luaEval( $script, + array_merge( + array( + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'z-delayed' ), # KEYS[4] + $this->getQueueKey( 'h-data' ), # KEYS[5] + ), + $args + ), + 5 # number of first argument(s) that are keys + ); + } + + /** + * @see JobQueue::doPop() + * @return Job|bool + * @throws MWException + */ + protected function doPop() { + $job = false; + + // Push ready delayed jobs into the queue every 10 jobs to spread the load. + // This is also done as a periodic task, but we don't want too much done at once. + if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { + $this->releaseReadyDelayedJobs(); + } + + $conn = $this->getConnection(); + try { + do { + if ( $this->claimTTL > 0 ) { + // Keep the claimed job list down for high-traffic queues + if ( mt_rand( 0, 99 ) == 0 ) { + $this->recycleAndDeleteStaleJobs(); + } + $blob = $this->popAndAcquireBlob( $conn ); + } else { + $blob = $this->popAndDeleteBlob( $conn ); + } + if ( $blob === false ) { + break; // no jobs; nothing to do + } + + JobQueue::incrStats( 'job-pop', $this->type ); + $item = $this->unserialize( $blob ); + if ( $item === false ) { + wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." ); + continue; + } + + // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed + $job = $this->getJobFromFields( $item ); // may be false + } while ( !$job ); // job may be false if invalid + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return $job; + } + + /** + * @param RedisConnRef $conn + * @return array serialized string or false + * @throws RedisException + */ + protected function popAndDeleteBlob( RedisConnRef $conn ) { + static $script = +<<<LUA + -- Pop an item off the queue + local id = redis.call('rpop',KEYS[1]) + if not id then return false end + -- Get the job data and remove it + local item = redis.call('hGet',KEYS[4],id) + redis.call('hDel',KEYS[4],id) + -- Allow new duplicates of this job + local sha1 = redis.call('hGet',KEYS[2],id) + if sha1 then redis.call('hDel',KEYS[3],sha1) end + redis.call('hDel',KEYS[2],id) + -- Return the job data + return item +LUA; + return $conn->luaEval( $script, + array( + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'h-data' ), # KEYS[4] + ), + 4 # number of first argument(s) that are keys + ); + } + + /** + * @param RedisConnRef $conn + * @return array serialized string or false + * @throws RedisException + */ + protected function popAndAcquireBlob( RedisConnRef $conn ) { + static $script = +<<<LUA + -- Pop an item off the queue + local id = redis.call('rPop',KEYS[1]) + if not id then return false end + -- Allow new duplicates of this job + local sha1 = redis.call('hGet',KEYS[2],id) + if sha1 then redis.call('hDel',KEYS[3],sha1) end + redis.call('hDel',KEYS[2],id) + -- Mark the jobs as claimed and return it + redis.call('zAdd',KEYS[4],ARGV[1],id) + redis.call('hIncrBy',KEYS[5],id,1) + return redis.call('hGet',KEYS[6],id) +LUA; + return $conn->luaEval( $script, + array( + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'z-claimed' ), # KEYS[4] + $this->getQueueKey( 'h-attempts' ), # KEYS[5] + $this->getQueueKey( 'h-data' ), # KEYS[6] + time(), # ARGV[1] (injected to be replication-safe) + ), + 6 # number of first argument(s) that are keys + ); + } + + /** + * @see JobQueue::doAck() + * @param Job $job + * @return Job|bool + * @throws MWException + */ + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['uuid'] ) ) { + throw new MWException( "Job of type '{$job->getType()}' has no UUID." ); + } + if ( $this->claimTTL > 0 ) { + $conn = $this->getConnection(); + try { + static $script = +<<<LUA + -- Unmark the job as claimed + redis.call('zRem',KEYS[1],ARGV[1]) + redis.call('hDel',KEYS[2],ARGV[1]) + -- Delete the job data itself + return redis.call('hDel',KEYS[3],ARGV[1]) +LUA; + $res = $conn->luaEval( $script, + array( + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'h-data' ), # KEYS[3] + $job->metadata['uuid'] # ARGV[1] + ), + 3 # number of first argument(s) that are keys + ); + + if ( !$res ) { + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); + return false; + } + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + return true; + } + + /** + * @see JobQueue::doDeduplicateRootJob() + * @param Job $job + * @return bool + * @throws MWException + */ + protected function doDeduplicateRootJob( Job $job ) { + if ( !$job->hasRootJobParams() ) { + throw new MWException( "Cannot register root job; missing parameters." ); + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + + $conn = $this->getConnection(); + try { + $timestamp = $conn->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + // Update the timestamp of the last root job started at the location... + return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doIsRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + if ( !$job->hasRootJobParams() ) { + return false; // job has no de-deplication info + } + $params = $job->getRootJobParams(); + + $conn = $this->getConnection(); + try { + // Get the last time this root job was enqueued + $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @see JobQueue::doDelete() + * @return bool + */ + protected function doDelete() { + static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned', + 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ); + + $conn = $this->getConnection(); + try { + $keys = array(); + foreach ( $props as $prop ) { + $keys[] = $this->getQueueKey( $prop ); + } + return ( $conn->delete( $keys ) !== false ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllQueuedJobs() { + $conn = $this->getConnection(); + try { + $that = $this; + return new MappedIterator( + $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ), + function( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { return is_object( $job ); } ) + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllDelayedJobs() { + $conn = $this->getConnection(); + try { + $that = $this; + return new MappedIterator( // delayed jobs + $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), + function( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { return is_object( $job ); } ) + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + public function getCoalesceLocationInternal() { + return "RedisServer:" . $this->server; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $sizes = array(); // (type => size) + $types = array_values( $types ); // reindex + try { + $conn = $this->getConnection(); + $conn->multi( Redis::PIPELINE ); + foreach ( $types as $type ) { + $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) ); + } + $res = $conn->exec(); + if ( is_array( $res ) ) { + foreach ( $res as $i => $size ) { + $sizes[$types[$i]] = $size; + } + } + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + return $sizes; + } + + /** + * This function should not be called outside JobQueueRedis + * + * @param $uid string + * @param $conn RedisConnRef + * @return Job|bool Returns false if the job does not exist + * @throws MWException + */ + public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { + try { + $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ); + if ( $data === false ) { + return false; // not found + } + $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) ); + if ( !is_array( $item ) ) { // this shouldn't happen + throw new MWException( "Could not find job with ID '$uid'." ); + } + $title = Title::makeTitle( $item['namespace'], $item['title'] ); + $job = Job::factory( $item['type'], $title, $item['params'] ); + $job->metadata['uuid'] = $item['uuid']; + return $job; + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * Release any ready delayed jobs into the queue + * + * @return integer Number of jobs released + * @throws MWException + */ + public function releaseReadyDelayedJobs() { + $count = 0; + + $conn = $this->getConnection(); + try { + static $script = +<<<LUA + -- Get the list of ready delayed jobs, sorted by readiness + local ids = redis.call('zRangeByScore',KEYS[1],0,ARGV[1]) + -- Migrate the jobs from the "delayed" set to the "unclaimed" list + for k,id in ipairs(ids) do + redis.call('lPush',KEYS[2],id) + redis.call('zRem',KEYS[1],id) + end + return #ids +LUA; + $count += (int)$conn->luaEval( $script, + array( + $this->getQueueKey( 'z-delayed' ), // KEYS[1] + $this->getQueueKey( 'l-unclaimed' ), // KEYS[2] + time() // ARGV[1]; max "delay until" UNIX timestamp + ), + 2 # first two arguments are keys + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return $count; + } + + /** + * Recycle or destroy any jobs that have been claimed for too long + * + * @return integer Number of jobs recycled/deleted + * @throws MWException + */ + public function recycleAndDeleteStaleJobs() { + if ( $this->claimTTL <= 0 ) { // sanity + throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." ); + } + $count = 0; + // For each job item that can be retried, we need to add it back to the + // main queue and remove it from the list of currenty claimed job items. + // For those that cannot, they are marked as dead and kept around for + // investigation and manual job restoration but are eventually deleted. + $conn = $this->getConnection(); + try { + $now = time(); + static $script = +<<<LUA + local released,abandoned,pruned = 0,0,0 + -- Get all non-dead jobs that have an expired claim on them. + -- The score for each item is the last claim timestamp (UNIX). + local staleClaims = redis.call('zRangeByScore',KEYS[1],0,ARGV[1]) + for k,id in ipairs(staleClaims) do + local timestamp = redis.call('zScore',KEYS[1],id) + local attempts = redis.call('hGet',KEYS[2],id) + if attempts < ARGV[3] then + -- Claim expired and retries left: re-enqueue the job + redis.call('lPush',KEYS[3],id) + redis.call('hIncrBy',KEYS[2],id,1) + released = released + 1 + else + -- Claim expired and no retries left: mark the job as dead + redis.call('zAdd',KEYS[5],timestamp,id) + abandoned = abandoned + 1 + end + redis.call('zRem',KEYS[1],id) + end + -- Get all of the dead jobs that have been marked as dead for too long. + -- The score for each item is the last claim timestamp (UNIX). + local deadClaims = redis.call('zRangeByScore',KEYS[5],0,ARGV[2]) + for k,id in ipairs(deadClaims) do + -- Stale and out of retries: remove any traces of the job + redis.call('zRem',KEYS[5],id) + redis.call('hDel',KEYS[2],id) + redis.call('hDel',KEYS[4],id) + pruned = pruned + 1 + end + return {released,abandoned,pruned} +LUA; + $res = $conn->luaEval( $script, + array( + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'l-unclaimed' ), # KEYS[3] + $this->getQueueKey( 'h-data' ), # KEYS[4] + $this->getQueueKey( 'z-abandoned' ), # KEYS[5] + $now - $this->claimTTL, # ARGV[1] + $now - self::MAX_AGE_PRUNE, # ARGV[2] + $this->maxTries # ARGV[3] + ), + 5 # number of first argument(s) that are keys + ); + if ( $res ) { + list( $released, $abandoned, $pruned ) = $res; + $count += $released + $pruned; + JobQueue::incrStats( 'job-recycle', $this->type, $released ); + JobQueue::incrStats( 'job-abandon', $this->type, $abandoned ); + } + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return $count; + } + + /** + * @return Array + */ + protected function doGetPeriodicTasks() { + $tasks = array(); + if ( $this->claimTTL > 0 ) { + $tasks['recycleAndDeleteStaleJobs'] = array( + 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), + 'period' => ceil( $this->claimTTL / 2 ) + ); + } + if ( $this->checkDelay ) { + $tasks['releaseReadyDelayedJobs'] = array( + 'callback' => array( $this, 'releaseReadyDelayedJobs' ), + 'period' => 300 // 5 minutes + ); + } + return $tasks; + } + + /** + * @param $job Job + * @return array + */ + protected function getNewJobFields( Job $job ) { + return array( + // Fields that describe the nature of the job + 'type' => $job->getType(), + 'namespace' => $job->getTitle()->getNamespace(), + 'title' => $job->getTitle()->getDBkey(), + 'params' => $job->getParams(), + // Some jobs cannot run until a "release timestamp" + 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, + // Additional job metadata + 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ), + 'sha1' => $job->ignoreDuplicates() + ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) + : '', + 'timestamp' => time() // UNIX timestamp + ); + } + + /** + * @param $fields array + * @return Job|bool + */ + protected function getJobFromFields( array $fields ) { + $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] ); + if ( $title ) { + $job = Job::factory( $fields['type'], $title, $fields['params'] ); + $job->metadata['uuid'] = $fields['uuid']; + return $job; + } + return false; + } + + /** + * @param array $fields + * @return string Serialized and possibly compressed version of $fields + */ + protected function serialize( array $fields ) { + $blob = serialize( $fields ); + if ( $this->compression === 'gzip' + && strlen( $blob ) >= 1024 && function_exists( 'gzdeflate' ) ) + { + $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ); + $blobz = serialize( $object ); + return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob; + } else { + return $blob; + } + } + + /** + * @param string $blob + * @return array|bool Unserialized version of $blob or false + */ + protected function unserialize( $blob ) { + $fields = unserialize( $blob ); + if ( is_object( $fields ) ) { + if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) { + $fields = unserialize( gzinflate( $fields->blob ) ); + } else { + $fields = false; + } + } + return is_array( $fields ) ? $fields : false; + } + + /** + * Get a connection to the server that handles all sub-queues for this queue + * + * @return Array (server name, Redis instance) + * @throws MWException + */ + protected function getConnection() { + $conn = $this->redisPool->getConnection( $this->server ); + if ( !$conn ) { + throw new JobQueueConnectionError( "Unable to connect to redis server." ); + } + return $conn; + } + + /** + * @param $server string + * @param $conn RedisConnRef + * @param $e RedisException + * @throws MWException + */ + protected function throwRedisException( $server, RedisConnRef $conn, $e ) { + $this->redisPool->handleException( $server, $conn, $e ); + throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); + } + + /** + * @param $prop string + * @param $type string|null + * @return string + */ + private function getQueueKey( $prop, $type = null ) { + $type = is_string( $type ) ? $type : $this->type; + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + if ( strlen( $this->key ) ) { // namespaced queue (for testing) + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop ); + } else { + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop ); + } + } + + /** + * @param $key string + * @return void + */ + public function setTestingPrefix( $key ) { + $this->key = $key; + } +} diff --git a/includes/job/JobQueueAggregator.php b/includes/job/aggregator/JobQueueAggregator.php index 3dba3c53..a8186abd 100644 --- a/includes/job/JobQueueAggregator.php +++ b/includes/job/aggregator/JobQueueAggregator.php @@ -119,6 +119,23 @@ abstract class JobQueueAggregator { abstract protected function doGetAllReadyWikiQueues(); /** + * Purge all of the aggregator information + * + * @return bool Success + */ + final public function purge() { + wfProfileIn( __METHOD__ ); + $res = $this->doPurge(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueueAggregator::purge() + */ + abstract protected function doPurge(); + + /** * Get all databases that have a pending job. * This poll all the queues and is this expensive. * diff --git a/includes/job/JobQueueAggregatorMemc.php b/includes/job/aggregator/JobQueueAggregatorMemc.php index 4b82cf92..9434da04 100644 --- a/includes/job/JobQueueAggregatorMemc.php +++ b/includes/job/aggregator/JobQueueAggregatorMemc.php @@ -91,9 +91,9 @@ class JobQueueAggregatorMemc extends JobQueueAggregator { if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock $pendingDbInfo = array( 'pendingDBs' => $this->findPendingWikiQueues(), - 'timestamp' => time() + 'timestamp' => time() ); - for ( $attempts=1; $attempts <= 25; ++$attempts ) { + for ( $attempts = 1; $attempts <= 25; ++$attempts ) { if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock $this->cache->set( $key, $pendingDbInfo ); $this->cache->delete( "$key:lock" ); // unlock @@ -109,6 +109,13 @@ class JobQueueAggregatorMemc extends JobQueueAggregator { } /** + * @see JobQueueAggregator::doPurge() + */ + protected function doPurge() { + return $this->cache->delete( $this->getReadyQueueCacheKey() ); + } + + /** * @return string */ private function getReadyQueueCacheKey() { diff --git a/includes/job/JobQueueAggregatorRedis.php b/includes/job/aggregator/JobQueueAggregatorRedis.php index 74e9171c..c6a799df 100644 --- a/includes/job/JobQueueAggregatorRedis.php +++ b/includes/job/aggregator/JobQueueAggregatorRedis.php @@ -25,6 +25,7 @@ * Class to handle tracking information about all queues using PhpRedis * * @ingroup JobQueue + * @ingroup Redis * @since 1.21 */ class JobQueueAggregatorRedis extends JobQueueAggregator { @@ -100,8 +101,18 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { $pendingDBs[$type][] = $wiki; } } else { // cache miss + // Avoid duplicated effort + $conn->multi( Redis::MULTI ); + $conn->setnx( $this->getReadyQueueKey() . ":lock", 1 ); + $conn->expire( $this->getReadyQueueKey() . ":lock", 3600 ); + if ( $conn->exec() !== array( true, true ) ) { // lock + return array(); // already in progress + } + $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis) + $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock + $now = time(); $map = array(); foreach ( $pendingDBs as $type => $wikis ) { @@ -120,6 +131,23 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { } /** + * @see JobQueueAggregator::doPurge() + */ + protected function doPurge() { + $conn = $this->getConnection(); + if ( !$conn ) { + return false; + } + try { + $conn->delete( $this->getReadyQueueKey() ); + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + return false; + } + return true; + } + + /** * Get a connection to the server that handles all sub-queues for this queue * * @return Array (server name, Redis instance) diff --git a/includes/job/jobs/AssembleUploadChunksJob.php b/includes/job/jobs/AssembleUploadChunksJob.php index c5dd9eaa..6237e568 100644 --- a/includes/job/jobs/AssembleUploadChunksJob.php +++ b/includes/job/jobs/AssembleUploadChunksJob.php @@ -42,6 +42,15 @@ class AssembleUploadChunksJob extends Job { return false; } + if ( count( $_SESSION ) === 0 ) { + // Empty session probably indicates that we didn't associate + // with the session correctly. Note that being able to load + // the user does not necessarily mean the session was loaded. + // Most likely cause by suhosin.session.encrypt = On. + $this->setLastError( "Error associating with user session. Try setting suhosin.session.encrypt = Off" ); + return false; + } + UploadBase::setSessionStatus( $this->params['filekey'], array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() ) @@ -82,11 +91,11 @@ class AssembleUploadChunksJob extends Job { UploadBase::setSessionStatus( $this->params['filekey'], array( - 'result' => 'Success', - 'stage' => 'assembling', - 'filekey' => $newFileKey, + 'result' => 'Success', + 'stage' => 'assembling', + 'filekey' => $newFileKey, 'imageinfo' => $imageInfo, - 'status' => Status::newGood() + 'status' => Status::newGood() ) ); } catch ( MWException $e ) { @@ -94,7 +103,7 @@ class AssembleUploadChunksJob extends Job { $this->params['filekey'], array( 'result' => 'Failure', - 'stage' => 'assembling', + 'stage' => 'assembling', 'status' => Status::newFatal( 'api-error-stashfailed' ) ) ); diff --git a/includes/job/jobs/DoubleRedirectJob.php b/includes/job/jobs/DoubleRedirectJob.php index 05abeeef..33e749b8 100644 --- a/includes/job/jobs/DoubleRedirectJob.php +++ b/includes/job/jobs/DoubleRedirectJob.php @@ -90,40 +90,40 @@ class DoubleRedirectJob extends Job { $targetRev = Revision::newFromTitle( $this->title, false, Revision::READ_LATEST ); if ( !$targetRev ) { - wfDebug( __METHOD__.": target redirect already deleted, ignoring\n" ); + wfDebug( __METHOD__ . ": target redirect already deleted, ignoring\n" ); return true; } $content = $targetRev->getContent(); $currentDest = $content ? $content->getRedirectTarget() : null; if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) { - wfDebug( __METHOD__.": Redirect has changed since the job was queued\n" ); + wfDebug( __METHOD__ . ": Redirect has changed since the job was queued\n" ); return true; } - # Check for a suppression tag (used e.g. in periodically archived discussions) + // Check for a suppression tag (used e.g. in periodically archived discussions) $mw = MagicWord::get( 'staticredirect' ); if ( $content->matchMagicWord( $mw ) ) { - wfDebug( __METHOD__.": skipping: suppressed with __STATICREDIRECT__\n" ); + wfDebug( __METHOD__ . ": skipping: suppressed with __STATICREDIRECT__\n" ); return true; } - # Find the current final destination + // Find the current final destination $newTitle = self::getFinalDestination( $this->redirTitle ); if ( !$newTitle ) { - wfDebug( __METHOD__.": skipping: single redirect, circular redirect or invalid redirect destination\n" ); + wfDebug( __METHOD__ . ": skipping: single redirect, circular redirect or invalid redirect destination\n" ); return true; } if ( $newTitle->equals( $this->redirTitle ) ) { - # The redirect is already right, no need to change it - # This can happen if the page was moved back (say after vandalism) - wfDebug( __METHOD__.": skipping, already good\n" ); + // The redirect is already right, no need to change it + // This can happen if the page was moved back (say after vandalism) + wfDebug( __METHOD__ . " : skipping, already good\n" ); } - # Preserve fragment (bug 14904) + // Preserve fragment (bug 14904) $newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(), $currentDest->getFragment(), $newTitle->getInterwiki() ); - # Fix the text + // Fix the text $newContent = $content->updateRedirect( $newTitle ); if ( $newContent->equals( $content ) ) { @@ -137,11 +137,13 @@ class DoubleRedirectJob extends Job { return false; } - # Save it + // Save it global $wgUser; $oldUser = $wgUser; $wgUser = $user; $article = WikiPage::factory( $this->title ); + + // Messages: double-redirect-fixed-move, double-redirect-fixed-maintenance $reason = wfMessage( 'double-redirect-fixed-' . $this->reason, $this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText() )->inContentLanguage()->text(); @@ -161,7 +163,8 @@ class DoubleRedirectJob extends Job { public static function getFinalDestination( $title ) { $dbw = wfGetDB( DB_MASTER ); - $seenTitles = array(); # Circular redirect check + // Circular redirect check + $seenTitles = array(); $dest = false; while ( true ) { diff --git a/includes/job/jobs/DuplicateJob.php b/includes/job/jobs/DuplicateJob.php index 524983b8..be1bfe5c 100644 --- a/includes/job/jobs/DuplicateJob.php +++ b/includes/job/jobs/DuplicateJob.php @@ -45,7 +45,7 @@ final class DuplicateJob extends Job { * @return Job */ public static function newFromJob( Job $job ) { - $djob = new self( $job->getTitle(), $job->getParams(), $job->getId() ); + $djob = new self( $job->getTitle(), $job->getParams(), $job->id ); $djob->command = $job->getType(); $djob->params = is_array( $djob->params ) ? $djob->params : array(); $djob->params = array( 'isDuplicate' => true ) + $djob->params; diff --git a/includes/job/jobs/EnotifNotifyJob.php b/includes/job/jobs/EnotifNotifyJob.php index 2be05b63..bbe988d0 100644 --- a/includes/job/jobs/EnotifNotifyJob.php +++ b/includes/job/jobs/EnotifNotifyJob.php @@ -35,7 +35,7 @@ class EnotifNotifyJob extends Job { function run() { $enotif = new EmailNotification(); // Get the user from ID (rename safe). Anons are 0, so defer to name. - if( isset( $this->params['editorID'] ) && $this->params['editorID'] ) { + if ( isset( $this->params['editorID'] ) && $this->params['editorID'] ) { $editor = User::newFromId( $this->params['editorID'] ); // B/C, only the name might be given. } else { diff --git a/includes/job/jobs/HTMLCacheUpdateJob.php b/includes/job/jobs/HTMLCacheUpdateJob.php index 818c6abf..44c240bb 100644 --- a/includes/job/jobs/HTMLCacheUpdateJob.php +++ b/includes/job/jobs/HTMLCacheUpdateJob.php @@ -79,8 +79,16 @@ class HTMLCacheUpdateJob extends Job { * Update all of the backlinks */ protected function doFullUpdate() { + global $wgMaxBacklinksInvalidate; + # Get an estimate of the number of rows from the BacklinkCache - $numRows = $this->blCache->getNumLinks( $this->params['table'] ); + $max = max( $this->rowsPerJob * 2, $wgMaxBacklinksInvalidate ) + 1; + $numRows = $this->blCache->getNumLinks( $this->params['table'], $max ); + if ( $wgMaxBacklinksInvalidate !== false && $numRows > $wgMaxBacklinksInvalidate ) { + wfDebug( "Skipped HTML cache invalidation of {$this->title->getPrefixedText()}." ); + return true; + } + if ( $numRows > $this->rowsPerJob * 2 ) { # Do fast cached partition $this->insertPartitionJobs(); @@ -90,12 +98,13 @@ class HTMLCacheUpdateJob extends Job { # Check if the row count estimate was correct if ( $titleArray->count() > $this->rowsPerJob * 2 ) { # Not correct, do accurate partition - wfDebug( __METHOD__.": row count estimate was incorrect, repartitioning\n" ); + wfDebug( __METHOD__ . ": row count estimate was incorrect, repartitioning\n" ); $this->insertJobsFromTitles( $titleArray ); } else { $this->invalidateTitles( $titleArray ); // just do the query } } + return true; } @@ -145,7 +154,7 @@ class HTMLCacheUpdateJob extends Job { array( 'table' => $this->params['table'], 'start' => $start, - 'end' => $id - 1 + 'end' => $id - 1 ) + $rootJobParams // carry over information for de-duplication ); $start = $id; @@ -158,16 +167,16 @@ class HTMLCacheUpdateJob extends Job { array( 'table' => $this->params['table'], 'start' => $start, - 'end' => $this->params['end'] + 'end' => $this->params['end'] ) + $rootJobParams // carry over information for de-duplication ); - wfDebug( __METHOD__.": repartitioning into " . count( $jobs ) . " jobs\n" ); + wfDebug( __METHOD__ . ": repartitioning into " . count( $jobs ) . " jobs\n" ); if ( count( $jobs ) < 2 ) { # I don't think this is possible at present, but handling this case # makes the code a bit more robust against future code updates and # avoids a potential infinite loop of repartitioning - wfDebug( __METHOD__.": repartitioning failed!\n" ); + wfDebug( __METHOD__ . ": repartitioning failed!\n" ); $this->invalidateTitles( $titleArray ); } else { JobQueueGroup::singleton()->push( $jobs ); @@ -194,7 +203,7 @@ class HTMLCacheUpdateJob extends Job { array( 'table' => $this->params['table'], 'start' => $start, - 'end' => $end, + 'end' => $end, ) + $rootJobParams // carry over information for de-duplication ); } @@ -245,7 +254,7 @@ class HTMLCacheUpdateJob extends Job { } # Update file cache - if ( $wgUseFileCache ) { + if ( $wgUseFileCache ) { foreach ( $titleArray as $title ) { HTMLFileCache::clearFileCache( $title ); } diff --git a/includes/job/jobs/NullJob.php b/includes/job/jobs/NullJob.php index d282a8e6..b6164a5d 100644 --- a/includes/job/jobs/NullJob.php +++ b/includes/job/jobs/NullJob.php @@ -26,6 +26,22 @@ * in the queue and/or sleep for a brief time period. These can be used * to represent "no-op" jobs or test lock contention and performance. * + * @par Example: + * Inserting a null job in the configured job queue: + * @code + * $ php maintenance/eval.php + * > $queue = JobQueueGroup::singleton(); + * > $job = new NullJob( Title::newMainPage(), array( 'lives' => 10 ) ); + * > $queue->push( $job ); + * @endcode + * You can then confirm the job has been enqueued by using the showJobs.php + * maintenance utility: + * @code + * $ php maintenance/showJobs.php --group + * null: 1 queue; 0 claimed (0 active, 0 abandoned) + * $ + * @endcode + * * @ingroup JobQueue */ class NullJob extends Job { diff --git a/includes/job/jobs/PublishStashedFileJob.php b/includes/job/jobs/PublishStashedFileJob.php index d3feda28..5a24f93c 100644 --- a/includes/job/jobs/PublishStashedFileJob.php +++ b/includes/job/jobs/PublishStashedFileJob.php @@ -42,13 +42,23 @@ class PublishStashedFileJob extends Job { return false; } + if ( count( $_SESSION ) === 0 ) { + // Empty session probably indicates that we didn't associate + // with the session correctly. Note that being able to load + // the user does not necessarily mean the session was loaded. + // Most likely cause by suhosin.session.encrypt = On. + $this->setLastError( "Error associating with user session. Try setting suhosin.session.encrypt = Off" ); + return false; + } + + UploadBase::setSessionStatus( $this->params['filekey'], array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() ) ); $upload = new UploadFromStash( $user ); - // @TODO: initialize() causes a GET, ideally we could frontload the antivirus + // @todo initialize() causes a GET, ideally we could frontload the antivirus // checks and anything else to the stash stage (which includes concatenation and // the local file is thus already there). That way, instead of GET+PUT, there could // just be a COPY operation from the stash to the public zone. @@ -94,11 +104,11 @@ class PublishStashedFileJob extends Job { UploadBase::setSessionStatus( $this->params['filekey'], array( - 'result' => 'Success', - 'stage' => 'publish', - 'filename' => $upload->getLocalFile()->getName(), + 'result' => 'Success', + 'stage' => 'publish', + 'filename' => $upload->getLocalFile()->getName(), 'imageinfo' => $imageInfo, - 'status' => Status::newGood() + 'status' => Status::newGood() ) ); } catch ( MWException $e ) { @@ -106,7 +116,7 @@ class PublishStashedFileJob extends Job { $this->params['filekey'], array( 'result' => 'Failure', - 'stage' => 'publish', + 'stage' => 'publish', 'status' => Status::newFatal( 'api-error-publishfailed' ) ) ); diff --git a/includes/job/jobs/RefreshLinksJob.php b/includes/job/jobs/RefreshLinksJob.php index 9dbe8278..4fc8bac6 100644 --- a/includes/job/jobs/RefreshLinksJob.php +++ b/includes/job/jobs/RefreshLinksJob.php @@ -37,21 +37,18 @@ class RefreshLinksJob extends Job { * @return boolean success */ function run() { - wfProfileIn( __METHOD__ ); - $linkCache = LinkCache::singleton(); $linkCache->clear(); if ( is_null( $this->title ) ) { $this->error = "refreshLinks: Invalid title"; - wfProfileOut( __METHOD__ ); return false; } # Wait for the DB of the current/next slave DB handle to catch up to the master. # This way, we get the correct page_latest for templates or files that just changed # milliseconds ago, having triggered this job to begin with. - if ( isset( $this->params['masterPos'] ) ) { + if ( isset( $this->params['masterPos'] ) && $this->params['masterPos'] !== false ) { wfGetLB()->waitFor( $this->params['masterPos'] ); } @@ -59,13 +56,11 @@ class RefreshLinksJob extends Job { if ( !$revision ) { $this->error = 'refreshLinks: Article not found "' . $this->title->getPrefixedDBkey() . '"'; - wfProfileOut( __METHOD__ ); return false; // XXX: what if it was just deleted? } self::runForTitleInternal( $this->title, $revision, __METHOD__ ); - wfProfileOut( __METHOD__ ); return true; } @@ -101,6 +96,9 @@ class RefreshLinksJob extends Job { $updates = $content->getSecondaryDataUpdates( $title, null, false, $parserOutput ); DataUpdate::runUpdates( $updates ); + + InfoAction::invalidateCache( $title ); + wfProfileOut( $fname ); } } @@ -114,6 +112,8 @@ class RefreshLinksJob extends Job { class RefreshLinksJob2 extends Job { function __construct( $title, $params, $id = 0 ) { parent::__construct( 'refreshLinks2', $title, $params, $id ); + // Base jobs for large templates can easily be de-duplicated + $this->removeDuplicates = !isset( $params['start'] ) && !isset( $params['end'] ); } /** @@ -123,14 +123,11 @@ class RefreshLinksJob2 extends Job { function run() { global $wgUpdateRowsPerJob; - wfProfileIn( __METHOD__ ); - $linkCache = LinkCache::singleton(); $linkCache->clear(); if ( is_null( $this->title ) ) { $this->error = "refreshLinks2: Invalid title"; - wfProfileOut( __METHOD__ ); return false; } @@ -144,7 +141,7 @@ class RefreshLinksJob2 extends Job { // Hopefully, when leaf jobs are popped, the slaves will have reached that position. if ( isset( $this->params['masterPos'] ) ) { $masterPos = $this->params['masterPos']; - } elseif ( wfGetLB()->getServerCount() > 1 ) { + } elseif ( wfGetLB()->getServerCount() > 1 ) { $masterPos = wfGetLB()->getMasterPos(); } else { $masterPos = false; @@ -158,7 +155,7 @@ class RefreshLinksJob2 extends Job { $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); } else { # This is a base job to trigger the insertion of partitioned jobs... - if ( $tbc->getNumLinks( $table ) <= $wgUpdateRowsPerJob ) { + if ( $tbc->getNumLinks( $table, $wgUpdateRowsPerJob + 1 ) <= $wgUpdateRowsPerJob ) { # Just directly insert the single per-title jobs $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); } else { @@ -167,10 +164,10 @@ class RefreshLinksJob2 extends Job { list( $start, $end ) = $batch; $jobs[] = new RefreshLinksJob2( $this->title, array( - 'table' => $table, - 'start' => $start, - 'end' => $end, - 'masterPos' => $masterPos, + 'table' => $table, + 'start' => $start, + 'end' => $end, + 'masterPos' => $masterPos, ) + $this->getRootJobParams() // carry over information for de-duplication ); } @@ -181,7 +178,6 @@ class RefreshLinksJob2 extends Job { JobQueueGroup::singleton()->push( $jobs ); } - wfProfileOut( __METHOD__ ); return true; } diff --git a/includes/job/jobs/UploadFromUrlJob.php b/includes/job/jobs/UploadFromUrlJob.php index 87549140..c993cfb4 100644 --- a/includes/job/jobs/UploadFromUrlJob.php +++ b/includes/job/jobs/UploadFromUrlJob.php @@ -48,6 +48,7 @@ class UploadFromUrlJob extends Job { } public function run() { + global $wgCopyUploadAsyncTimeout; # Initialize this object and the upload object $this->upload = new UploadFromUrl(); $this->upload->initialize( @@ -58,7 +59,11 @@ class UploadFromUrlJob extends Job { $this->user = User::newFromName( $this->params['userName'] ); # Fetch the file - $status = $this->upload->fetchFile(); + $opts = array(); + if ( $wgCopyUploadAsyncTimeout ) { + $opts['timeout'] = $wgCopyUploadAsyncTimeout; + } + $status = $this->upload->fetchFile( $opts ); if ( !$status->isOk() ) { $this->leaveMessage( $status ); return true; |