diff options
Diffstat (limited to 'includes/jobqueue/JobQueueFederated.php')
-rw-r--r-- | includes/jobqueue/JobQueueFederated.php | 131 |
1 files changed, 43 insertions, 88 deletions
diff --git a/includes/jobqueue/JobQueueFederated.php b/includes/jobqueue/JobQueueFederated.php index c4301eed..d985d449 100644 --- a/includes/jobqueue/JobQueueFederated.php +++ b/includes/jobqueue/JobQueueFederated.php @@ -49,20 +49,12 @@ class JobQueueFederated extends JobQueue { /** @var HashRing */ protected $partitionRing; - /** @var HashRing */ - protected $partitionPushRing; /** @var array (partition name => JobQueue) reverse sorted by weight */ protected $partitionQueues = array(); - /** @var BagOStuff */ - protected $cache; - /** @var int Maximum number of partitions to try */ protected $maxPartitionsTry; - const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating - const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date - /** * @param array $params Possible keys: * - sectionsByWiki : A map of wiki IDs to section names. @@ -72,10 +64,8 @@ class JobQueueFederated extends JobQueue { * have explicitly defined sections. * - configByPartition : Map of queue partition names to configuration arrays. * These configuration arrays are passed to JobQueue::factory(). - * The options set here are overriden by those passed to this + * The options set here are overridden by those passed to this * the federated queue itself (e.g. 'order' and 'claimTTL'). - * - partitionsNoPush : List of partition names that can handle pop() but not push(). - * This can be used to migrate away from a certain partition. * - maxPartitionsTry : Maximum number of times to attempt job insertion using * different partition queues. This improves availability * during failure, at the cost of added latency and somewhat @@ -96,17 +86,10 @@ class JobQueueFederated extends JobQueue { // Get the full partition map $partitionMap = $params['partitionsBySection'][$section]; arsort( $partitionMap, SORT_NUMERIC ); - // Get the partitions jobs can actually be pushed to - $partitionPushMap = $partitionMap; - if ( isset( $params['partitionsNoPush'] ) ) { - foreach ( $params['partitionsNoPush'] as $partition ) { - unset( $partitionPushMap[$partition] ); - } - } // Get the config to pass to merge into each partition queue config $baseConfig = $params; foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry', - 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o + 'partitionsBySection', 'configByPartition', ) as $o ) { unset( $baseConfig[$o] ); // partition queue doesn't care about this } @@ -120,14 +103,6 @@ class JobQueueFederated extends JobQueue { } // Ring of all partitions $this->partitionRing = new HashRing( $partitionMap ); - // Get the ring of partitions to push jobs into - if ( count( $partitionPushMap ) === count( $partitionMap ) ) { - $this->partitionPushRing = clone $this->partitionRing; // faster - } else { - $this->partitionPushRing = new HashRing( $partitionPushMap ); - } - // Aggregate cache some per-queue values if there are multiple partition queues - $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff(); } protected function supportedOrders() { @@ -140,19 +115,16 @@ class JobQueueFederated extends JobQueue { } protected function supportsDelayedJobs() { - return true; // defer checks to the partitions + foreach ( $this->partitionQueues as $queue ) { + if ( !$queue->supportsDelayedJobs() ) { + return false; + } + } + + return true; } protected function doIsEmpty() { - $key = $this->getCacheKey( 'empty' ); - - $isEmpty = $this->cache->get( $key ); - if ( $isEmpty === 'true' ) { - return true; - } elseif ( $isEmpty === 'false' ) { - return false; - } - $empty = true; $failed = 0; foreach ( $this->partitionQueues as $queue ) { @@ -160,12 +132,11 @@ class JobQueueFederated extends JobQueue { $empty = $empty && $queue->doIsEmpty(); } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); - $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG ); return $empty; } @@ -191,32 +162,24 @@ class JobQueueFederated extends JobQueue { * @return int */ protected function getCrossPartitionSum( $type, $method ) { - $key = $this->getCacheKey( $type ); - - $count = $this->cache->get( $key ); - if ( $count !== false ) { - return $count; - } - + $count = 0; $failed = 0; foreach ( $this->partitionQueues as $queue ) { try { $count += $queue->$method(); } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); - $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); - return $count; } protected function doBatchPush( array $jobs, $flags ) { // Local ring variable that may be changed to point to a new ring on failure - $partitionRing = $this->partitionPushRing; + $partitionRing = $this->partitionRing; // Try to insert the jobs and update $partitionsTry on any failures. // Retry to insert any remaning jobs again, ignoring the bad partitions. $jobsLeft = $jobs; @@ -277,12 +240,9 @@ class JobQueueFederated extends JobQueue { $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); } catch ( JobQueueError $e ) { $ok = false; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } - if ( $ok ) { - $key = $this->getCacheKey( 'empty' ); - $this->cache->set( $key, 'false', self::CACHE_TTL_LONG ); - } else { + if ( !$ok ) { if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist throw new JobQueueError( "Could not insert job(s), no partitions available." ); } @@ -299,12 +259,9 @@ class JobQueueFederated extends JobQueue { $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); } catch ( JobQueueError $e ) { $ok = false; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } - if ( $ok ) { - $key = $this->getCacheKey( 'empty' ); - $this->cache->set( $key, 'false', self::CACHE_TTL_LONG ); - } else { + if ( !$ok ) { if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist throw new JobQueueError( "Could not insert job(s), no partitions available." ); } @@ -331,7 +288,7 @@ class JobQueueFederated extends JobQueue { $job = $queue->pop(); } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); $job = false; } if ( $job ) { @@ -344,9 +301,6 @@ class JobQueueFederated extends JobQueue { } $this->throwErrorIfAllPartitionsDown( $failed ); - $key = $this->getCacheKey( 'empty' ); - $this->cache->set( $key, 'true', self::CACHE_TTL_LONG ); - return false; } @@ -361,12 +315,12 @@ class JobQueueFederated extends JobQueue { protected function doIsRootJobOldDuplicate( Job $job ) { $params = $job->getRootJobParams(); $sigature = $params['rootJobSignature']; - $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + $partition = $this->partitionRing->getLiveLocation( $sigature ); try { return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); } catch ( JobQueueError $e ) { - if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) { - $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) { + $partition = $this->partitionRing->getLiveLocation( $sigature ); return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); } } @@ -377,12 +331,12 @@ class JobQueueFederated extends JobQueue { protected function doDeduplicateRootJob( Job $job ) { $params = $job->getRootJobParams(); $sigature = $params['rootJobSignature']; - $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + $partition = $this->partitionRing->getLiveLocation( $sigature ); try { return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); } catch ( JobQueueError $e ) { - if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) { - $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) { + $partition = $this->partitionRing->getLiveLocation( $sigature ); return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); } } @@ -398,7 +352,7 @@ class JobQueueFederated extends JobQueue { $queue->doDelete(); } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); @@ -413,7 +367,7 @@ class JobQueueFederated extends JobQueue { $queue->waitForBackups(); } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); @@ -440,10 +394,6 @@ class JobQueueFederated extends JobQueue { 'abandonedcount' ); - foreach ( $types as $type ) { - $this->cache->delete( $this->getCacheKey( $type ) ); - } - /** @var JobQueue $queue */ foreach ( $this->partitionQueues as $queue ) { $queue->doFlushCaches(); @@ -472,6 +422,17 @@ class JobQueueFederated extends JobQueue { return $iterator; } + public function getAllAbandonedJobs() { + $iterator = new AppendIterator(); + + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllAbandonedJobs() ); + } + + return $iterator; + } + public function getCoalesceLocationInternal() { return "JobQueueFederated:wiki:{$this->wiki}" . sha1( serialize( array_keys( $this->partitionQueues ) ) ); @@ -495,7 +456,7 @@ class JobQueueFederated extends JobQueue { } } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); @@ -519,7 +480,7 @@ class JobQueueFederated extends JobQueue { } } catch ( JobQueueError $e ) { ++$failed; - MWExceptionHandler::logException( $e ); + $this->logException( $e ); } } $this->throwErrorIfAllPartitionsDown( $failed ); @@ -527,6 +488,10 @@ class JobQueueFederated extends JobQueue { return $result; } + protected function logException( Exception $e ) { + wfDebugLog( 'JobQueueFederated', $e->getMessage() . "\n" . $e->getTraceAsString() ); + } + /** * Throw an error if no partitions available * @@ -546,14 +511,4 @@ class JobQueueFederated extends JobQueue { $queue->setTestingPrefix( $key ); } } - - /** - * @param string $property - * @return string - */ - private function getCacheKey( $property ) { - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); - } } |