diff options
author | Pierre Schmitz <pierre@archlinux.de> | 2013-12-08 09:55:49 +0100 |
---|---|---|
committer | Pierre Schmitz <pierre@archlinux.de> | 2013-12-08 09:55:49 +0100 |
commit | 4ac9fa081a7c045f6a9f1cfc529d82423f485b2e (patch) | |
tree | af68743f2f4a47d13f2b0eb05f5c4aaf86d8ea37 /includes/job/JobQueueDB.php | |
parent | af4da56f1ad4d3ef7b06557bae365da2ea27a897 (diff) |
Update to MediaWiki 1.22.0
Diffstat (limited to 'includes/job/JobQueueDB.php')
-rw-r--r-- | includes/job/JobQueueDB.php | 688 |
1 files changed, 394 insertions, 294 deletions
diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index ff7f7abc..c39083df 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -28,13 +28,15 @@ * @since 1.21 */ class JobQueueDB extends JobQueue { - const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days) const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random const MAX_OFFSET = 255; // integer; maximum number of rows to skip + /** @var BagOStuff */ + protected $cache; + protected $cluster = false; // string; name of an external DB cluster /** @@ -46,8 +48,13 @@ class JobQueueDB extends JobQueue { * @param $params array */ protected function __construct( array $params ) { + global $wgMemc; + parent::__construct( $params ); + $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false; + // Make sure that we don't use the SQL cache, which would be harmful + $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc; } protected function supportedOrders() { @@ -63,22 +70,24 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doIsEmpty() { - global $wgMemc; - $key = $this->getCacheKey( 'empty' ); - $isEmpty = $wgMemc->get( $key ); + $isEmpty = $this->cache->get( $key ); if ( $isEmpty === 'true' ) { return true; } elseif ( $isEmpty === 'false' ) { return false; } - list( $dbr, $scope ) = $this->getSlaveDB(); - $found = $dbr->selectField( // unclaimed job - 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ - ); - $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); + $dbr = $this->getSlaveDB(); + try { + $found = $dbr->selectField( // unclaimed job + 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); return !$found; } @@ -88,21 +97,23 @@ class JobQueueDB extends JobQueue { * @return integer */ protected function doGetSize() { - global $wgMemc; - $key = $this->getCacheKey( 'size' ); - $size = $wgMemc->get( $key ); + $size = $this->cache->get( $key ); if ( is_int( $size ) ) { return $size; } - list( $dbr, $scope ) = $this->getSlaveDB(); - $size = (int)$dbr->selectField( 'job', 'COUNT(*)', - array( 'job_cmd' => $this->type, 'job_token' => '' ), - __METHOD__ - ); - $wgMemc->set( $key, $size, self::CACHE_TTL_SHORT ); + try { + $dbr = $this->getSlaveDB(); + $size = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, 'job_token' => '' ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->set( $key, $size, self::CACHE_TTL_SHORT ); return $size; } @@ -112,24 +123,63 @@ class JobQueueDB extends JobQueue { * @return integer */ protected function doGetAcquiredCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + + $key = $this->getCacheKey( 'acquiredcount' ); + + $count = $this->cache->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $dbr = $this->getSlaveDB(); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + + /** + * @see JobQueue::doGetAbandonedCount() + * @return integer + * @throws MWException + */ + protected function doGetAbandonedCount() { global $wgMemc; if ( $this->claimTTL <= 0 ) { return 0; // no acknowledgements } - $key = $this->getCacheKey( 'acquiredcount' ); + $key = $this->getCacheKey( 'abandonedcount' ); $count = $wgMemc->get( $key ); if ( is_int( $count ) ) { return $count; } - list( $dbr, $scope ) = $this->getSlaveDB(); - $count = (int)$dbr->selectField( 'job', 'COUNT(*)', - array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), - __METHOD__ - ); + $dbr = $this->getSlaveDB(); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( + 'job_cmd' => $this->type, + "job_token != {$dbr->addQuotes( '' )}", + "job_attempts >= " . $dbr->addQuotes( $this->maxTries ) + ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT ); return $count; @@ -143,70 +193,84 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doBatchPush( array $jobs, $flags ) { - if ( count( $jobs ) ) { - list( $dbw, $scope ) = $this->getMasterDB(); - - $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated - $rowList = array(); // list of jobs for jobs that are are not de-duplicated - - foreach ( $jobs as $job ) { - $row = $this->insertFields( $job ); - if ( $job->ignoreDuplicates() ) { - $rowSet[$row['job_sha1']] = $row; - } else { - $rowList[] = $row; - } + $dbw = $this->getMasterDB(); + + $that = $this; + $method = __METHOD__; + $dbw->onTransactionIdle( + function() use ( $dbw, $that, $jobs, $flags, $method ) { + $that->doBatchPushInternal( $dbw, $jobs, $flags, $method ); } + ); - $key = $this->getCacheKey( 'empty' ); - $atomic = ( $flags & self::QoS_Atomic ); + return true; + } - $dbw->onTransactionIdle( - function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $scope - ) { - global $wgMemc; + /** + * This function should *not* be called outside of JobQueueDB + * + * @param DatabaseBase $dbw + * @param array $jobs + * @param int $flags + * @param string $method + * @return boolean + * @throws type + */ + public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { + if ( !count( $jobs ) ) { + return true; + } - if ( $atomic ) { - $dbw->begin( __METHOD__ ); // wrap all the job additions in one transaction - } - try { - // Strip out any duplicate jobs that are already in the queue... - if ( count( $rowSet ) ) { - $res = $dbw->select( 'job', 'job_sha1', - array( - // No job_type condition since it's part of the job_sha1 hash - 'job_sha1' => array_keys( $rowSet ), - 'job_token' => '' // unclaimed - ), - __METHOD__ - ); - foreach ( $res as $row ) { - wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." ); - unset( $rowSet[$row->job_sha1] ); // already enqueued - } - } - // Build the full list of job rows to insert - $rows = array_merge( $rowList, array_values( $rowSet ) ); - // Insert the job rows in chunks to avoid slave lag... - foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { - $dbw->insert( 'job', $rowBatch, __METHOD__ ); - } - wfIncrStats( 'job-insert', count( $rows ) ); - wfIncrStats( 'job-insert-duplicate', - count( $rowSet ) + count( $rowList ) - count( $rows ) ); - } catch ( DBError $e ) { - if ( $atomic ) { - $dbw->rollback( __METHOD__ ); - } - throw $e; - } - if ( $atomic ) { - $dbw->commit( __METHOD__ ); - } + $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated + $rowList = array(); // list of jobs for jobs that are are not de-duplicated + foreach ( $jobs as $job ) { + $row = $this->insertFields( $job ); + if ( $job->ignoreDuplicates() ) { + $rowSet[$row['job_sha1']] = $row; + } else { + $rowList[] = $row; + } + } - $wgMemc->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); - } ); + if ( $flags & self::QOS_ATOMIC ) { + $dbw->begin( $method ); // wrap all the job additions in one transaction + } + try { + // Strip out any duplicate jobs that are already in the queue... + if ( count( $rowSet ) ) { + $res = $dbw->select( 'job', 'job_sha1', + array( + // No job_type condition since it's part of the job_sha1 hash + 'job_sha1' => array_keys( $rowSet ), + 'job_token' => '' // unclaimed + ), + $method + ); + foreach ( $res as $row ) { + wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" ); + unset( $rowSet[$row->job_sha1] ); // already enqueued + } + } + // Build the full list of job rows to insert + $rows = array_merge( $rowList, array_values( $rowSet ) ); + // Insert the job rows in chunks to avoid slave lag... + foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { + $dbw->insert( 'job', $rowBatch, $method ); + } + JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) ); + JobQueue::incrStats( 'job-insert-duplicate', $this->type, + count( $rowSet ) + count( $rowList ) - count( $rows ) ); + } catch ( DBError $e ) { + if ( $flags & self::QOS_ATOMIC ) { + $dbw->rollback( $method ); + } + throw $e; } + if ( $flags & self::QOS_ATOMIC ) { + $dbw->commit( $method ); + } + + $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG ); return true; } @@ -216,54 +280,52 @@ class JobQueueDB extends JobQueue { * @return Job|bool */ protected function doPop() { - global $wgMemc; - - if ( $wgMemc->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { + if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { return false; // queue is empty } - list( $dbw, $scope ) = $this->getMasterDB(); - $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); + $dbw = $this->getMasterDB(); + try { + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); - $uuid = wfRandomString( 32 ); // pop attempt - $job = false; // job popped off - do { // retry when our row is invalid or deleted as a duplicate - // Try to reserve a row in the DB... - if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { - $row = $this->claimOldest( $uuid ); - } else { // random first - $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs - $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand - $row = $this->claimRandom( $uuid, $rand, $gte ); - } - // Check if we found a row to reserve... - if ( !$row ) { - $wgMemc->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); - break; // nothing to do - } - wfIncrStats( 'job-pop' ); - // Get the job object from the row... - $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); - if ( !$title ) { - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." ); - continue; // try again - } - $job = Job::factory( $row->job_cmd, $title, - self::extractBlob( $row->job_params ), $row->job_id ); - $job->id = $row->job_id; // XXX: work around broken subclasses - // Flag this job as an old duplicate based on its "root" job... - if ( $this->isRootJobOldDuplicate( $job ) ) { - wfIncrStats( 'job-pop-duplicate' ); - $job = DuplicateJob::newFromJob( $job ); // convert to a no-op - } - break; // done - } while( true ); + $uuid = wfRandomString( 32 ); // pop attempt + $job = false; // job popped off + do { // retry when our row is invalid or deleted as a duplicate + // Try to reserve a row in the DB... + if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { + $row = $this->claimOldest( $uuid ); + } else { // random first + $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs + $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand + $row = $this->claimRandom( $uuid, $rand, $gte ); + } + // Check if we found a row to reserve... + if ( !$row ) { + $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); + break; // nothing to do + } + JobQueue::incrStats( 'job-pop', $this->type ); + // Get the job object from the row... + $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); + if ( !$title ) { + $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); + wfDebug( "Row has invalid title '{$row->job_title}'." ); + continue; // try again + } + $job = Job::factory( $row->job_cmd, $title, + self::extractBlob( $row->job_params ), $row->job_id ); + $job->metadata['id'] = $row->job_id; + $job->id = $row->job_id; // XXX: work around broken subclasses + break; // done + } while ( true ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } return $job; } @@ -277,11 +339,9 @@ class JobQueueDB extends JobQueue { * @return Row|false */ protected function claimRandom( $uuid, $rand, $gte ) { - global $wgMemc; - - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); // Check cache to see if the queue has <= OFFSET items - $tinyQueue = $wgMemc->get( $this->getCacheKey( 'small' ) ); + $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); $row = false; // the row acquired $invertedDirection = false; // whether one job_random direction was already scanned @@ -297,7 +357,7 @@ class JobQueueDB extends JobQueue { $dir = $gte ? 'ASC' : 'DESC'; $row = $dbw->selectRow( 'job', '*', // find a random job array( - 'job_cmd' => $this->type, + 'job_cmd' => $this->type, 'job_token' => '', // unclaimed "job_random {$ineq} {$dbw->addQuotes( $rand )}" ), __METHOD__, @@ -314,7 +374,7 @@ class JobQueueDB extends JobQueue { // instead of job_random for reducing excess claim retries. $row = $dbw->selectRow( 'job', '*', // find a random job array( - 'job_cmd' => $this->type, + 'job_cmd' => $this->type, 'job_token' => '', // unclaimed ), __METHOD__, @@ -322,14 +382,14 @@ class JobQueueDB extends JobQueue { ); if ( !$row ) { $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows - $wgMemc->set( $this->getCacheKey( 'small' ), 1, 30 ); + $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 ); continue; // use job_random } } if ( $row ) { // claim the job $dbw->update( 'job', // update by PK array( - 'job_token' => $uuid, + 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp(), 'job_attempts = job_attempts+1' ), array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ), @@ -355,7 +415,7 @@ class JobQueueDB extends JobQueue { * @return Row|false */ protected function claimOldest( $uuid ) { - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); $row = false; // the row acquired do { @@ -380,7 +440,7 @@ class JobQueueDB extends JobQueue { // This uses as much of the DB wrapper functions as possible. $dbw->update( 'job', array( - 'job_token' => $uuid, + 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp(), 'job_attempts = job_attempts+1' ), array( 'job_id = (' . @@ -399,7 +459,7 @@ class JobQueueDB extends JobQueue { array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ ); if ( !$row ) { // raced out by duplicate job removal - wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." ); + wfDebug( "Row deleted as duplicate by another process." ); } } else { break; // nothing to do @@ -410,100 +470,31 @@ class JobQueueDB extends JobQueue { } /** - * Recycle or destroy any jobs that have been claimed for too long - * - * @return integer Number of jobs recycled/deleted - */ - public function recycleAndDeleteStaleJobs() { - global $wgMemc; - - $now = time(); - list( $dbw, $scope ) = $this->getMasterDB(); - $count = 0; // affected rows - - if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { - return $count; // already in progress - } - - // Remove claims on jobs acquired for too long if enabled... - if ( $this->claimTTL > 0 ) { - $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); - // Get the IDs of jobs that have be claimed but not finished after too long. - // These jobs can be recycled into the queue by expiring the claim. Selecting - // the IDs first means that the UPDATE can be done by primary key (less deadlocks). - $res = $dbw->select( 'job', 'job_id', - array( - 'job_cmd' => $this->type, - "job_token != {$dbw->addQuotes( '' )}", // was acquired - "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale - "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left - __METHOD__ - ); - $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); - if ( count( $ids ) ) { - // Reset job_token for these jobs so that other runners will pick them up. - // Set the timestamp to the current time, as it is useful to now that the job - // was already tried before (the timestamp becomes the "released" time). - $dbw->update( 'job', - array( - 'job_token' => '', - 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release - array( - 'job_id' => $ids ), - __METHOD__ - ); - $count += $dbw->affectedRows(); - wfIncrStats( 'job-recycle', $dbw->affectedRows() ); - $wgMemc->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); - } - } - - // Just destroy any stale jobs... - $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); - $conds = array( - 'job_cmd' => $this->type, - "job_token != {$dbw->addQuotes( '' )}", // was acquired - "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale - ); - if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... - $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; - } - // Get the IDs of jobs that are considered stale and should be removed. Selecting - // the IDs first means that the UPDATE can be done by primary key (less deadlocks). - $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); - $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); - if ( count( $ids ) ) { - $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); - $count += $dbw->affectedRows(); - } - - $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); - - return $count; - } - - /** * @see JobQueue::doAck() * @param Job $job * @throws MWException * @return Job|bool */ protected function doAck( Job $job ) { - if ( !$job->getId() ) { + if ( !isset( $job->metadata['id'] ) ) { throw new MWException( "Job of type '{$job->getType()}' has no ID." ); } - list( $dbw, $scope ) = $this->getMasterDB(); - $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); + $dbw = $this->getMasterDB(); + try { + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); - // Delete a row with a single DELETE without holding row locks over RTTs... - $dbw->delete( 'job', - array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ), __METHOD__ ); + // Delete a row with a single DELETE without holding row locks over RTTs... + $dbw->delete( 'job', + array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } return true; } @@ -527,44 +518,33 @@ class JobQueueDB extends JobQueue { // deferred till "transaction idle", do the same here, so that the ordering is // maintained. Having only the de-duplication registration succeed would cause // jobs to become no-ops without any actual jobs that made them redundant. - list( $dbw, $scope ) = $this->getMasterDB(); - $dbw->onTransactionIdle( function() use ( $params, $key, $scope ) { - global $wgMemc; - - $timestamp = $wgMemc->get( $key ); // current last timestamp of this job + $dbw = $this->getMasterDB(); + $cache = $this->dupCache; + $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) { + $timestamp = $cache->get( $key ); // current last timestamp of this job if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { return true; // a newer version of this root job was enqueued } // Update the timestamp of the last root job started at the location... - return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); } ); return true; } /** - * Check if the "root" job of a given job has been superseded by a newer one - * - * @param $job Job + * @see JobQueue::doDelete() * @return bool */ - protected function isRootJobOldDuplicate( Job $job ) { - global $wgMemc; - - $params = $job->getParams(); - if ( !isset( $params['rootJobSignature'] ) ) { - return false; // job has no de-deplication info - } elseif ( !isset( $params['rootJobTimestamp'] ) ) { - trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." ); - return false; + protected function doDelete() { + $dbw = $this->getMasterDB(); + try { + $dbw->delete( 'job', array( 'job_cmd' => $this->type ) ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); } - - // Get the last time this root job was enqueued - $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); - - // Check if a new root job was started at the location after this one's... - return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + return true; } /** @@ -582,7 +562,7 @@ class JobQueueDB extends JobQueue { return array( 'recycleAndDeleteStaleJobs' => array( 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), - 'period' => ceil( $this->claimTTL / 2 ) + 'period' => ceil( $this->claimTTL / 2 ) ) ); } @@ -591,10 +571,8 @@ class JobQueueDB extends JobQueue { * @return void */ protected function doFlushCaches() { - global $wgMemc; - foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) { - $wgMemc->delete( $this->getCacheKey( $type ) ); + $this->cache->delete( $this->getCacheKey( $type ) ); } } @@ -603,51 +581,140 @@ class JobQueueDB extends JobQueue { * @return Iterator */ public function getAllQueuedJobs() { - list( $dbr, $scope ) = $this->getSlaveDB(); - return new MappedIterator( - $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), - function( $row ) use ( $scope ) { - $job = Job::factory( - $row->job_cmd, - Title::makeTitle( $row->job_namespace, $row->job_title ), - strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, - $row->job_id - ); - $job->id = $row->job_id; // XXX: work around broken subclasses - return $job; - } - ); + $dbr = $this->getSlaveDB(); + try { + return new MappedIterator( + $dbr->select( 'job', '*', + array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), + function( $row ) use ( $dbr ) { + $job = Job::factory( + $row->job_cmd, + Title::makeTitle( $row->job_namespace, $row->job_title ), + strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, + $row->job_id + ); + $job->metadata['id'] = $row->job_id; + $job->id = $row->job_id; // XXX: work around broken subclasses + return $job; + } + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } } - /** - * @return Array (DatabaseBase, ScopedCallback) - */ - protected function getSlaveDB() { - return $this->getDB( DB_SLAVE ); + public function getCoalesceLocationInternal() { + return $this->cluster + ? "DBCluster:{$this->cluster}:{$this->wiki}" + : "LBFactory:{$this->wiki}"; } - /** - * @return Array (DatabaseBase, ScopedCallback) - */ - protected function getMasterDB() { - return $this->getDB( DB_MASTER ); + protected function doGetSiblingQueuesWithJobs( array $types ) { + $dbr = $this->getSlaveDB(); + $res = $dbr->select( 'job', 'DISTINCT job_cmd', + array( 'job_cmd' => $types ), __METHOD__ ); + + $types = array(); + foreach ( $res as $row ) { + $types[] = $row->job_cmd; + } + return $types; + } + + protected function doGetSiblingQueueSizes( array $types ) { + $dbr = $this->getSlaveDB(); + $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ), + array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) ); + + $sizes = array(); + foreach ( $res as $row ) { + $sizes[$row->job_cmd] = (int)$row->count; + } + return $sizes; } /** - * @param $index integer (DB_SLAVE/DB_MASTER) - * @return Array (DatabaseBase, ScopedCallback) + * Recycle or destroy any jobs that have been claimed for too long + * + * @return integer Number of jobs recycled/deleted */ - protected function getDB( $index ) { - $lb = ( $this->cluster !== false ) - ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) - : wfGetLB( $this->wiki ); - $conn = $lb->getConnection( $index, array(), $this->wiki ); - return array( - $conn, - new ScopedCallback( function() use ( $lb, $conn ) { - $lb->reuseConnection( $conn ); - } ) - ); + public function recycleAndDeleteStaleJobs() { + $now = time(); + $count = 0; // affected rows + $dbw = $this->getMasterDB(); + + try { + if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { + return $count; // already in progress + } + + // Remove claims on jobs acquired for too long if enabled... + if ( $this->claimTTL > 0 ) { + $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); + // Get the IDs of jobs that have be claimed but not finished after too long. + // These jobs can be recycled into the queue by expiring the claim. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', + array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale + "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left + __METHOD__ + ); + $ids = array_map( + function( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + // Reset job_token for these jobs so that other runners will pick them up. + // Set the timestamp to the current time, as it is useful to now that the job + // was already tried before (the timestamp becomes the "released" time). + $dbw->update( 'job', + array( + 'job_token' => '', + 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release + array( + 'job_id' => $ids ), + __METHOD__ + ); + $count += $dbw->affectedRows(); + JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() ); + $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + } + } + + // Just destroy any stale jobs... + $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); + $conds = array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale + ); + if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... + $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; + } + // Get the IDs of jobs that are considered stale and should be removed. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); + $ids = array_map( + function( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); + $count += $dbw->affectedRows(); + JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() ); + } + + $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return $count; } /** @@ -655,7 +722,7 @@ class JobQueueDB extends JobQueue { * @return array */ protected function insertFields( Job $job ) { - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); return array( // Fields that describe the nature of the job 'job_cmd' => $job->getType(), @@ -674,20 +741,45 @@ class JobQueueDB extends JobQueue { } /** - * @return string + * @return DBConnRef */ - private function getCacheKey( $property ) { - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); + protected function getSlaveDB() { + try { + return $this->getDB( DB_SLAVE ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } + } + + /** + * @return DBConnRef + */ + protected function getMasterDB() { + try { + return $this->getDB( DB_MASTER ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } + } + + /** + * @param $index integer (DB_SLAVE/DB_MASTER) + * @return DBConnRef + */ + protected function getDB( $index ) { + $lb = ( $this->cluster !== false ) + ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) + : wfGetLB( $this->wiki ); + return $lb->getConnectionRef( $index, array(), $this->wiki ); } /** - * @param string $signature Hash identifier of the root job * @return string */ - private function getRootJobCacheKey( $signature ) { + private function getCacheKey( $property ) { list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); + $cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property ); } /** @@ -713,4 +805,12 @@ class JobQueueDB extends JobQueue { return false; } } + + /** + * @param DBError $e + * @throws JobQueueError + */ + protected function throwDBException( DBError $e ) { + throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() ); + } } |