diff options
Diffstat (limited to 'includes/jobqueue/JobQueue.php')
-rw-r--r-- | includes/jobqueue/JobQueue.php | 83 |
1 files changed, 33 insertions, 50 deletions
diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php index c00d22e9..91fe86cf 100644 --- a/includes/jobqueue/JobQueue.php +++ b/includes/jobqueue/JobQueue.php @@ -44,11 +44,10 @@ abstract class JobQueue { /** @var int Maximum number of times to try a job */ protected $maxTries; - /** @var bool Allow delayed jobs */ - protected $checkDelay; - /** @var BagOStuff */ protected $dupCache; + /** @var JobQueueAggregator */ + protected $aggr; const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions @@ -71,11 +70,10 @@ abstract class JobQueue { if ( !in_array( $this->order, $this->supportedOrders() ) ) { throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); } - $this->checkDelay = !empty( $params['checkDelay'] ); - if ( $this->checkDelay && !$this->supportsDelayedJobs() ) { - throw new MWException( __CLASS__ . " does not support delayed jobs." ); - } $this->dupCache = wfGetCache( CACHE_ANYTHING ); + $this->aggr = isset( $params['aggregator'] ) + ? $params['aggregator'] + : new JobQueueAggregatorNull( array() ); } /** @@ -98,10 +96,6 @@ abstract class JobQueue { * but not acknowledged as completed after this many seconds. Recycling * of jobs simple means re-inserting them into the queue. Jobs can be * attempted up to three times before being discarded. - * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions. - * This lets delayed jobs wait in a staging area until a given timestamp is - * reached, at which point they will enter the queue. If this is not enabled - * or not supported, an exception will be thrown on delayed job insertion. * * Queue classes should throw an exception if they do not support the options given. * @@ -144,14 +138,6 @@ abstract class JobQueue { } /** - * @return bool Whether delayed jobs are enabled - * @since 1.22 - */ - final public function delayedJobsEnabled() { - return $this->checkDelay; - } - - /** * Get the allowed queue orders for configuration validation * * @return array Subset of (random, timestamp, fifo, undefined) @@ -175,6 +161,14 @@ abstract class JobQueue { } /** + * @return bool Whether delayed jobs are enabled + * @since 1.22 + */ + final public function delayedJobsEnabled() { + return $this->supportsDelayedJobs(); + } + + /** * Quickly check if the queue has no available (unacquired, non-delayed) jobs. * Queue classes should use caching if they are any slower without memcached. * @@ -187,9 +181,7 @@ abstract class JobQueue { * @throws JobQueueError */ final public function isEmpty() { - wfProfileIn( __METHOD__ ); $res = $this->doIsEmpty(); - wfProfileOut( __METHOD__ ); return $res; } @@ -210,9 +202,7 @@ abstract class JobQueue { * @throws JobQueueError */ final public function getSize() { - wfProfileIn( __METHOD__ ); $res = $this->doGetSize(); - wfProfileOut( __METHOD__ ); return $res; } @@ -233,9 +223,7 @@ abstract class JobQueue { * @throws JobQueueError */ final public function getAcquiredCount() { - wfProfileIn( __METHOD__ ); $res = $this->doGetAcquiredCount(); - wfProfileOut( __METHOD__ ); return $res; } @@ -257,9 +245,7 @@ abstract class JobQueue { * @since 1.22 */ final public function getDelayedCount() { - wfProfileIn( __METHOD__ ); $res = $this->doGetDelayedCount(); - wfProfileOut( __METHOD__ ); return $res; } @@ -282,9 +268,7 @@ abstract class JobQueue { * @throws JobQueueError */ final public function getAbandonedCount() { - wfProfileIn( __METHOD__ ); $res = $this->doGetAbandonedCount(); - wfProfileOut( __METHOD__ ); return $res; } @@ -308,7 +292,8 @@ abstract class JobQueue { * @throws JobQueueError */ final public function push( $jobs, $flags = 0 ) { - $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); + $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); + $this->batchPush( $jobs, $flags ); } /** @@ -330,15 +315,14 @@ abstract class JobQueue { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); - } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) { + } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) { throw new MWException( "Got delayed '{$job->getType()}' job; delays are not supported." ); } } - wfProfileIn( __METHOD__ ); $this->doBatchPush( $jobs, $flags ); - wfProfileOut( __METHOD__ ); + $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type ); } /** @@ -366,9 +350,11 @@ abstract class JobQueue { throw new MWException( "Unrecognized job type '{$this->type}'." ); } - wfProfileIn( __METHOD__ ); $job = $this->doPop(); - wfProfileOut( __METHOD__ ); + + if ( !$job ) { + $this->aggr->notifyQueueEmpty( $this->wiki, $this->type ); + } // Flag this job as an old duplicate based on its "root" job... try { @@ -376,7 +362,7 @@ abstract class JobQueue { JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki ); $job = DuplicateJob::newFromJob( $job ); // convert to a no-op } - } catch ( MWException $e ) { + } catch ( Exception $e ) { // don't lose jobs over this } @@ -403,9 +389,7 @@ abstract class JobQueue { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } - wfProfileIn( __METHOD__ ); $this->doAck( $job ); - wfProfileOut( __METHOD__ ); } /** @@ -449,9 +433,7 @@ abstract class JobQueue { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } - wfProfileIn( __METHOD__ ); $ok = $this->doDeduplicateRootJob( $job ); - wfProfileOut( __METHOD__ ); return $ok; } @@ -494,9 +476,7 @@ abstract class JobQueue { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } - wfProfileIn( __METHOD__ ); $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); - wfProfileOut( __METHOD__ ); return $isDuplicate; } @@ -538,9 +518,7 @@ abstract class JobQueue { * @return void */ final public function delete() { - wfProfileIn( __METHOD__ ); $this->doDelete(); - wfProfileOut( __METHOD__ ); } /** @@ -560,9 +538,7 @@ abstract class JobQueue { * @throws JobQueueError */ final public function waitForBackups() { - wfProfileIn( __METHOD__ ); $this->doWaitForBackups(); - wfProfileOut( __METHOD__ ); } /** @@ -607,9 +583,7 @@ abstract class JobQueue { * @return void */ final public function flushCaches() { - wfProfileIn( __METHOD__ ); $this->doFlushCaches(); - wfProfileOut( __METHOD__ ); } /** @@ -642,6 +616,17 @@ abstract class JobQueue { } /** + * Get an iterator to traverse over all abandoned jobs in this queue + * + * @return Iterator + * @throws JobQueueError + * @since 1.25 + */ + public function getAllAbandonedJobs() { + return new ArrayIterator( array() ); // not implemented + } + + /** * Do not use this function outside of JobQueue/JobQueueGroup * * @return string @@ -661,7 +646,6 @@ abstract class JobQueue { * @since 1.22 */ final public function getSiblingQueuesWithJobs( array $types ) { - $section = new ProfileSection( __METHOD__ ); return $this->doGetSiblingQueuesWithJobs( $types ); } @@ -686,7 +670,6 @@ abstract class JobQueue { * @since 1.22 */ final public function getSiblingQueueSizes( array $types ) { - $section = new ProfileSection( __METHOD__ ); return $this->doGetSiblingQueueSizes( $types ); } |