diff options
Diffstat (limited to 'includes/job/JobQueue.php')
-rw-r--r-- | includes/job/JobQueue.php | 355 |
1 files changed, 313 insertions, 42 deletions
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index b0dd9258..6556ee85 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -34,8 +34,14 @@ abstract class JobQueue { protected $order; // string; job priority for pop() protected $claimTTL; // integer; seconds protected $maxTries; // integer; maximum number of times to try a job + protected $checkDelay; // boolean; allow delayed jobs - const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions + /** @var BagOStuff */ + protected $dupCache; + + const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions + + const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days) /** * @param $params array @@ -53,28 +59,37 @@ abstract class JobQueue { if ( !in_array( $this->order, $this->supportedOrders() ) ) { throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); } + $this->checkDelay = !empty( $params['checkDelay'] ); + if ( $this->checkDelay && !$this->supportsDelayedJobs() ) { + throw new MWException( __CLASS__ . " does not support delayed jobs." ); + } + $this->dupCache = wfGetCache( CACHE_ANYTHING ); } /** * Get a job queue object of the specified type. * $params includes: - * - class : What job class to use (determines job type) - * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) - * - type : The name of the job types this queue handles - * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". - * If "fifo" is used, the queue will effectively be FIFO. Note that - * job completion will not appear to be exactly FIFO if there are multiple - * job runners since jobs can take different times to finish once popped. - * If "timestamp" is used, the queue will at least be loosely ordered - * by timestamp, allowing for some jobs to be popped off out of order. - * If "random" is used, pop() will pick jobs in random order. - * Note that it may only be weakly random (e.g. a lottery of the oldest X). - * If "any" is choosen, the queue will use whatever order is the fastest. - * This might be useful for improving concurrency for job acquisition. - * - claimTTL : If supported, the queue will recycle jobs that have been popped - * but not acknowledged as completed after this many seconds. Recycling - * of jobs simple means re-inserting them into the queue. Jobs can be - * attempted up to three times before being discarded. + * - class : What job class to use (determines job type) + * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * - type : The name of the job types this queue handles + * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". + * If "fifo" is used, the queue will effectively be FIFO. Note that job + * completion will not appear to be exactly FIFO if there are multiple + * job runners since jobs can take different times to finish once popped. + * If "timestamp" is used, the queue will at least be loosely ordered + * by timestamp, allowing for some jobs to be popped off out of order. + * If "random" is used, pop() will pick jobs in random order. + * Note that it may only be weakly random (e.g. a lottery of the oldest X). + * If "any" is choosen, the queue will use whatever order is the fastest. + * This might be useful for improving concurrency for job acquisition. + * - claimTTL : If supported, the queue will recycle jobs that have been popped + * but not acknowledged as completed after this many seconds. Recycling + * of jobs simple means re-inserting them into the queue. Jobs can be + * attempted up to three times before being discarded. + * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions. + * This lets delayed jobs wait in a staging area until a given timestamp is + * reached, at which point they will enter the queue. If this is not enabled + * or not supported, an exception will be thrown on delayed job insertion. * * Queue classes should throw an exception if they do not support the options given. * @@ -84,7 +99,7 @@ abstract class JobQueue { */ final public static function factory( array $params ) { $class = $params['class']; - if ( !MWInit::classExists( $class ) ) { + if ( !class_exists( $class ) ) { throw new MWException( "Invalid job queue class '$class'." ); } $obj = new $class( $params ); @@ -109,24 +124,45 @@ abstract class JobQueue { } /** - * @return string One of (random, timestamp, fifo) + * @return string One of (random, timestamp, fifo, undefined) */ final public function getOrder() { return $this->order; } /** - * @return Array Subset of (random, timestamp, fifo) + * @return bool Whether delayed jobs are enabled + * @since 1.22 + */ + final public function delayedJobsEnabled() { + return $this->checkDelay; + } + + /** + * Get the allowed queue orders for configuration validation + * + * @return Array Subset of (random, timestamp, fifo, undefined) */ abstract protected function supportedOrders(); /** - * @return string One of (random, timestamp, fifo) + * Get the default queue order to use if configuration does not specify one + * + * @return string One of (random, timestamp, fifo, undefined) */ abstract protected function optimalOrder(); /** - * Quickly check if the queue is empty (has no available jobs). + * Find out if delayed jobs are supported for configuration validation + * + * @return boolean Whether delayed jobs are supported + */ + protected function supportsDelayedJobs() { + return false; // not implemented + } + + /** + * Quickly check if the queue has no available (unacquired, non-delayed) jobs. * Queue classes should use caching if they are any slower without memcached. * * If caching is used, this might return false when there are actually no jobs. @@ -135,7 +171,7 @@ abstract class JobQueue { * not distinguishable from the race condition between isEmpty() and pop(). * * @return bool - * @throws MWException + * @throws JobQueueError */ final public function isEmpty() { wfProfileIn( __METHOD__ ); @@ -151,13 +187,13 @@ abstract class JobQueue { abstract protected function doIsEmpty(); /** - * Get the number of available (unacquired) jobs in the queue. + * Get the number of available (unacquired, non-delayed) jobs in the queue. * Queue classes should use caching if they are any slower without memcached. * * If caching is used, this number might be out of date for a minute. * * @return integer - * @throws MWException + * @throws JobQueueError */ final public function getSize() { wfProfileIn( __METHOD__ ); @@ -179,7 +215,7 @@ abstract class JobQueue { * If caching is used, this number might be out of date for a minute. * * @return integer - * @throws MWException + * @throws JobQueueError */ final public function getAcquiredCount() { wfProfileIn( __METHOD__ ); @@ -195,14 +231,63 @@ abstract class JobQueue { abstract protected function doGetAcquiredCount(); /** + * Get the number of delayed jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws JobQueueError + * @since 1.22 + */ + final public function getDelayedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetDelayedCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getDelayedCount() + * @return integer + */ + protected function doGetDelayedCount() { + return 0; // not implemented + } + + /** + * Get the number of acquired jobs that can no longer be attempted. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws JobQueueError + */ + final public function getAbandonedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAbandonedCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getAbandonedCount() + * @return integer + */ + protected function doGetAbandonedCount() { + return 0; // not implemented + } + + /** * Push a single jobs into the queue. * This does not require $wgJobClasses to be set for the given job type. * Outside callers should use JobQueueGroup::push() instead of this function. * * @param $jobs Job|Array - * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) + * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) * @return bool Returns false on failure - * @throws MWException + * @throws JobQueueError */ final public function push( $jobs, $flags = 0 ) { return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); @@ -214,9 +299,9 @@ abstract class JobQueue { * Outside callers should use JobQueueGroup::push() instead of this function. * * @param array $jobs List of Jobs - * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) + * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) * @return bool Returns false on failure - * @throws MWException + * @throws JobQueueError */ final public function batchPush( array $jobs, $flags = 0 ) { if ( !count( $jobs ) ) { @@ -225,7 +310,11 @@ abstract class JobQueue { foreach ( $jobs as $job ) { if ( $job->getType() !== $this->type ) { - throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + throw new MWException( + "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); + } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) { + throw new MWException( + "Got delayed '{$job->getType()}' job; delays are not supported." ); } } @@ -247,7 +336,7 @@ abstract class JobQueue { * Outside callers should use JobQueueGroup::pop() instead of this function. * * @return Job|bool Returns false if there are no jobs - * @throws MWException + * @throws JobQueueError */ final public function pop() { global $wgJobClasses; @@ -262,6 +351,15 @@ abstract class JobQueue { wfProfileIn( __METHOD__ ); $job = $this->doPop(); wfProfileOut( __METHOD__ ); + + // Flag this job as an old duplicate based on its "root" job... + try { + if ( $job && $this->isRootJobOldDuplicate( $job ) ) { + JobQueue::incrStats( 'job-pop-duplicate', $this->type ); + $job = DuplicateJob::newFromJob( $job ); // convert to a no-op + } + } catch ( MWException $e ) {} // don't lose jobs over this + return $job; } @@ -279,7 +377,7 @@ abstract class JobQueue { * * @param $job Job * @return bool - * @throws MWException + * @throws JobQueueError */ final public function ack( Job $job ) { if ( $job->getType() !== $this->type ) { @@ -326,7 +424,7 @@ abstract class JobQueue { * * @param $job Job * @return bool - * @throws MWException + * @throws JobQueueError */ final public function deduplicateRootJob( Job $job ) { if ( $job->getType() !== $this->type ) { @@ -344,7 +442,91 @@ abstract class JobQueue { * @return bool */ protected function doDeduplicateRootJob( Job $job ) { - return true; + if ( !$job->hasRootJobParams() ) { + throw new MWException( "Cannot register root job; missing parameters." ); + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } + + /** + * Check if the "root" job of a given job has been superseded by a newer one + * + * @param $job Job + * @return bool + * @throws JobQueueError + */ + final protected function isRootJobOldDuplicate( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); + wfProfileOut( __METHOD__ ); + return $isDuplicate; + } + + /** + * @see JobQueue::isRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + if ( !$job->hasRootJobParams() ) { + return false; // job has no de-deplication info + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Get the last time this root job was enqueued + $timestamp = $this->dupCache->get( $key ); + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + protected function getRootJobCacheKey( $signature ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); + } + + /** + * Deleted all unclaimed and delayed jobs from the queue + * + * @return bool Success + * @throws JobQueueError + * @since 1.22 + */ + final public function delete() { + wfProfileIn( __METHOD__ ); + $res = $this->doDelete(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::delete() + * @return bool Success + */ + protected function doDelete() { + throw new MWException( "This method is not implemented." ); } /** @@ -353,7 +535,7 @@ abstract class JobQueue { * This does nothing for certain queue classes. * * @return void - * @throws MWException + * @throws JobQueueError */ final public function waitForBackups() { wfProfileIn( __METHOD__ ); @@ -413,16 +595,98 @@ abstract class JobQueue { protected function doFlushCaches() {} /** - * Get an iterator to traverse over all of the jobs in this queue. - * This does not include jobs that are current acquired. In general, - * this should only be called on a queue that is no longer being popped. + * Get an iterator to traverse over all available jobs in this queue. + * This does not include jobs that are currently acquired or delayed. + * Note: results may be stale if the queue is concurrently modified. * - * @return Iterator|Traversable|Array - * @throws MWException + * @return Iterator + * @throws JobQueueError */ abstract public function getAllQueuedJobs(); /** + * Get an iterator to traverse over all delayed jobs in this queue. + * Note: results may be stale if the queue is concurrently modified. + * + * @return Iterator + * @throws JobQueueError + * @since 1.22 + */ + public function getAllDelayedJobs() { + return new ArrayIterator( array() ); // not implemented + } + + /** + * Do not use this function outside of JobQueue/JobQueueGroup + * + * @return string + * @since 1.22 + */ + public function getCoalesceLocationInternal() { + return null; + } + + /** + * Check whether each of the given queues are empty. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (list of non-empty queue types) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueuesWithJobs( array $types ) { + $section = new ProfileSection( __METHOD__ ); + return $this->doGetSiblingQueuesWithJobs( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesWithJobs() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueuesWithJobs( array $types ) { + return null; // not supported + } + + /** + * Check the size of each of the given queues. + * For queues not served by the same store as this one, 0 is returned. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (job type => whether queue is empty) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueueSizes( array $types ) { + $section = new ProfileSection( __METHOD__ ); + return $this->doGetSiblingQueueSizes( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesSize() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueueSizes( array $types ) { + return null; // not supported + } + + /** + * Call wfIncrStats() for the queue overall and for the queue type + * + * @param string $key Event type + * @param string $type Job type + * @param integer $delta + * @since 1.22 + */ + public static function incrStats( $key, $type, $delta = 1 ) { + wfIncrStats( $key, $delta ); + wfIncrStats( "{$key}-{$type}", $delta ); + } + + /** * Namespace the queue with a key to isolate it for testing * * @param $key string @@ -433,3 +697,10 @@ abstract class JobQueue { throw new MWException( "Queue namespacing not supported for this queue type." ); } } + +/** + * @ingroup JobQueue + * @since 1.22 + */ +class JobQueueError extends MWException {} +class JobQueueConnectionError extends JobQueueError {} |