diff options
author | Pierre Schmitz <pierre@archlinux.de> | 2015-06-04 07:31:04 +0200 |
---|---|---|
committer | Pierre Schmitz <pierre@archlinux.de> | 2015-06-04 07:58:39 +0200 |
commit | f6d65e533c62f6deb21342d4901ece24497b433e (patch) | |
tree | f28adf0362d14bcd448f7b65a7aaf38650f923aa /includes/jobqueue/JobQueueRedis.php | |
parent | c27b2e832fe25651ef2410fae85b41072aae7519 (diff) |
Update to MediaWiki 1.25.1
Diffstat (limited to 'includes/jobqueue/JobQueueRedis.php')
-rw-r--r-- | includes/jobqueue/JobQueueRedis.php | 255 |
1 files changed, 59 insertions, 196 deletions
diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php index 3519eac8..6c823fb9 100644 --- a/includes/jobqueue/JobQueueRedis.php +++ b/includes/jobqueue/JobQueueRedis.php @@ -24,7 +24,7 @@ /** * Class to handle job queues stored in Redis * - * This is faster, less resource intensive, queue that JobQueueDB. + * This is a faster and less resource-intensive job queue than JobQueueDB. * All data for a queue using this class is placed into one redis server. * * There are eight main redis keys used to track jobs: @@ -49,7 +49,7 @@ * * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations. * Additionally, it should be noted that redis has different persistence modes, such - * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be + * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be * made on the servers based on what queues are using it and what tolerance they have. * * @ingroup JobQueue @@ -64,8 +64,6 @@ class JobQueueRedis extends JobQueue { protected $server; /** @var string Compression method to use */ protected $compression; - /** @var bool */ - protected $daemonized; const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) @@ -90,7 +88,11 @@ class JobQueueRedis extends JobQueue { $this->server = $params['redisServer']; $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); - $this->daemonized = !empty( $params['daemonized'] ); + if ( empty( $params['daemonized'] ) ) { + throw new Exception( + "Non-daemonized mode is no longer supported. Please install the " . + "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." ); + } } protected function supportedOrders() { @@ -134,9 +136,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetAcquiredCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } $conn = $this->getConnection(); try { $conn->multi( Redis::PIPELINE ); @@ -155,9 +154,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetDelayedCount() { - if ( !$this->checkDelay ) { - return 0; // no delayed jobs - } $conn = $this->getConnection(); try { return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); @@ -172,9 +168,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetAbandonedCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } $conn = $this->getConnection(); try { return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); @@ -299,24 +292,10 @@ LUA; protected function doPop() { $job = false; - // Push ready delayed jobs into the queue every 10 jobs to spread the load. - // This is also done as a periodic task, but we don't want too much done at once. - if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { - $this->recyclePruneAndUndelayJobs(); - } - $conn = $this->getConnection(); try { do { - if ( $this->claimTTL > 0 ) { - // Keep the claimed job list down for high-traffic queues - if ( mt_rand( 0, 99 ) == 0 ) { - $this->recyclePruneAndUndelayJobs(); - } - $blob = $this->popAndAcquireBlob( $conn ); - } else { - $blob = $this->popAndDeleteBlob( $conn ); - } + $blob = $this->popAndAcquireBlob( $conn ); if ( !is_string( $blob ) ) { break; // no jobs; nothing to do } @@ -328,7 +307,7 @@ LUA; continue; } - // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed + // If $item is invalid, the runner loop recyling will cleanup as needed $job = $this->getJobFromFields( $item ); // may be false } while ( !$job ); // job may be false if invalid } catch ( RedisException $e ) { @@ -343,39 +322,6 @@ LUA; * @return array Serialized string or false * @throws RedisException */ - protected function popAndDeleteBlob( RedisConnRef $conn ) { - static $script = -<<<LUA - local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS) - -- Pop an item off the queue - local id = redis.call('rpop',kUnclaimed) - if not id then return false end - -- Get the job data and remove it - local item = redis.call('hGet',kData,id) - redis.call('hDel',kData,id) - -- Allow new duplicates of this job - local sha1 = redis.call('hGet',kSha1ById,id) - if sha1 then redis.call('hDel',kIdBySha1,sha1) end - redis.call('hDel',kSha1ById,id) - -- Return the job data - return item -LUA; - return $conn->luaEval( $script, - array( - $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] - $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] - $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] - ), - 4 # number of first argument(s) that are keys - ); - } - - /** - * @param RedisConnRef $conn - * @return array Serialized string or false - * @throws RedisException - */ protected function popAndAcquireBlob( RedisConnRef $conn ) { static $script = <<<LUA @@ -416,36 +362,35 @@ LUA; if ( !isset( $job->metadata['uuid'] ) ) { throw new MWException( "Job of type '{$job->getType()}' has no UUID." ); } - if ( $this->claimTTL > 0 ) { - $conn = $this->getConnection(); - try { - static $script = + + $conn = $this->getConnection(); + try { + static $script = <<<LUA - local kClaimed, kAttempts, kData = unpack(KEYS) - -- Unmark the job as claimed - redis.call('zRem',kClaimed,ARGV[1]) - redis.call('hDel',kAttempts,ARGV[1]) - -- Delete the job data itself - return redis.call('hDel',kData,ARGV[1]) + local kClaimed, kAttempts, kData = unpack(KEYS) + -- Unmark the job as claimed + redis.call('zRem',kClaimed,ARGV[1]) + redis.call('hDel',kAttempts,ARGV[1]) + -- Delete the job data itself + return redis.call('hDel',kData,ARGV[1]) LUA; - $res = $conn->luaEval( $script, - array( - $this->getQueueKey( 'z-claimed' ), # KEYS[1] - $this->getQueueKey( 'h-attempts' ), # KEYS[2] - $this->getQueueKey( 'h-data' ), # KEYS[3] - $job->metadata['uuid'] # ARGV[1] - ), - 3 # number of first argument(s) that are keys - ); - - if ( !$res ) { - wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); - - return false; - } - } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + $res = $conn->luaEval( $script, + array( + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'h-data' ), # KEYS[3] + $job->metadata['uuid'] # ARGV[1] + ), + 3 # number of first argument(s) that are keys + ); + + if ( !$res ) { + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); + + return false; } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); } return true; @@ -571,6 +516,29 @@ LUA; } } + /** + * @see JobQueue::getAllAbandonedJobs() + * @return Iterator + */ + public function getAllAbandonedJobs() { + $conn = $this->getConnection(); + try { + $that = $this; + + return new MappedIterator( // delayed jobs + $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ), + function ( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { + return is_object( $job ); + } ) + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + public function getCoalesceLocationInternal() { return "RedisServer:" . $this->server; } @@ -630,115 +598,10 @@ LUA; } /** - * Recycle or destroy any jobs that have been claimed for too long - * and release any ready delayed jobs into the queue - * - * @return int Number of jobs recycled/deleted/undelayed - * @throws MWException|JobQueueError - */ - public function recyclePruneAndUndelayJobs() { - $count = 0; - // For each job item that can be retried, we need to add it back to the - // main queue and remove it from the list of currenty claimed job items. - // For those that cannot, they are marked as dead and kept around for - // investigation and manual job restoration but are eventually deleted. - $conn = $this->getConnection(); - try { - $now = time(); - static $script = -<<<LUA - local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS) - local released,abandoned,pruned,undelayed = 0,0,0,0 - -- Get all non-dead jobs that have an expired claim on them. - -- The score for each item is the last claim timestamp (UNIX). - local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1]) - for k,id in ipairs(staleClaims) do - local timestamp = redis.call('zScore',kClaimed,id) - local attempts = redis.call('hGet',kAttempts,id) - if attempts < ARGV[3] then - -- Claim expired and retries left: re-enqueue the job - redis.call('lPush',kUnclaimed,id) - redis.call('hIncrBy',kAttempts,id,1) - released = released + 1 - else - -- Claim expired and no retries left: mark the job as dead - redis.call('zAdd',kAbandoned,timestamp,id) - abandoned = abandoned + 1 - end - redis.call('zRem',kClaimed,id) - end - -- Get all of the dead jobs that have been marked as dead for too long. - -- The score for each item is the last claim timestamp (UNIX). - local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2]) - for k,id in ipairs(deadClaims) do - -- Stale and out of retries: remove any traces of the job - redis.call('zRem',kAbandoned,id) - redis.call('hDel',kAttempts,id) - redis.call('hDel',kData,id) - pruned = pruned + 1 - end - -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp) - local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4]) - -- Migrate the jobs from the "delayed" set to the "unclaimed" list - for k,id in ipairs(ids) do - redis.call('lPush',kUnclaimed,id) - redis.call('zRem',kDelayed,id) - end - undelayed = #ids - return {released,abandoned,pruned,undelayed} -LUA; - $res = $conn->luaEval( $script, - array( - $this->getQueueKey( 'z-claimed' ), # KEYS[1] - $this->getQueueKey( 'h-attempts' ), # KEYS[2] - $this->getQueueKey( 'l-unclaimed' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] - $this->getQueueKey( 'z-abandoned' ), # KEYS[5] - $this->getQueueKey( 'z-delayed' ), # KEYS[6] - $now - $this->claimTTL, # ARGV[1] - $now - self::MAX_AGE_PRUNE, # ARGV[2] - $this->maxTries, # ARGV[3] - $now # ARGV[4] - ), - 6 # number of first argument(s) that are keys - ); - if ( $res ) { - list( $released, $abandoned, $pruned, $undelayed ) = $res; - $count += $released + $pruned + $undelayed; - JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki ); - JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki ); - JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki ); - } - } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); - } - - return $count; - } - - /** * @return array */ protected function doGetPeriodicTasks() { - if ( $this->daemonized ) { - return array(); // managed in the runner loop - } - $periods = array( 3600 ); // standard cleanup (useful on config change) - if ( $this->claimTTL > 0 ) { - $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing - } - if ( $this->checkDelay ) { - $periods[] = 300; // 5 minutes - } - $period = min( $periods ); - $period = max( $period, 30 ); // sanity - - return array( - 'recyclePruneAndUndelayJobs' => array( - 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ), - 'period' => $period, - ) - ); + return array(); // managed in the runner loop } /** |