diff options
Diffstat (limited to 'includes/jobqueue/JobQueueRedis.php')
-rw-r--r-- | includes/jobqueue/JobQueueRedis.php | 151 |
1 files changed, 83 insertions, 68 deletions
diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php index 6c823fb9..29c8068a 100644 --- a/includes/jobqueue/JobQueueRedis.php +++ b/includes/jobqueue/JobQueueRedis.php @@ -81,6 +81,7 @@ class JobQueueRedis extends JobQueue { * - daemonized : Set to true if the redisJobRunnerService runs in the background. * This will disable job recycling/undelaying from the MediaWiki side * to avoid redundance and out-of-sync configuration. + * @throws InvalidArgumentException */ public function __construct( array $params ) { parent::__construct( $params ); @@ -89,7 +90,7 @@ class JobQueueRedis extends JobQueue { $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); if ( empty( $params['daemonized'] ) ) { - throw new Exception( + throw new InvalidArgumentException( "Non-daemonized mode is no longer supported. Please install the " . "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." ); } @@ -110,7 +111,7 @@ class JobQueueRedis extends JobQueue { /** * @see JobQueue::doIsEmpty() * @return bool - * @throws MWException + * @throws JobQueueError */ protected function doIsEmpty() { return $this->doGetSize() == 0; @@ -119,7 +120,7 @@ class JobQueueRedis extends JobQueue { /** * @see JobQueue::doGetSize() * @return int - * @throws MWException + * @throws JobQueueError */ protected function doGetSize() { $conn = $this->getConnection(); @@ -205,7 +206,7 @@ class JobQueueRedis extends JobQueue { if ( $flags & self::QOS_ATOMIC ) { $batches = array( $items ); // all or nothing } else { - $batches = array_chunk( $items, 500 ); // avoid tying up the server + $batches = array_chunk( $items, 100 ); // avoid tying up the server } $failed = 0; $pushed = 0; @@ -222,9 +223,9 @@ class JobQueueRedis extends JobQueue { throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." ); } - JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki ); - JobQueue::incrStats( 'job-insert-duplicate', $this->type, - count( $items ) - $failed - $pushed, $this->wiki ); + JobQueue::incrStats( 'inserts', $this->type, count( $items ) ); + JobQueue::incrStats( 'dupe_inserts', $this->type, + count( $items ) - $failed - $pushed ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } @@ -300,7 +301,7 @@ LUA; break; // no jobs; nothing to do } - JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki ); + JobQueue::incrStats( 'job-pop', $this->type ); $item = $this->unserialize( $blob ); if ( $item === false ) { wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." ); @@ -356,13 +357,15 @@ LUA; * @see JobQueue::doAck() * @param Job $job * @return Job|bool - * @throws MWException|JobQueueError + * @throws UnexpectedValueException + * @throws JobQueueError */ protected function doAck( Job $job ) { if ( !isset( $job->metadata['uuid'] ) ) { - throw new MWException( "Job of type '{$job->getType()}' has no UUID." ); + throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." ); } + $uuid = $job->metadata['uuid']; $conn = $this->getConnection(); try { static $script = @@ -379,16 +382,18 @@ LUA; $this->getQueueKey( 'z-claimed' ), # KEYS[1] $this->getQueueKey( 'h-attempts' ), # KEYS[2] $this->getQueueKey( 'h-data' ), # KEYS[3] - $job->metadata['uuid'] # ARGV[1] + $uuid # ARGV[1] ), 3 # number of first argument(s) that are keys ); if ( !$res ) { - wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." ); return false; } + + JobQueue::incrStats( 'job-ack', $this->type ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } @@ -398,13 +403,14 @@ LUA; /** * @see JobQueue::doDeduplicateRootJob() - * @param Job $job + * @param IJobSpecification $job * @return bool - * @throws MWException|JobQueueError + * @throws JobQueueError + * @throws LogicException */ - protected function doDeduplicateRootJob( Job $job ) { + protected function doDeduplicateRootJob( IJobSpecification $job ) { if ( !$job->hasRootJobParams() ) { - throw new MWException( "Cannot register root job; missing parameters." ); + throw new LogicException( "Cannot register root job; missing parameters." ); } $params = $job->getRootJobParams(); @@ -441,6 +447,7 @@ LUA; // Get the last time this root job was enqueued $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); } catch ( RedisException $e ) { + $timestamp = false; $this->throwRedisException( $conn, $e ); } @@ -473,70 +480,84 @@ LUA; /** * @see JobQueue::getAllQueuedJobs() * @return Iterator + * @throws JobQueueError */ public function getAllQueuedJobs() { $conn = $this->getConnection(); try { - $that = $this; - - return new MappedIterator( - $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ), - function ( $uid ) use ( $that, $conn ) { - return $that->getJobFromUidInternal( $uid, $conn ); - }, - array( 'accept' => function ( $job ) { - return is_object( $job ); - } ) - ); + $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } + + return $this->getJobIterator( $conn, $uids ); } /** - * @see JobQueue::getAllQueuedJobs() + * @see JobQueue::getAllDelayedJobs() * @return Iterator + * @throws JobQueueError */ public function getAllDelayedJobs() { $conn = $this->getConnection(); try { - $that = $this; - - return new MappedIterator( // delayed jobs - $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), - function ( $uid ) use ( $that, $conn ) { - return $that->getJobFromUidInternal( $uid, $conn ); - }, - array( 'accept' => function ( $job ) { - return is_object( $job ); - } ) - ); + $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } + + return $this->getJobIterator( $conn, $uids ); + } + + /** + * @see JobQueue::getAllAcquiredJobs() + * @return Iterator + * @throws JobQueueError + */ + public function getAllAcquiredJobs() { + $conn = $this->getConnection(); + try { + $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $this->getJobIterator( $conn, $uids ); } /** * @see JobQueue::getAllAbandonedJobs() * @return Iterator + * @throws JobQueueError */ public function getAllAbandonedJobs() { $conn = $this->getConnection(); try { - $that = $this; - - return new MappedIterator( // delayed jobs - $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ), - function ( $uid ) use ( $that, $conn ) { - return $that->getJobFromUidInternal( $uid, $conn ); - }, - array( 'accept' => function ( $job ) { - return is_object( $job ); - } ) - ); + $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } + + return $this->getJobIterator( $conn, $uids ); + } + + /** + * @param RedisConnRef $conn + * @param array $uids List of job UUIDs + * @return MappedIterator + */ + protected function getJobIterator( RedisConnRef $conn, array $uids ) { + $that = $this; + + return new MappedIterator( + $uids, + function ( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { + return is_object( $job ); + } ) + ); } public function getCoalesceLocationInternal() { @@ -575,7 +596,8 @@ LUA; * @param string $uid * @param RedisConnRef $conn * @return Job|bool Returns false if the job does not exist - * @throws MWException|JobQueueError + * @throws JobQueueError + * @throws UnexpectedValueException */ public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { try { @@ -583,13 +605,16 @@ LUA; if ( $data === false ) { return false; // not found } - $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) ); + $item = $this->unserialize( $data ); if ( !is_array( $item ) ) { // this shouldn't happen - throw new MWException( "Could not find job with ID '$uid'." ); + throw new UnexpectedValueException( "Could not find job with ID '$uid'." ); } $title = Title::makeTitle( $item['namespace'], $item['title'] ); $job = Job::factory( $item['type'], $title, $item['params'] ); $job->metadata['uuid'] = $item['uuid']; + $job->metadata['timestamp'] = $item['timestamp']; + // Add in attempt count for debugging at showJobs.php + $job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid ); return $job; } catch ( RedisException $e ) { @@ -598,13 +623,6 @@ LUA; } /** - * @return array - */ - protected function doGetPeriodicTasks() { - return array(); // managed in the runner loop - } - - /** * @param IJobSpecification $job * @return array */ @@ -631,15 +649,12 @@ LUA; * @return Job|bool */ protected function getJobFromFields( array $fields ) { - $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] ); - if ( $title ) { - $job = Job::factory( $fields['type'], $title, $fields['params'] ); - $job->metadata['uuid'] = $fields['uuid']; + $title = Title::makeTitle( $fields['namespace'], $fields['title'] ); + $job = Job::factory( $fields['type'], $title, $fields['params'] ); + $job->metadata['uuid'] = $fields['uuid']; + $job->metadata['timestamp'] = $fields['timestamp']; - return $job; - } - - return false; + return $job; } /** |