summaryrefslogtreecommitdiff
path: root/includes/jobqueue/JobQueueFederated.php
diff options
context:
space:
mode:
Diffstat (limited to 'includes/jobqueue/JobQueueFederated.php')
-rw-r--r--includes/jobqueue/JobQueueFederated.php131
1 files changed, 43 insertions, 88 deletions
diff --git a/includes/jobqueue/JobQueueFederated.php b/includes/jobqueue/JobQueueFederated.php
index c4301eed..d985d449 100644
--- a/includes/jobqueue/JobQueueFederated.php
+++ b/includes/jobqueue/JobQueueFederated.php
@@ -49,20 +49,12 @@
class JobQueueFederated extends JobQueue {
/** @var HashRing */
protected $partitionRing;
- /** @var HashRing */
- protected $partitionPushRing;
/** @var array (partition name => JobQueue) reverse sorted by weight */
protected $partitionQueues = array();
- /** @var BagOStuff */
- protected $cache;
-
/** @var int Maximum number of partitions to try */
protected $maxPartitionsTry;
- const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
- const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
-
/**
* @param array $params Possible keys:
* - sectionsByWiki : A map of wiki IDs to section names.
@@ -72,10 +64,8 @@ class JobQueueFederated extends JobQueue {
* have explicitly defined sections.
* - configByPartition : Map of queue partition names to configuration arrays.
* These configuration arrays are passed to JobQueue::factory().
- * The options set here are overriden by those passed to this
+ * The options set here are overridden by those passed to this
* the federated queue itself (e.g. 'order' and 'claimTTL').
- * - partitionsNoPush : List of partition names that can handle pop() but not push().
- * This can be used to migrate away from a certain partition.
* - maxPartitionsTry : Maximum number of times to attempt job insertion using
* different partition queues. This improves availability
* during failure, at the cost of added latency and somewhat
@@ -96,17 +86,10 @@ class JobQueueFederated extends JobQueue {
// Get the full partition map
$partitionMap = $params['partitionsBySection'][$section];
arsort( $partitionMap, SORT_NUMERIC );
- // Get the partitions jobs can actually be pushed to
- $partitionPushMap = $partitionMap;
- if ( isset( $params['partitionsNoPush'] ) ) {
- foreach ( $params['partitionsNoPush'] as $partition ) {
- unset( $partitionPushMap[$partition] );
- }
- }
// Get the config to pass to merge into each partition queue config
$baseConfig = $params;
foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
- 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o
+ 'partitionsBySection', 'configByPartition', ) as $o
) {
unset( $baseConfig[$o] ); // partition queue doesn't care about this
}
@@ -120,14 +103,6 @@ class JobQueueFederated extends JobQueue {
}
// Ring of all partitions
$this->partitionRing = new HashRing( $partitionMap );
- // Get the ring of partitions to push jobs into
- if ( count( $partitionPushMap ) === count( $partitionMap ) ) {
- $this->partitionPushRing = clone $this->partitionRing; // faster
- } else {
- $this->partitionPushRing = new HashRing( $partitionPushMap );
- }
- // Aggregate cache some per-queue values if there are multiple partition queues
- $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
}
protected function supportedOrders() {
@@ -140,19 +115,16 @@ class JobQueueFederated extends JobQueue {
}
protected function supportsDelayedJobs() {
- return true; // defer checks to the partitions
+ foreach ( $this->partitionQueues as $queue ) {
+ if ( !$queue->supportsDelayedJobs() ) {
+ return false;
+ }
+ }
+
+ return true;
}
protected function doIsEmpty() {
- $key = $this->getCacheKey( 'empty' );
-
- $isEmpty = $this->cache->get( $key );
- if ( $isEmpty === 'true' ) {
- return true;
- } elseif ( $isEmpty === 'false' ) {
- return false;
- }
-
$empty = true;
$failed = 0;
foreach ( $this->partitionQueues as $queue ) {
@@ -160,12 +132,11 @@ class JobQueueFederated extends JobQueue {
$empty = $empty && $queue->doIsEmpty();
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
- $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG );
return $empty;
}
@@ -191,32 +162,24 @@ class JobQueueFederated extends JobQueue {
* @return int
*/
protected function getCrossPartitionSum( $type, $method ) {
- $key = $this->getCacheKey( $type );
-
- $count = $this->cache->get( $key );
- if ( $count !== false ) {
- return $count;
- }
-
+ $count = 0;
$failed = 0;
foreach ( $this->partitionQueues as $queue ) {
try {
$count += $queue->$method();
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
- $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
-
return $count;
}
protected function doBatchPush( array $jobs, $flags ) {
// Local ring variable that may be changed to point to a new ring on failure
- $partitionRing = $this->partitionPushRing;
+ $partitionRing = $this->partitionRing;
// Try to insert the jobs and update $partitionsTry on any failures.
// Retry to insert any remaning jobs again, ignoring the bad partitions.
$jobsLeft = $jobs;
@@ -277,12 +240,9 @@ class JobQueueFederated extends JobQueue {
$queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
} catch ( JobQueueError $e ) {
$ok = false;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
- if ( $ok ) {
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
- } else {
+ if ( !$ok ) {
if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
throw new JobQueueError( "Could not insert job(s), no partitions available." );
}
@@ -299,12 +259,9 @@ class JobQueueFederated extends JobQueue {
$queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
} catch ( JobQueueError $e ) {
$ok = false;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
- if ( $ok ) {
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
- } else {
+ if ( !$ok ) {
if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
throw new JobQueueError( "Could not insert job(s), no partitions available." );
}
@@ -331,7 +288,7 @@ class JobQueueFederated extends JobQueue {
$job = $queue->pop();
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
$job = false;
}
if ( $job ) {
@@ -344,9 +301,6 @@ class JobQueueFederated extends JobQueue {
}
$this->throwErrorIfAllPartitionsDown( $failed );
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
-
return false;
}
@@ -361,12 +315,12 @@ class JobQueueFederated extends JobQueue {
protected function doIsRootJobOldDuplicate( Job $job ) {
$params = $job->getRootJobParams();
$sigature = $params['rootJobSignature'];
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
try {
return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
} catch ( JobQueueError $e ) {
- if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
}
}
@@ -377,12 +331,12 @@ class JobQueueFederated extends JobQueue {
protected function doDeduplicateRootJob( Job $job ) {
$params = $job->getRootJobParams();
$sigature = $params['rootJobSignature'];
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
try {
return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
} catch ( JobQueueError $e ) {
- if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
}
}
@@ -398,7 +352,7 @@ class JobQueueFederated extends JobQueue {
$queue->doDelete();
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
@@ -413,7 +367,7 @@ class JobQueueFederated extends JobQueue {
$queue->waitForBackups();
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
@@ -440,10 +394,6 @@ class JobQueueFederated extends JobQueue {
'abandonedcount'
);
- foreach ( $types as $type ) {
- $this->cache->delete( $this->getCacheKey( $type ) );
- }
-
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
$queue->doFlushCaches();
@@ -472,6 +422,17 @@ class JobQueueFederated extends JobQueue {
return $iterator;
}
+ public function getAllAbandonedJobs() {
+ $iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $iterator->append( $queue->getAllAbandonedJobs() );
+ }
+
+ return $iterator;
+ }
+
public function getCoalesceLocationInternal() {
return "JobQueueFederated:wiki:{$this->wiki}" .
sha1( serialize( array_keys( $this->partitionQueues ) ) );
@@ -495,7 +456,7 @@ class JobQueueFederated extends JobQueue {
}
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
@@ -519,7 +480,7 @@ class JobQueueFederated extends JobQueue {
}
} catch ( JobQueueError $e ) {
++$failed;
- MWExceptionHandler::logException( $e );
+ $this->logException( $e );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
@@ -527,6 +488,10 @@ class JobQueueFederated extends JobQueue {
return $result;
}
+ protected function logException( Exception $e ) {
+ wfDebugLog( 'JobQueueFederated', $e->getMessage() . "\n" . $e->getTraceAsString() );
+ }
+
/**
* Throw an error if no partitions available
*
@@ -546,14 +511,4 @@ class JobQueueFederated extends JobQueue {
$queue->setTestingPrefix( $key );
}
}
-
- /**
- * @param string $property
- * @return string
- */
- private function getCacheKey( $property ) {
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
- }
}