diff options
Diffstat (limited to 'includes/job/JobQueueGroup.php')
-rw-r--r-- | includes/job/JobQueueGroup.php | 90 |
1 files changed, 83 insertions, 7 deletions
diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index 351c71a3..fa7fee5f 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -36,10 +36,14 @@ class JobQueueGroup { protected $wiki; // string; wiki ID + /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */ + protected $coalescedQueues; + const TYPE_DEFAULT = 1; // integer; jobs popped by default const TYPE_ANY = 2; // integer; any job const USE_CACHE = 1; // integer; use process or persistent cache + const USE_PRIORITY = 2; // integer; respect deprioritization const PROC_CACHE_TTL = 15; // integer; seconds @@ -146,6 +150,9 @@ class JobQueueGroup { */ public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) { if ( is_string( $qtype ) ) { // specific job type + if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) { + return false; // back off + } $job = $this->get( $qtype )->pop(); if ( !$job ) { JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); @@ -167,6 +174,9 @@ class JobQueueGroup { shuffle( $types ); // avoid starvation foreach ( $types as $type ) { // for each queue... + if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) { + continue; // back off + } $job = $this->get( $type )->pop(); if ( $job ) { // found return $job; @@ -247,15 +257,72 @@ class JobQueueGroup { */ public function getQueuesWithJobs() { $types = array(); - foreach ( $this->getQueueTypes() as $type ) { - if ( !$this->get( $type )->isEmpty() ) { - $types[] = $type; + foreach ( $this->getCoalescedQueues() as $info ) { + $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() ); + if ( is_array( $nonEmpty ) ) { // batching features supported + $types = array_merge( $types, $nonEmpty ); + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } } } return $types; } /** + * Get the size of the queus for a list of job types + * + * @return Array Map of (job type => size) + */ + public function getQueueSizes() { + $sizeMap = array(); + foreach ( $this->getCoalescedQueues() as $info ) { + $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() ); + if ( is_array( $sizes ) ) { // batching features supported + $sizeMap = $sizeMap + $sizes; + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + $sizeMap[$type] = $this->get( $type )->getSize(); + } + } + } + return $sizeMap; + } + + /** + * @return array + */ + protected function getCoalescedQueues() { + global $wgJobTypeConf; + + if ( $this->coalescedQueues === null ) { + $this->coalescedQueues = array(); + foreach ( $wgJobTypeConf as $type => $conf ) { + $queue = JobQueue::factory( + array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf ); + $loc = $queue->getCoalesceLocationInternal(); + if ( !isset( $this->coalescedQueues[$loc] ) ) { + $this->coalescedQueues[$loc]['queue'] = $queue; + $this->coalescedQueues[$loc]['types'] = array(); + } + if ( $type === 'default' ) { + $this->coalescedQueues[$loc]['types'] = array_merge( + $this->coalescedQueues[$loc]['types'], + array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) ) + ); + } else { + $this->coalescedQueues[$loc]['types'][] = $type; + } + } + } + + return $this->coalescedQueues; + } + + /** * Check if jobs should not be popped of a queue right now. * This is only used for performance, such as to avoid spamming * the queue with many sub-jobs before they actually get run. @@ -264,10 +331,15 @@ class JobQueueGroup { * @return bool */ public function isQueueDeprioritized( $type ) { + if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) { + return $this->cache->get( 'isDeprioritized', $type ); + } if ( $type === 'refreshLinks2' ) { // Don't keep converting refreshLinks2 => refreshLinks jobs if the // later jobs have not been done yet. This helps throttle queue spam. - return !$this->get( 'refreshLinks' )->isEmpty(); + $deprioritized = !$this->get( 'refreshLinks' )->isEmpty(); + $this->cache->set( 'isDeprioritized', $type, $deprioritized ); + return $deprioritized; } return false; } @@ -298,9 +370,13 @@ class JobQueueGroup { } elseif ( !isset( $lastRuns[$type][$task] ) || $lastRuns[$type][$task] < ( time() - $definition['period'] ) ) { - if ( call_user_func( $definition['callback'] ) !== null ) { - $tasksRun[$type][$task] = time(); - ++$count; + try { + if ( call_user_func( $definition['callback'] ) !== null ) { + $tasksRun[$type][$task] = time(); + ++$count; + } + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); } } } |