diff options
Diffstat (limited to 'includes/jobqueue/JobQueueDB.php')
-rw-r--r-- | includes/jobqueue/JobQueueDB.php | 98 |
1 files changed, 44 insertions, 54 deletions
diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index d5f47ffd..d1e4a135 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -29,7 +29,6 @@ */ class JobQueueDB extends JobQueue { const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating - const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random const MAX_OFFSET = 255; // integer; maximum number of rows to skip @@ -71,15 +70,6 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doIsEmpty() { - $key = $this->getCacheKey( 'empty' ); - - $isEmpty = $this->cache->get( $key ); - if ( $isEmpty === 'true' ) { - return true; - } elseif ( $isEmpty === 'false' ) { - return false; - } - $dbr = $this->getSlaveDB(); try { $found = $dbr->selectField( // unclaimed job @@ -88,7 +78,6 @@ class JobQueueDB extends JobQueue { } catch ( DBError $e ) { $this->throwDBException( $e ); } - $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); return !$found; } @@ -256,12 +245,9 @@ class JobQueueDB extends JobQueue { foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { $dbw->insert( 'job', $rowBatch, $method ); } - JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki ); - JobQueue::incrStats( - 'job-insert-duplicate', - $this->type, - count( $rowSet ) + count( $rowList ) - count( $rows ), - $this->wiki + JobQueue::incrStats( 'inserts', $this->type, count( $rows ) ); + JobQueue::incrStats( 'dupe_inserts', $this->type, + count( $rowSet ) + count( $rowList ) - count( $rows ) ); } catch ( DBError $e ) { if ( $flags & self::QOS_ATOMIC ) { @@ -273,8 +259,6 @@ class JobQueueDB extends JobQueue { $dbw->commit( $method ); } - $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG ); - return; } @@ -283,10 +267,6 @@ class JobQueueDB extends JobQueue { * @return Job|bool */ protected function doPop() { - if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { - return false; // queue is empty - } - $dbw = $this->getMasterDB(); try { $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction @@ -309,22 +289,23 @@ class JobQueueDB extends JobQueue { } // Check if we found a row to reserve... if ( !$row ) { - $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); break; // nothing to do } - JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki ); + JobQueue::incrStats( 'pops', $this->type ); // Get the job object from the row... - $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); - if ( !$title ) { - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - wfDebug( "Row has invalid title '{$row->job_title}'.\n" ); - continue; // try again - } + $title = Title::makeTitle( $row->job_namespace, $row->job_title ); $job = Job::factory( $row->job_cmd, $title, self::extractBlob( $row->job_params ), $row->job_id ); $job->metadata['id'] = $row->job_id; + $job->metadata['timestamp'] = $row->job_timestamp; break; // done } while ( true ); + + if ( !$job || mt_rand( 0, 9 ) == 0 ) { + // Handled jobs that need to be recycled/deleted; + // any recycled jobs will be picked up next attempt + $this->recycleAndDeleteStaleJobs(); + } } catch ( DBError $e ) { $this->throwDBException( $e ); } @@ -495,6 +476,8 @@ class JobQueueDB extends JobQueue { // Delete a row with a single DELETE without holding row locks over RTTs... $dbw->delete( 'job', array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ ); + + JobQueue::incrStats( 'acks', $this->type ); } catch ( DBError $e ) { $this->throwDBException( $e ); } @@ -504,11 +487,11 @@ class JobQueueDB extends JobQueue { /** * @see JobQueue::doDeduplicateRootJob() - * @param Job $job + * @param IJobSpecification $job * @throws MWException * @return bool */ - protected function doDeduplicateRootJob( Job $job ) { + protected function doDeduplicateRootJob( IJobSpecification $job ) { $params = $job->getParams(); if ( !isset( $params['rootJobSignature'] ) ) { throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); @@ -560,22 +543,10 @@ class JobQueueDB extends JobQueue { } /** - * @return array - */ - protected function doGetPeriodicTasks() { - return array( - 'recycleAndDeleteStaleJobs' => array( - 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), - 'period' => ceil( $this->claimTTL / 2 ) - ) - ); - } - - /** * @return void */ protected function doFlushCaches() { - foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) { + foreach ( array( 'size', 'acquiredcount' ) as $type ) { $this->cache->delete( $this->getCacheKey( $type ) ); } } @@ -585,18 +556,35 @@ class JobQueueDB extends JobQueue { * @return Iterator */ public function getAllQueuedJobs() { + return $this->getJobIterator( array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ); + } + + /** + * @see JobQueue::getAllAcquiredJobs() + * @return Iterator + */ + public function getAllAcquiredJobs() { + return $this->getJobIterator( array( 'job_cmd' => $this->getType(), "job_token > ''" ) ); + } + + /** + * @param array $conds Query conditions + * @return Iterator + */ + protected function getJobIterator( array $conds ) { $dbr = $this->getSlaveDB(); try { return new MappedIterator( - $dbr->select( 'job', self::selectFields(), - array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), - function ( $row ) use ( $dbr ) { + $dbr->select( 'job', self::selectFields(), $conds ), + function ( $row ) { $job = Job::factory( $row->job_cmd, Title::makeTitle( $row->job_namespace, $row->job_title ), - strlen( $row->job_params ) ? unserialize( $row->job_params ) : false + strlen( $row->job_params ) ? unserialize( $row->job_params ) : array() ); $job->metadata['id'] = $row->job_id; + $job->metadata['timestamp'] = $row->job_timestamp; + return $job; } ); @@ -613,6 +601,10 @@ class JobQueueDB extends JobQueue { protected function doGetSiblingQueuesWithJobs( array $types ) { $dbr = $this->getSlaveDB(); + // @note: this does not check whether the jobs are claimed or not. + // This is useful so JobQueueGroup::pop() also sees queues that only + // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue + // failed jobs so that they can be popped again for that edge case. $res = $dbr->select( 'job', 'DISTINCT job_cmd', array( 'job_cmd' => $types ), __METHOD__ ); @@ -685,9 +677,7 @@ class JobQueueDB extends JobQueue { ); $affected = $dbw->affectedRows(); $count += $affected; - JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki ); - // The tasks recycled jobs or release delayed jobs into the queue - $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + JobQueue::incrStats( 'recycles', $this->type, $affected ); $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type ); } } @@ -714,7 +704,7 @@ class JobQueueDB extends JobQueue { $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); $affected = $dbw->affectedRows(); $count += $affected; - JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki ); + JobQueue::incrStats( 'abandons', $this->type, $affected ); } $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); |