From f6d65e533c62f6deb21342d4901ece24497b433e Mon Sep 17 00:00:00 2001 From: Pierre Schmitz Date: Thu, 4 Jun 2015 07:31:04 +0200 Subject: Update to MediaWiki 1.25.1 --- includes/jobqueue/JobQueueRedis.php | 255 +++++++++--------------------------- 1 file changed, 59 insertions(+), 196 deletions(-) (limited to 'includes/jobqueue/JobQueueRedis.php') diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php index 3519eac8..6c823fb9 100644 --- a/includes/jobqueue/JobQueueRedis.php +++ b/includes/jobqueue/JobQueueRedis.php @@ -24,7 +24,7 @@ /** * Class to handle job queues stored in Redis * - * This is faster, less resource intensive, queue that JobQueueDB. + * This is a faster and less resource-intensive job queue than JobQueueDB. * All data for a queue using this class is placed into one redis server. * * There are eight main redis keys used to track jobs: @@ -49,7 +49,7 @@ * * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations. * Additionally, it should be noted that redis has different persistence modes, such - * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be + * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be * made on the servers based on what queues are using it and what tolerance they have. * * @ingroup JobQueue @@ -64,8 +64,6 @@ class JobQueueRedis extends JobQueue { protected $server; /** @var string Compression method to use */ protected $compression; - /** @var bool */ - protected $daemonized; const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) @@ -90,7 +88,11 @@ class JobQueueRedis extends JobQueue { $this->server = $params['redisServer']; $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); - $this->daemonized = !empty( $params['daemonized'] ); + if ( empty( $params['daemonized'] ) ) { + throw new Exception( + "Non-daemonized mode is no longer supported. Please install the " . + "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." ); + } } protected function supportedOrders() { @@ -134,9 +136,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetAcquiredCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } $conn = $this->getConnection(); try { $conn->multi( Redis::PIPELINE ); @@ -155,9 +154,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetDelayedCount() { - if ( !$this->checkDelay ) { - return 0; // no delayed jobs - } $conn = $this->getConnection(); try { return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); @@ -172,9 +168,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetAbandonedCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } $conn = $this->getConnection(); try { return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); @@ -299,24 +292,10 @@ LUA; protected function doPop() { $job = false; - // Push ready delayed jobs into the queue every 10 jobs to spread the load. - // This is also done as a periodic task, but we don't want too much done at once. - if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { - $this->recyclePruneAndUndelayJobs(); - } - $conn = $this->getConnection(); try { do { - if ( $this->claimTTL > 0 ) { - // Keep the claimed job list down for high-traffic queues - if ( mt_rand( 0, 99 ) == 0 ) { - $this->recyclePruneAndUndelayJobs(); - } - $blob = $this->popAndAcquireBlob( $conn ); - } else { - $blob = $this->popAndDeleteBlob( $conn ); - } + $blob = $this->popAndAcquireBlob( $conn ); if ( !is_string( $blob ) ) { break; // no jobs; nothing to do } @@ -328,7 +307,7 @@ LUA; continue; } - // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed + // If $item is invalid, the runner loop recyling will cleanup as needed $job = $this->getJobFromFields( $item ); // may be false } while ( !$job ); // job may be false if invalid } catch ( RedisException $e ) { @@ -338,39 +317,6 @@ LUA; return $job; } - /** - * @param RedisConnRef $conn - * @return array Serialized string or false - * @throws RedisException - */ - protected function popAndDeleteBlob( RedisConnRef $conn ) { - static $script = -<<luaEval( $script, - array( - $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] - $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] - $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] - ), - 4 # number of first argument(s) that are keys - ); - } - /** * @param RedisConnRef $conn * @return array Serialized string or false @@ -416,36 +362,35 @@ LUA; if ( !isset( $job->metadata['uuid'] ) ) { throw new MWException( "Job of type '{$job->getType()}' has no UUID." ); } - if ( $this->claimTTL > 0 ) { - $conn = $this->getConnection(); - try { - static $script = + + $conn = $this->getConnection(); + try { + static $script = <<luaEval( $script, - array( - $this->getQueueKey( 'z-claimed' ), # KEYS[1] - $this->getQueueKey( 'h-attempts' ), # KEYS[2] - $this->getQueueKey( 'h-data' ), # KEYS[3] - $job->metadata['uuid'] # ARGV[1] - ), - 3 # number of first argument(s) that are keys - ); - - if ( !$res ) { - wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); - - return false; - } - } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + $res = $conn->luaEval( $script, + array( + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'h-data' ), # KEYS[3] + $job->metadata['uuid'] # ARGV[1] + ), + 3 # number of first argument(s) that are keys + ); + + if ( !$res ) { + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); + + return false; } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); } return true; @@ -571,6 +516,29 @@ LUA; } } + /** + * @see JobQueue::getAllAbandonedJobs() + * @return Iterator + */ + public function getAllAbandonedJobs() { + $conn = $this->getConnection(); + try { + $that = $this; + + return new MappedIterator( // delayed jobs + $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ), + function ( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { + return is_object( $job ); + } ) + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + public function getCoalesceLocationInternal() { return "RedisServer:" . $this->server; } @@ -629,116 +597,11 @@ LUA; } } - /** - * Recycle or destroy any jobs that have been claimed for too long - * and release any ready delayed jobs into the queue - * - * @return int Number of jobs recycled/deleted/undelayed - * @throws MWException|JobQueueError - */ - public function recyclePruneAndUndelayJobs() { - $count = 0; - // For each job item that can be retried, we need to add it back to the - // main queue and remove it from the list of currenty claimed job items. - // For those that cannot, they are marked as dead and kept around for - // investigation and manual job restoration but are eventually deleted. - $conn = $this->getConnection(); - try { - $now = time(); - static $script = -<<luaEval( $script, - array( - $this->getQueueKey( 'z-claimed' ), # KEYS[1] - $this->getQueueKey( 'h-attempts' ), # KEYS[2] - $this->getQueueKey( 'l-unclaimed' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] - $this->getQueueKey( 'z-abandoned' ), # KEYS[5] - $this->getQueueKey( 'z-delayed' ), # KEYS[6] - $now - $this->claimTTL, # ARGV[1] - $now - self::MAX_AGE_PRUNE, # ARGV[2] - $this->maxTries, # ARGV[3] - $now # ARGV[4] - ), - 6 # number of first argument(s) that are keys - ); - if ( $res ) { - list( $released, $abandoned, $pruned, $undelayed ) = $res; - $count += $released + $pruned + $undelayed; - JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki ); - JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki ); - JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki ); - } - } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); - } - - return $count; - } - /** * @return array */ protected function doGetPeriodicTasks() { - if ( $this->daemonized ) { - return array(); // managed in the runner loop - } - $periods = array( 3600 ); // standard cleanup (useful on config change) - if ( $this->claimTTL > 0 ) { - $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing - } - if ( $this->checkDelay ) { - $periods[] = 300; // 5 minutes - } - $period = min( $periods ); - $period = max( $period, 30 ); // sanity - - return array( - 'recyclePruneAndUndelayJobs' => array( - 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ), - 'period' => $period, - ) - ); + return array(); // managed in the runner loop } /** -- cgit v1.2.3-54-g00ecf