diff options
Diffstat (limited to 'includes/jobqueue')
24 files changed, 6513 insertions, 0 deletions
diff --git a/includes/jobqueue/Job.php b/includes/jobqueue/Job.php new file mode 100644 index 00000000..ee3f2c2b --- /dev/null +++ b/includes/jobqueue/Job.php @@ -0,0 +1,350 @@ +<?php +/** + * Job queue task base code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @defgroup JobQueue JobQueue + */ + +/** + * Class to both describe a background job and handle jobs. + * The queue aspects of this class are now deprecated. + * Using the class to push jobs onto queues is deprecated (use JobSpecification). + * + * @ingroup JobQueue + */ +abstract class Job implements IJobSpecification { + /** @var string */ + public $command; + + /** @var array|bool Array of job parameters or false if none */ + public $params; + + /** @var array Additional queue metadata */ + public $metadata = array(); + + /** @var Title */ + protected $title; + + /** @var bool Expensive jobs may set this to true */ + protected $removeDuplicates; + + /** @var string Text for error that occurred last */ + protected $error; + + /*------------------------------------------------------------------------- + * Abstract functions + *------------------------------------------------------------------------*/ + + /** + * Run the job + * @return bool Success + */ + abstract public function run(); + + /*------------------------------------------------------------------------- + * Static functions + *------------------------------------------------------------------------*/ + + /** + * Create the appropriate object to handle a specific job + * + * @param string $command Job command + * @param Title $title Associated title + * @param array|bool $params Job parameters + * @throws MWException + * @return Job + */ + public static function factory( $command, Title $title, $params = false ) { + global $wgJobClasses; + if ( isset( $wgJobClasses[$command] ) ) { + $class = $wgJobClasses[$command]; + + return new $class( $title, $params ); + } + throw new MWException( "Invalid job command `{$command}`" ); + } + + /** + * Batch-insert a group of jobs into the queue. + * This will be wrapped in a transaction with a forced commit. + * + * This may add duplicate at insert time, but they will be + * removed later on, when the first one is popped. + * + * @param array $jobs Array of Job objects + * @return bool + * @deprecated since 1.21 + */ + public static function batchInsert( $jobs ) { + JobQueueGroup::singleton()->push( $jobs ); + return true; + } + + /** + * Insert a group of jobs into the queue. + * + * Same as batchInsert() but does not commit and can thus + * be rolled-back as part of a larger transaction. However, + * large batches of jobs can cause slave lag. + * + * @param array $jobs Array of Job objects + * @return bool + * @deprecated since 1.21 + */ + public static function safeBatchInsert( $jobs ) { + JobQueueGroup::singleton()->push( $jobs, JobQueue::QOS_ATOMIC ); + return true; + } + + /** + * Pop a job of a certain type. This tries less hard than pop() to + * actually find a job; it may be adversely affected by concurrent job + * runners. + * + * @param string $type + * @return Job|bool Returns false if there are no jobs + * @deprecated since 1.21 + */ + public static function pop_type( $type ) { + return JobQueueGroup::singleton()->get( $type )->pop(); + } + + /** + * Pop a job off the front of the queue. + * This is subject to $wgJobTypesExcludedFromDefaultQueue. + * + * @return Job|bool False if there are no jobs + * @deprecated since 1.21 + */ + public static function pop() { + return JobQueueGroup::singleton()->pop(); + } + + /*------------------------------------------------------------------------- + * Non-static functions + *------------------------------------------------------------------------*/ + + /** + * @param string $command + * @param Title $title + * @param array|bool $params + */ + public function __construct( $command, $title, $params = false ) { + $this->command = $command; + $this->title = $title; + $this->params = $params; + + // expensive jobs may set this to true + $this->removeDuplicates = false; + } + + /** + * @return string + */ + public function getType() { + return $this->command; + } + + /** + * @return Title + */ + public function getTitle() { + return $this->title; + } + + /** + * @return array + */ + public function getParams() { + return $this->params; + } + + /** + * @return int|null UNIX timestamp to delay running this job until, otherwise null + * @since 1.22 + */ + public function getReleaseTimestamp() { + return isset( $this->params['jobReleaseTimestamp'] ) + ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] ) + : null; + } + + /** + * @return bool Whether only one of each identical set of jobs should be run + */ + public function ignoreDuplicates() { + return $this->removeDuplicates; + } + + /** + * @return bool Whether this job can be retried on failure by job runners + * @since 1.21 + */ + public function allowRetries() { + return true; + } + + /** + * @return int Number of actually "work items" handled in this job + * @see $wgJobBackoffThrottling + * @since 1.23 + */ + public function workItemCount() { + return 1; + } + + /** + * Subclasses may need to override this to make duplication detection work. + * The resulting map conveys everything that makes the job unique. This is + * only checked if ignoreDuplicates() returns true, meaning that duplicate + * jobs are supposed to be ignored. + * + * @return array Map of key/values + * @since 1.21 + */ + public function getDeduplicationInfo() { + $info = array( + 'type' => $this->getType(), + 'namespace' => $this->getTitle()->getNamespace(), + 'title' => $this->getTitle()->getDBkey(), + 'params' => $this->getParams() + ); + if ( is_array( $info['params'] ) ) { + // Identical jobs with different "root" jobs should count as duplicates + unset( $info['params']['rootJobSignature'] ); + unset( $info['params']['rootJobTimestamp'] ); + // Likewise for jobs with different delay times + unset( $info['params']['jobReleaseTimestamp'] ); + } + + return $info; + } + + /** + * @see JobQueue::deduplicateRootJob() + * @param string $key A key that identifies the task + * @return array Map of: + * - rootJobSignature : hash (e.g. SHA1) that identifies the task + * - rootJobTimestamp : TS_MW timestamp of this instance of the task + * @since 1.21 + */ + public static function newRootJobParams( $key ) { + return array( + 'rootJobSignature' => sha1( $key ), + 'rootJobTimestamp' => wfTimestampNow() + ); + } + + /** + * @see JobQueue::deduplicateRootJob() + * @return array + * @since 1.21 + */ + public function getRootJobParams() { + return array( + 'rootJobSignature' => isset( $this->params['rootJobSignature'] ) + ? $this->params['rootJobSignature'] + : null, + 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] ) + ? $this->params['rootJobTimestamp'] + : null + ); + } + + /** + * @see JobQueue::deduplicateRootJob() + * @return bool + * @since 1.22 + */ + public function hasRootJobParams() { + return isset( $this->params['rootJobSignature'] ) + && isset( $this->params['rootJobTimestamp'] ); + } + + /** + * Insert a single job into the queue. + * @return bool True on success + * @deprecated since 1.21 + */ + public function insert() { + JobQueueGroup::singleton()->push( $this ); + return true; + } + + /** + * @return string + */ + public function toString() { + $truncFunc = function ( $value ) { + $value = (string)$value; + if ( mb_strlen( $value ) > 1024 ) { + $value = "string(" . mb_strlen( $value ) . ")"; + } + return $value; + }; + + $paramString = ''; + if ( $this->params ) { + foreach ( $this->params as $key => $value ) { + if ( $paramString != '' ) { + $paramString .= ' '; + } + if ( is_array( $value ) ) { + $filteredValue = array(); + foreach ( $value as $k => $v ) { + if ( is_scalar( $v ) ) { + $filteredValue[$k] = $truncFunc( $v ); + } else { + $filteredValue = null; + break; + } + } + if ( $filteredValue ) { + $value = FormatJson::encode( $filteredValue ); + } else { + $value = "array(" . count( $value ) . ")"; + } + } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) { + $value = "object(" . get_class( $value ) . ")"; + } + + $paramString .= "$key={$truncFunc( $value )}"; + } + } + + if ( is_object( $this->title ) ) { + $s = "{$this->command} " . $this->title->getPrefixedDBkey(); + if ( $paramString !== '' ) { + $s .= ' ' . $paramString; + } + + return $s; + } else { + return "{$this->command} $paramString"; + } + } + + protected function setLastError( $error ) { + $this->error = $error; + } + + public function getLastError() { + return $this->error; + } +} diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php new file mode 100644 index 00000000..c00d22e9 --- /dev/null +++ b/includes/jobqueue/JobQueue.php @@ -0,0 +1,740 @@ +<?php +/** + * Job queue base code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @defgroup JobQueue JobQueue + * @author Aaron Schulz + */ + +/** + * Class to handle enqueueing and running of background jobs + * + * @ingroup JobQueue + * @since 1.21 + */ +abstract class JobQueue { + /** @var string Wiki ID */ + protected $wiki; + + /** @var string Job type */ + protected $type; + + /** @var string Job priority for pop() */ + protected $order; + + /** @var int Time to live in seconds */ + protected $claimTTL; + + /** @var int Maximum number of times to try a job */ + protected $maxTries; + + /** @var bool Allow delayed jobs */ + protected $checkDelay; + + /** @var BagOStuff */ + protected $dupCache; + + const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions + + const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days) + + /** + * @param array $params + * @throws MWException + */ + protected function __construct( array $params ) { + $this->wiki = $params['wiki']; + $this->type = $params['type']; + $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0; + $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3; + if ( isset( $params['order'] ) && $params['order'] !== 'any' ) { + $this->order = $params['order']; + } else { + $this->order = $this->optimalOrder(); + } + if ( !in_array( $this->order, $this->supportedOrders() ) ) { + throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); + } + $this->checkDelay = !empty( $params['checkDelay'] ); + if ( $this->checkDelay && !$this->supportsDelayedJobs() ) { + throw new MWException( __CLASS__ . " does not support delayed jobs." ); + } + $this->dupCache = wfGetCache( CACHE_ANYTHING ); + } + + /** + * Get a job queue object of the specified type. + * $params includes: + * - class : What job class to use (determines job type) + * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * - type : The name of the job types this queue handles + * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". + * If "fifo" is used, the queue will effectively be FIFO. Note that job + * completion will not appear to be exactly FIFO if there are multiple + * job runners since jobs can take different times to finish once popped. + * If "timestamp" is used, the queue will at least be loosely ordered + * by timestamp, allowing for some jobs to be popped off out of order. + * If "random" is used, pop() will pick jobs in random order. + * Note that it may only be weakly random (e.g. a lottery of the oldest X). + * If "any" is choosen, the queue will use whatever order is the fastest. + * This might be useful for improving concurrency for job acquisition. + * - claimTTL : If supported, the queue will recycle jobs that have been popped + * but not acknowledged as completed after this many seconds. Recycling + * of jobs simple means re-inserting them into the queue. Jobs can be + * attempted up to three times before being discarded. + * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions. + * This lets delayed jobs wait in a staging area until a given timestamp is + * reached, at which point they will enter the queue. If this is not enabled + * or not supported, an exception will be thrown on delayed job insertion. + * + * Queue classes should throw an exception if they do not support the options given. + * + * @param array $params + * @return JobQueue + * @throws MWException + */ + final public static function factory( array $params ) { + $class = $params['class']; + if ( !class_exists( $class ) ) { + throw new MWException( "Invalid job queue class '$class'." ); + } + $obj = new $class( $params ); + if ( !( $obj instanceof self ) ) { + throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." ); + } + + return $obj; + } + + /** + * @return string Wiki ID + */ + final public function getWiki() { + return $this->wiki; + } + + /** + * @return string Job type that this queue handles + */ + final public function getType() { + return $this->type; + } + + /** + * @return string One of (random, timestamp, fifo, undefined) + */ + final public function getOrder() { + return $this->order; + } + + /** + * @return bool Whether delayed jobs are enabled + * @since 1.22 + */ + final public function delayedJobsEnabled() { + return $this->checkDelay; + } + + /** + * Get the allowed queue orders for configuration validation + * + * @return array Subset of (random, timestamp, fifo, undefined) + */ + abstract protected function supportedOrders(); + + /** + * Get the default queue order to use if configuration does not specify one + * + * @return string One of (random, timestamp, fifo, undefined) + */ + abstract protected function optimalOrder(); + + /** + * Find out if delayed jobs are supported for configuration validation + * + * @return bool Whether delayed jobs are supported + */ + protected function supportsDelayedJobs() { + return false; // not implemented + } + + /** + * Quickly check if the queue has no available (unacquired, non-delayed) jobs. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this might return false when there are actually no jobs. + * If pop() is called and returns false then it should correct the cache. Also, + * calling flushCaches() first prevents this. However, this affect is typically + * not distinguishable from the race condition between isEmpty() and pop(). + * + * @return bool + * @throws JobQueueError + */ + final public function isEmpty() { + wfProfileIn( __METHOD__ ); + $res = $this->doIsEmpty(); + wfProfileOut( __METHOD__ ); + + return $res; + } + + /** + * @see JobQueue::isEmpty() + * @return bool + */ + abstract protected function doIsEmpty(); + + /** + * Get the number of available (unacquired, non-delayed) jobs in the queue. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return int + * @throws JobQueueError + */ + final public function getSize() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetSize(); + wfProfileOut( __METHOD__ ); + + return $res; + } + + /** + * @see JobQueue::getSize() + * @return int + */ + abstract protected function doGetSize(); + + /** + * Get the number of acquired jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return int + * @throws JobQueueError + */ + final public function getAcquiredCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAcquiredCount(); + wfProfileOut( __METHOD__ ); + + return $res; + } + + /** + * @see JobQueue::getAcquiredCount() + * @return int + */ + abstract protected function doGetAcquiredCount(); + + /** + * Get the number of delayed jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return int + * @throws JobQueueError + * @since 1.22 + */ + final public function getDelayedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetDelayedCount(); + wfProfileOut( __METHOD__ ); + + return $res; + } + + /** + * @see JobQueue::getDelayedCount() + * @return int + */ + protected function doGetDelayedCount() { + return 0; // not implemented + } + + /** + * Get the number of acquired jobs that can no longer be attempted. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return int + * @throws JobQueueError + */ + final public function getAbandonedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAbandonedCount(); + wfProfileOut( __METHOD__ ); + + return $res; + } + + /** + * @see JobQueue::getAbandonedCount() + * @return int + */ + protected function doGetAbandonedCount() { + return 0; // not implemented + } + + /** + * Push one or more jobs into the queue. + * This does not require $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::push() instead of this function. + * + * @param Job|array $jobs A single job or an array of Jobs + * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC) + * @return void + * @throws JobQueueError + */ + final public function push( $jobs, $flags = 0 ) { + $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); + } + + /** + * Push a batch of jobs into the queue. + * This does not require $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::push() instead of this function. + * + * @param array $jobs List of Jobs + * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC) + * @return void + * @throws MWException + */ + final public function batchPush( array $jobs, $flags = 0 ) { + if ( !count( $jobs ) ) { + return; // nothing to do + } + + foreach ( $jobs as $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( + "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); + } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) { + throw new MWException( + "Got delayed '{$job->getType()}' job; delays are not supported." ); + } + } + + wfProfileIn( __METHOD__ ); + $this->doBatchPush( $jobs, $flags ); + wfProfileOut( __METHOD__ ); + } + + /** + * @see JobQueue::batchPush() + * @param array $jobs + * @param int $flags + */ + abstract protected function doBatchPush( array $jobs, $flags ); + + /** + * Pop a job off of the queue. + * This requires $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::pop() instead of this function. + * + * @throws MWException + * @return Job|bool Returns false if there are no jobs + */ + final public function pop() { + global $wgJobClasses; + + if ( $this->wiki !== wfWikiID() ) { + throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." ); + } elseif ( !isset( $wgJobClasses[$this->type] ) ) { + // Do not pop jobs if there is no class for the queue type + throw new MWException( "Unrecognized job type '{$this->type}'." ); + } + + wfProfileIn( __METHOD__ ); + $job = $this->doPop(); + wfProfileOut( __METHOD__ ); + + // Flag this job as an old duplicate based on its "root" job... + try { + if ( $job && $this->isRootJobOldDuplicate( $job ) ) { + JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki ); + $job = DuplicateJob::newFromJob( $job ); // convert to a no-op + } + } catch ( MWException $e ) { + // don't lose jobs over this + } + + return $job; + } + + /** + * @see JobQueue::pop() + * @return Job + */ + abstract protected function doPop(); + + /** + * Acknowledge that a job was completed. + * + * This does nothing for certain queue classes or if "claimTTL" is not set. + * Outside callers should use JobQueueGroup::ack() instead of this function. + * + * @param Job $job + * @return void + * @throws MWException + */ + final public function ack( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $this->doAck( $job ); + wfProfileOut( __METHOD__ ); + } + + /** + * @see JobQueue::ack() + * @param Job $job + */ + abstract protected function doAck( Job $job ); + + /** + * Register the "root job" of a given job into the queue for de-duplication. + * This should only be called right *after* all the new jobs have been inserted. + * This is used to turn older, duplicate, job entries into no-ops. The root job + * information will remain in the registry until it simply falls out of cache. + * + * This requires that $job has two special fields in the "params" array: + * - rootJobSignature : hash (e.g. SHA1) that identifies the task + * - rootJobTimestamp : TS_MW timestamp of this instance of the task + * + * A "root job" is a conceptual job that consist of potentially many smaller jobs + * that are actually inserted into the queue. For example, "refreshLinks" jobs are + * spawned when a template is edited. One can think of the task as "update links + * of pages that use template X" and an instance of that task as a "root job". + * However, what actually goes into the queue are range and leaf job subtypes. + * Since these jobs include things like page ID ranges and DB master positions, + * and can morph into smaller jobs recursively, simple duplicate detection + * for individual jobs being identical (like that of job_sha1) is not useful. + * + * In the case of "refreshLinks", if these jobs are still in the queue when the template + * is edited again, we want all of these old refreshLinks jobs for that template to become + * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing. + * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a + * previous "root job" for the same task of "update links of pages that use template X". + * + * This does nothing for certain queue classes. + * + * @param Job $job + * @throws MWException + * @return bool + */ + final public function deduplicateRootJob( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $ok = $this->doDeduplicateRootJob( $job ); + wfProfileOut( __METHOD__ ); + + return $ok; + } + + /** + * @see JobQueue::deduplicateRootJob() + * @param Job $job + * @throws MWException + * @return bool + */ + protected function doDeduplicateRootJob( Job $job ) { + if ( !$job->hasRootJobParams() ) { + throw new MWException( "Cannot register root job; missing parameters." ); + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } + + /** + * Check if the "root" job of a given job has been superseded by a newer one + * + * @param Job $job + * @throws MWException + * @return bool + */ + final protected function isRootJobOldDuplicate( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); + wfProfileOut( __METHOD__ ); + + return $isDuplicate; + } + + /** + * @see JobQueue::isRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + if ( !$job->hasRootJobParams() ) { + return false; // job has no de-deplication info + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Get the last time this root job was enqueued + $timestamp = $this->dupCache->get( $key ); + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + protected function getRootJobCacheKey( $signature ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); + } + + /** + * Deleted all unclaimed and delayed jobs from the queue + * + * @throws JobQueueError + * @since 1.22 + * @return void + */ + final public function delete() { + wfProfileIn( __METHOD__ ); + $this->doDelete(); + wfProfileOut( __METHOD__ ); + } + + /** + * @see JobQueue::delete() + * @throws MWException + */ + protected function doDelete() { + throw new MWException( "This method is not implemented." ); + } + + /** + * Wait for any slaves or backup servers to catch up. + * + * This does nothing for certain queue classes. + * + * @return void + * @throws JobQueueError + */ + final public function waitForBackups() { + wfProfileIn( __METHOD__ ); + $this->doWaitForBackups(); + wfProfileOut( __METHOD__ ); + } + + /** + * @see JobQueue::waitForBackups() + * @return void + */ + protected function doWaitForBackups() { + } + + /** + * Return a map of task names to task definition maps. + * A "task" is a fast periodic queue maintenance action. + * Mutually exclusive tasks must implement their own locking in the callback. + * + * Each task value is an associative array with: + * - name : the name of the task + * - callback : a PHP callable that performs the task + * - period : the period in seconds corresponding to the task frequency + * + * @return array + */ + final public function getPeriodicTasks() { + $tasks = $this->doGetPeriodicTasks(); + foreach ( $tasks as $name => &$def ) { + $def['name'] = $name; + } + + return $tasks; + } + + /** + * @see JobQueue::getPeriodicTasks() + * @return array + */ + protected function doGetPeriodicTasks() { + return array(); + } + + /** + * Clear any process and persistent caches + * + * @return void + */ + final public function flushCaches() { + wfProfileIn( __METHOD__ ); + $this->doFlushCaches(); + wfProfileOut( __METHOD__ ); + } + + /** + * @see JobQueue::flushCaches() + * @return void + */ + protected function doFlushCaches() { + } + + /** + * Get an iterator to traverse over all available jobs in this queue. + * This does not include jobs that are currently acquired or delayed. + * Note: results may be stale if the queue is concurrently modified. + * + * @return Iterator + * @throws JobQueueError + */ + abstract public function getAllQueuedJobs(); + + /** + * Get an iterator to traverse over all delayed jobs in this queue. + * Note: results may be stale if the queue is concurrently modified. + * + * @return Iterator + * @throws JobQueueError + * @since 1.22 + */ + public function getAllDelayedJobs() { + return new ArrayIterator( array() ); // not implemented + } + + /** + * Do not use this function outside of JobQueue/JobQueueGroup + * + * @return string + * @since 1.22 + */ + public function getCoalesceLocationInternal() { + return null; + } + + /** + * Check whether each of the given queues are empty. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (list of non-empty queue types) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueuesWithJobs( array $types ) { + $section = new ProfileSection( __METHOD__ ); + + return $this->doGetSiblingQueuesWithJobs( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesWithJobs() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueuesWithJobs( array $types ) { + return null; // not supported + } + + /** + * Check the size of each of the given queues. + * For queues not served by the same store as this one, 0 is returned. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (job type => whether queue is empty) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueueSizes( array $types ) { + $section = new ProfileSection( __METHOD__ ); + + return $this->doGetSiblingQueueSizes( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesSize() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueueSizes( array $types ) { + return null; // not supported + } + + /** + * Call wfIncrStats() for the queue overall and for the queue type + * + * @param string $key Event type + * @param string $type Job type + * @param int $delta + * @param string $wiki Wiki ID (added in 1.23) + * @since 1.22 + */ + public static function incrStats( $key, $type, $delta = 1, $wiki = null ) { + wfIncrStats( $key, $delta ); + wfIncrStats( "{$key}-{$type}", $delta ); + if ( $wiki !== null ) { + wfIncrStats( "{$key}-{$type}-{$wiki}", $delta ); + } + } + + /** + * Namespace the queue with a key to isolate it for testing + * + * @param string $key + * @return void + * @throws MWException + */ + public function setTestingPrefix( $key ) { + throw new MWException( "Queue namespacing not supported for this queue type." ); + } +} + +/** + * @ingroup JobQueue + * @since 1.22 + */ +class JobQueueError extends MWException { +} + +class JobQueueConnectionError extends JobQueueError { +} diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php new file mode 100644 index 00000000..08873cc1 --- /dev/null +++ b/includes/jobqueue/JobQueueDB.php @@ -0,0 +1,849 @@ +<?php +/** + * Database-backed job queue code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle job queues stored in the DB + * + * @ingroup JobQueue + * @since 1.21 + */ +class JobQueueDB extends JobQueue { + const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating + const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date + const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed + const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random + const MAX_OFFSET = 255; // integer; maximum number of rows to skip + + /** @var BagOStuff */ + protected $cache; + + /** @var bool|string Name of an external DB cluster. False if not set */ + protected $cluster = false; + + /** + * Additional parameters include: + * - cluster : The name of an external cluster registered via LBFactory. + * If not specified, the primary DB cluster for the wiki will be used. + * This can be overridden with a custom cluster so that DB handles will + * be retrieved via LBFactory::getExternalLB() and getConnection(). + * @param array $params + */ + protected function __construct( array $params ) { + global $wgMemc; + + parent::__construct( $params ); + + $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false; + // Make sure that we don't use the SQL cache, which would be harmful + $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc; + } + + protected function supportedOrders() { + return array( 'random', 'timestamp', 'fifo' ); + } + + protected function optimalOrder() { + return 'random'; + } + + /** + * @see JobQueue::doIsEmpty() + * @return bool + */ + protected function doIsEmpty() { + $key = $this->getCacheKey( 'empty' ); + + $isEmpty = $this->cache->get( $key ); + if ( $isEmpty === 'true' ) { + return true; + } elseif ( $isEmpty === 'false' ) { + return false; + } + + $dbr = $this->getSlaveDB(); + try { + $found = $dbr->selectField( // unclaimed job + 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); + + return !$found; + } + + /** + * @see JobQueue::doGetSize() + * @return int + */ + protected function doGetSize() { + $key = $this->getCacheKey( 'size' ); + + $size = $this->cache->get( $key ); + if ( is_int( $size ) ) { + return $size; + } + + try { + $dbr = $this->getSlaveDB(); + $size = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, 'job_token' => '' ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->set( $key, $size, self::CACHE_TTL_SHORT ); + + return $size; + } + + /** + * @see JobQueue::doGetAcquiredCount() + * @return int + */ + protected function doGetAcquiredCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + + $key = $this->getCacheKey( 'acquiredcount' ); + + $count = $this->cache->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $dbr = $this->getSlaveDB(); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + + /** + * @see JobQueue::doGetAbandonedCount() + * @return int + * @throws MWException + */ + protected function doGetAbandonedCount() { + global $wgMemc; + + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + + $key = $this->getCacheKey( 'abandonedcount' ); + + $count = $wgMemc->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $dbr = $this->getSlaveDB(); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( + 'job_cmd' => $this->type, + "job_token != {$dbr->addQuotes( '' )}", + "job_attempts >= " . $dbr->addQuotes( $this->maxTries ) + ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + + /** + * @see JobQueue::doBatchPush() + * @param array $jobs + * @param int $flags + * @throws DBError|Exception + * @return void + */ + protected function doBatchPush( array $jobs, $flags ) { + $dbw = $this->getMasterDB(); + + $that = $this; + $method = __METHOD__; + $dbw->onTransactionIdle( + function () use ( $dbw, $that, $jobs, $flags, $method ) { + $that->doBatchPushInternal( $dbw, $jobs, $flags, $method ); + } + ); + } + + /** + * This function should *not* be called outside of JobQueueDB + * + * @param IDatabase $dbw + * @param array $jobs + * @param int $flags + * @param string $method + * @throws DBError + * @return void + */ + public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { + if ( !count( $jobs ) ) { + return; + } + + $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated + $rowList = array(); // list of jobs for jobs that are are not de-duplicated + foreach ( $jobs as $job ) { + $row = $this->insertFields( $job ); + if ( $job->ignoreDuplicates() ) { + $rowSet[$row['job_sha1']] = $row; + } else { + $rowList[] = $row; + } + } + + if ( $flags & self::QOS_ATOMIC ) { + $dbw->begin( $method ); // wrap all the job additions in one transaction + } + try { + // Strip out any duplicate jobs that are already in the queue... + if ( count( $rowSet ) ) { + $res = $dbw->select( 'job', 'job_sha1', + array( + // No job_type condition since it's part of the job_sha1 hash + 'job_sha1' => array_keys( $rowSet ), + 'job_token' => '' // unclaimed + ), + $method + ); + foreach ( $res as $row ) { + wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" ); + unset( $rowSet[$row->job_sha1] ); // already enqueued + } + } + // Build the full list of job rows to insert + $rows = array_merge( $rowList, array_values( $rowSet ) ); + // Insert the job rows in chunks to avoid slave lag... + foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { + $dbw->insert( 'job', $rowBatch, $method ); + } + JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki ); + JobQueue::incrStats( + 'job-insert-duplicate', + $this->type, + count( $rowSet ) + count( $rowList ) - count( $rows ), + $this->wiki + ); + } catch ( DBError $e ) { + if ( $flags & self::QOS_ATOMIC ) { + $dbw->rollback( $method ); + } + throw $e; + } + if ( $flags & self::QOS_ATOMIC ) { + $dbw->commit( $method ); + } + + $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG ); + + return; + } + + /** + * @see JobQueue::doPop() + * @return Job|bool + */ + protected function doPop() { + if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { + return false; // queue is empty + } + + $dbw = $this->getMasterDB(); + try { + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); + + $uuid = wfRandomString( 32 ); // pop attempt + $job = false; // job popped off + do { // retry when our row is invalid or deleted as a duplicate + // Try to reserve a row in the DB... + if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { + $row = $this->claimOldest( $uuid ); + } else { // random first + $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs + $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand + $row = $this->claimRandom( $uuid, $rand, $gte ); + } + // Check if we found a row to reserve... + if ( !$row ) { + $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); + break; // nothing to do + } + JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki ); + // Get the job object from the row... + $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); + if ( !$title ) { + $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); + wfDebug( "Row has invalid title '{$row->job_title}'.\n" ); + continue; // try again + } + $job = Job::factory( $row->job_cmd, $title, + self::extractBlob( $row->job_params ), $row->job_id ); + $job->metadata['id'] = $row->job_id; + break; // done + } while ( true ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return $job; + } + + /** + * Reserve a row with a single UPDATE without holding row locks over RTTs... + * + * @param string $uuid 32 char hex string + * @param int $rand Random unsigned integer (31 bits) + * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random) + * @return stdClass|bool Row|false + */ + protected function claimRandom( $uuid, $rand, $gte ) { + $dbw = $this->getMasterDB(); + // Check cache to see if the queue has <= OFFSET items + $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); + + $row = false; // the row acquired + $invertedDirection = false; // whether one job_random direction was already scanned + // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT + // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is + // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot + // be used here with MySQL. + do { + if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows + // For small queues, using OFFSET will overshoot and return no rows more often. + // Instead, this uses job_random to pick a row (possibly checking both directions). + $ineq = $gte ? '>=' : '<='; + $dir = $gte ? 'ASC' : 'DESC'; + $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job + array( + 'job_cmd' => $this->type, + 'job_token' => '', // unclaimed + "job_random {$ineq} {$dbw->addQuotes( $rand )}" ), + __METHOD__, + array( 'ORDER BY' => "job_random {$dir}" ) + ); + if ( !$row && !$invertedDirection ) { + $gte = !$gte; + $invertedDirection = true; + continue; // try the other direction + } + } else { // table *may* have >= MAX_OFFSET rows + // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU + // in MySQL if there are many rows for some reason. This uses a small OFFSET + // instead of job_random for reducing excess claim retries. + $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job + array( + 'job_cmd' => $this->type, + 'job_token' => '', // unclaimed + ), + __METHOD__, + array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ) + ); + if ( !$row ) { + $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows + $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 ); + continue; // use job_random + } + } + + if ( $row ) { // claim the job + $dbw->update( 'job', // update by PK + array( + 'job_token' => $uuid, + 'job_token_timestamp' => $dbw->timestamp(), + 'job_attempts = job_attempts+1' ), + array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ), + __METHOD__ + ); + // This might get raced out by another runner when claiming the previously + // selected row. The use of job_random should minimize this problem, however. + if ( !$dbw->affectedRows() ) { + $row = false; // raced out + } + } else { + break; // nothing to do + } + } while ( !$row ); + + return $row; + } + + /** + * Reserve a row with a single UPDATE without holding row locks over RTTs... + * + * @param string $uuid 32 char hex string + * @return stdClass|bool Row|false + */ + protected function claimOldest( $uuid ) { + $dbw = $this->getMasterDB(); + + $row = false; // the row acquired + do { + if ( $dbw->getType() === 'mysql' ) { + // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the + // same table being changed in an UPDATE query in MySQL (gives Error: 1093). + // Oracle and Postgre have no such limitation. However, MySQL offers an + // alternative here by supporting ORDER BY + LIMIT for UPDATE queries. + $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " . + "SET " . + "job_token = {$dbw->addQuotes( $uuid ) }, " . + "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " . + "job_attempts = job_attempts+1 " . + "WHERE ( " . + "job_cmd = {$dbw->addQuotes( $this->type )} " . + "AND job_token = {$dbw->addQuotes( '' )} " . + ") ORDER BY job_id ASC LIMIT 1", + __METHOD__ + ); + } else { + // Use a subquery to find the job, within an UPDATE to claim it. + // This uses as much of the DB wrapper functions as possible. + $dbw->update( 'job', + array( + 'job_token' => $uuid, + 'job_token_timestamp' => $dbw->timestamp(), + 'job_attempts = job_attempts+1' ), + array( 'job_id = (' . + $dbw->selectSQLText( 'job', 'job_id', + array( 'job_cmd' => $this->type, 'job_token' => '' ), + __METHOD__, + array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) . + ')' + ), + __METHOD__ + ); + } + // Fetch any row that we just reserved... + if ( $dbw->affectedRows() ) { + $row = $dbw->selectRow( 'job', self::selectFields(), + array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ + ); + if ( !$row ) { // raced out by duplicate job removal + wfDebug( "Row deleted as duplicate by another process.\n" ); + } + } else { + break; // nothing to do + } + } while ( !$row ); + + return $row; + } + + /** + * @see JobQueue::doAck() + * @param Job $job + * @throws MWException + * @return Job|bool + */ + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['id'] ) ) { + throw new MWException( "Job of type '{$job->getType()}' has no ID." ); + } + + $dbw = $this->getMasterDB(); + try { + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); + + // Delete a row with a single DELETE without holding row locks over RTTs... + $dbw->delete( 'job', + array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return true; + } + + /** + * @see JobQueue::doDeduplicateRootJob() + * @param Job $job + * @throws MWException + * @return bool + */ + protected function doDeduplicateRootJob( Job $job ) { + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); + } + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $dbw = $this->getMasterDB(); + $cache = $this->dupCache; + $dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) { + $timestamp = $cache->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } ); + + return true; + } + + /** + * @see JobQueue::doDelete() + * @return bool + */ + protected function doDelete() { + $dbw = $this->getMasterDB(); + try { + $dbw->delete( 'job', array( 'job_cmd' => $this->type ) ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return true; + } + + /** + * @see JobQueue::doWaitForBackups() + * @return void + */ + protected function doWaitForBackups() { + wfWaitForSlaves(); + } + + /** + * @return array + */ + protected function doGetPeriodicTasks() { + return array( + 'recycleAndDeleteStaleJobs' => array( + 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), + 'period' => ceil( $this->claimTTL / 2 ) + ) + ); + } + + /** + * @return void + */ + protected function doFlushCaches() { + foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) { + $this->cache->delete( $this->getCacheKey( $type ) ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllQueuedJobs() { + $dbr = $this->getSlaveDB(); + try { + return new MappedIterator( + $dbr->select( 'job', self::selectFields(), + array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), + function ( $row ) use ( $dbr ) { + $job = Job::factory( + $row->job_cmd, + Title::makeTitle( $row->job_namespace, $row->job_title ), + strlen( $row->job_params ) ? unserialize( $row->job_params ) : false + ); + $job->metadata['id'] = $row->job_id; + return $job; + } + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + } + + public function getCoalesceLocationInternal() { + return $this->cluster + ? "DBCluster:{$this->cluster}:{$this->wiki}" + : "LBFactory:{$this->wiki}"; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + $dbr = $this->getSlaveDB(); + $res = $dbr->select( 'job', 'DISTINCT job_cmd', + array( 'job_cmd' => $types ), __METHOD__ ); + + $types = array(); + foreach ( $res as $row ) { + $types[] = $row->job_cmd; + } + + return $types; + } + + protected function doGetSiblingQueueSizes( array $types ) { + $dbr = $this->getSlaveDB(); + $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ), + array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) ); + + $sizes = array(); + foreach ( $res as $row ) { + $sizes[$row->job_cmd] = (int)$row->count; + } + + return $sizes; + } + + /** + * Recycle or destroy any jobs that have been claimed for too long + * + * @return int Number of jobs recycled/deleted + */ + public function recycleAndDeleteStaleJobs() { + $now = time(); + $count = 0; // affected rows + $dbw = $this->getMasterDB(); + + try { + if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { + return $count; // already in progress + } + + // Remove claims on jobs acquired for too long if enabled... + if ( $this->claimTTL > 0 ) { + $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); + // Get the IDs of jobs that have be claimed but not finished after too long. + // These jobs can be recycled into the queue by expiring the claim. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', + array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale + "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left + __METHOD__ + ); + $ids = array_map( + function ( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + // Reset job_token for these jobs so that other runners will pick them up. + // Set the timestamp to the current time, as it is useful to now that the job + // was already tried before (the timestamp becomes the "released" time). + $dbw->update( 'job', + array( + 'job_token' => '', + 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release + array( + 'job_id' => $ids ), + __METHOD__ + ); + $affected = $dbw->affectedRows(); + $count += $affected; + JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki ); + $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + } + } + + // Just destroy any stale jobs... + $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); + $conds = array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale + ); + if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... + $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; + } + // Get the IDs of jobs that are considered stale and should be removed. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); + $ids = array_map( + function ( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); + $affected = $dbw->affectedRows(); + $count += $affected; + JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki ); + } + + $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return $count; + } + + /** + * @param IJobSpecification $job + * @return array + */ + protected function insertFields( IJobSpecification $job ) { + $dbw = $this->getMasterDB(); + + return array( + // Fields that describe the nature of the job + 'job_cmd' => $job->getType(), + 'job_namespace' => $job->getTitle()->getNamespace(), + 'job_title' => $job->getTitle()->getDBkey(), + 'job_params' => self::makeBlob( $job->getParams() ), + // Additional job metadata + 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), + 'job_timestamp' => $dbw->timestamp(), + 'job_sha1' => wfBaseConvert( + sha1( serialize( $job->getDeduplicationInfo() ) ), + 16, 36, 31 + ), + 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) + ); + } + + /** + * @throws JobQueueConnectionError + * @return DBConnRef + */ + protected function getSlaveDB() { + try { + return $this->getDB( DB_SLAVE ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } + } + + /** + * @throws JobQueueConnectionError + * @return DBConnRef + */ + protected function getMasterDB() { + try { + return $this->getDB( DB_MASTER ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } + } + + /** + * @param int $index (DB_SLAVE/DB_MASTER) + * @return DBConnRef + */ + protected function getDB( $index ) { + $lb = ( $this->cluster !== false ) + ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) + : wfGetLB( $this->wiki ); + + return $lb->getConnectionRef( $index, array(), $this->wiki ); + } + + /** + * @param string $property + * @return string + */ + private function getCacheKey( $property ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; + + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property ); + } + + /** + * @param array|bool $params + * @return string + */ + protected static function makeBlob( $params ) { + if ( $params !== false ) { + return serialize( $params ); + } else { + return ''; + } + } + + /** + * @param string $blob + * @return bool|mixed + */ + protected static function extractBlob( $blob ) { + if ( (string)$blob !== '' ) { + return unserialize( $blob ); + } else { + return false; + } + } + + /** + * @param DBError $e + * @throws JobQueueError + */ + protected function throwDBException( DBError $e ) { + throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() ); + } + + /** + * Return the list of job fields that should be selected. + * @since 1.23 + * @return array + */ + public static function selectFields() { + return array( + 'job_id', + 'job_cmd', + 'job_namespace', + 'job_title', + 'job_timestamp', + 'job_params', + 'job_random', + 'job_attempts', + 'job_token', + 'job_token_timestamp', + 'job_sha1', + ); + } +} diff --git a/includes/jobqueue/JobQueueFederated.php b/includes/jobqueue/JobQueueFederated.php new file mode 100644 index 00000000..c4301eed --- /dev/null +++ b/includes/jobqueue/JobQueueFederated.php @@ -0,0 +1,559 @@ +<?php +/** + * Job queue code for federated queues. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle enqueueing and running of background jobs for federated queues + * + * This class allows for queues to be partitioned into smaller queues. + * A partition is defined by the configuration for a JobQueue instance. + * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a + * JobQueueFederated instance, which itself would consist of three JobQueueRedis + * instances, each using their own redis server. This would allow for the jobs + * to be split (evenly or based on weights) accross multiple servers if a single + * server becomes impractical or expensive. Different JobQueue classes can be mixed. + * + * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue + * is inherited by the partition queues. Additional configuration defines what + * section each wiki is in, what partition queues each section uses (and their weight), + * and the JobQueue configuration for each partition. Some sections might only need a + * single queue partition, like the sections for groups of small wikis. + * + * If used for performance, then $wgMainCacheType should be set to memcached/redis. + * Note that "fifo" cannot be used for the ordering, since the data is distributed. + * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also, + * queue classes used by this should ignore down servers (with TTL) to avoid slowness. + * + * @ingroup JobQueue + * @since 1.22 + */ +class JobQueueFederated extends JobQueue { + /** @var HashRing */ + protected $partitionRing; + /** @var HashRing */ + protected $partitionPushRing; + /** @var array (partition name => JobQueue) reverse sorted by weight */ + protected $partitionQueues = array(); + + /** @var BagOStuff */ + protected $cache; + + /** @var int Maximum number of partitions to try */ + protected $maxPartitionsTry; + + const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating + const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date + + /** + * @param array $params Possible keys: + * - sectionsByWiki : A map of wiki IDs to section names. + * Wikis will default to using the section "default". + * - partitionsBySection : Map of section names to maps of (partition name => weight). + * A section called 'default' must be defined if not all wikis + * have explicitly defined sections. + * - configByPartition : Map of queue partition names to configuration arrays. + * These configuration arrays are passed to JobQueue::factory(). + * The options set here are overriden by those passed to this + * the federated queue itself (e.g. 'order' and 'claimTTL'). + * - partitionsNoPush : List of partition names that can handle pop() but not push(). + * This can be used to migrate away from a certain partition. + * - maxPartitionsTry : Maximum number of times to attempt job insertion using + * different partition queues. This improves availability + * during failure, at the cost of added latency and somewhat + * less reliable job de-duplication mechanisms. + * @throws MWException + */ + protected function __construct( array $params ) { + parent::__construct( $params ); + $section = isset( $params['sectionsByWiki'][$this->wiki] ) + ? $params['sectionsByWiki'][$this->wiki] + : 'default'; + if ( !isset( $params['partitionsBySection'][$section] ) ) { + throw new MWException( "No configuration for section '$section'." ); + } + $this->maxPartitionsTry = isset( $params['maxPartitionsTry'] ) + ? $params['maxPartitionsTry'] + : 2; + // Get the full partition map + $partitionMap = $params['partitionsBySection'][$section]; + arsort( $partitionMap, SORT_NUMERIC ); + // Get the partitions jobs can actually be pushed to + $partitionPushMap = $partitionMap; + if ( isset( $params['partitionsNoPush'] ) ) { + foreach ( $params['partitionsNoPush'] as $partition ) { + unset( $partitionPushMap[$partition] ); + } + } + // Get the config to pass to merge into each partition queue config + $baseConfig = $params; + foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry', + 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o + ) { + unset( $baseConfig[$o] ); // partition queue doesn't care about this + } + // Get the partition queue objects + foreach ( $partitionMap as $partition => $w ) { + if ( !isset( $params['configByPartition'][$partition] ) ) { + throw new MWException( "No configuration for partition '$partition'." ); + } + $this->partitionQueues[$partition] = JobQueue::factory( + $baseConfig + $params['configByPartition'][$partition] ); + } + // Ring of all partitions + $this->partitionRing = new HashRing( $partitionMap ); + // Get the ring of partitions to push jobs into + if ( count( $partitionPushMap ) === count( $partitionMap ) ) { + $this->partitionPushRing = clone $this->partitionRing; // faster + } else { + $this->partitionPushRing = new HashRing( $partitionPushMap ); + } + // Aggregate cache some per-queue values if there are multiple partition queues + $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff(); + } + + protected function supportedOrders() { + // No FIFO due to partitioning, though "rough timestamp order" is supported + return array( 'undefined', 'random', 'timestamp' ); + } + + protected function optimalOrder() { + return 'undefined'; // defer to the partitions + } + + protected function supportsDelayedJobs() { + return true; // defer checks to the partitions + } + + protected function doIsEmpty() { + $key = $this->getCacheKey( 'empty' ); + + $isEmpty = $this->cache->get( $key ); + if ( $isEmpty === 'true' ) { + return true; + } elseif ( $isEmpty === 'false' ) { + return false; + } + + $empty = true; + $failed = 0; + foreach ( $this->partitionQueues as $queue ) { + try { + $empty = $empty && $queue->doIsEmpty(); + } catch ( JobQueueError $e ) { + ++$failed; + MWExceptionHandler::logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + + $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG ); + return $empty; + } + + protected function doGetSize() { + return $this->getCrossPartitionSum( 'size', 'doGetSize' ); + } + + protected function doGetAcquiredCount() { + return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' ); + } + + protected function doGetDelayedCount() { + return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' ); + } + + protected function doGetAbandonedCount() { + return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' ); + } + + /** + * @param string $type + * @param string $method + * @return int + */ + protected function getCrossPartitionSum( $type, $method ) { + $key = $this->getCacheKey( $type ); + + $count = $this->cache->get( $key ); + if ( $count !== false ) { + return $count; + } + + $failed = 0; + foreach ( $this->partitionQueues as $queue ) { + try { + $count += $queue->$method(); + } catch ( JobQueueError $e ) { + ++$failed; + MWExceptionHandler::logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + + $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + + protected function doBatchPush( array $jobs, $flags ) { + // Local ring variable that may be changed to point to a new ring on failure + $partitionRing = $this->partitionPushRing; + // Try to insert the jobs and update $partitionsTry on any failures. + // Retry to insert any remaning jobs again, ignoring the bad partitions. + $jobsLeft = $jobs; + // @codingStandardsIgnoreStart Generic.CodeAnalysis.ForLoopWithTestFunctionCall.NotAllowed + for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) { + // @codingStandardsIgnoreEnd + try { + $partitionRing->getLiveRing(); + } catch ( UnexpectedValueException $e ) { + break; // all servers down; nothing to insert to + } + $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ); + } + if ( count( $jobsLeft ) ) { + throw new JobQueueError( + "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." ); + } + } + + /** + * @param array $jobs + * @param HashRing $partitionRing + * @param int $flags + * @throws JobQueueError + * @return array List of Job object that could not be inserted + */ + protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) { + $jobsLeft = array(); + + // Because jobs are spread across partitions, per-job de-duplication needs + // to use a consistent hash to avoid allowing duplicate jobs per partition. + // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded. + $uJobsByPartition = array(); // (partition name => job list) + /** @var Job $job */ + foreach ( $jobs as $key => $job ) { + if ( $job->ignoreDuplicates() ) { + $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) ); + $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job; + unset( $jobs[$key] ); + } + } + // Get the batches of jobs that are not de-duplicated + if ( $flags & self::QOS_ATOMIC ) { + $nuJobBatches = array( $jobs ); // all or nothing + } else { + // Split the jobs into batches and spread them out over servers if there + // are many jobs. This helps keep the partitions even. Otherwise, send all + // the jobs to a single partition queue to avoids the extra connections. + $nuJobBatches = array_chunk( $jobs, 300 ); + } + + // Insert the de-duplicated jobs into the queues... + foreach ( $uJobsByPartition as $partition => $jobBatch ) { + /** @var JobQueue $queue */ + $queue = $this->partitionQueues[$partition]; + try { + $ok = true; + $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); + } catch ( JobQueueError $e ) { + $ok = false; + MWExceptionHandler::logException( $e ); + } + if ( $ok ) { + $key = $this->getCacheKey( 'empty' ); + $this->cache->set( $key, 'false', self::CACHE_TTL_LONG ); + } else { + if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist + throw new JobQueueError( "Could not insert job(s), no partitions available." ); + } + $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted + } + } + + // Insert the jobs that are not de-duplicated into the queues... + foreach ( $nuJobBatches as $jobBatch ) { + $partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() ); + $queue = $this->partitionQueues[$partition]; + try { + $ok = true; + $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); + } catch ( JobQueueError $e ) { + $ok = false; + MWExceptionHandler::logException( $e ); + } + if ( $ok ) { + $key = $this->getCacheKey( 'empty' ); + $this->cache->set( $key, 'false', self::CACHE_TTL_LONG ); + } else { + if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist + throw new JobQueueError( "Could not insert job(s), no partitions available." ); + } + $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted + } + } + + return $jobsLeft; + } + + protected function doPop() { + $partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight) + + $failed = 0; + while ( count( $partitionsTry ) ) { + $partition = ArrayUtils::pickRandom( $partitionsTry ); + if ( $partition === false ) { + break; // all partitions at 0 weight + } + + /** @var JobQueue $queue */ + $queue = $this->partitionQueues[$partition]; + try { + $job = $queue->pop(); + } catch ( JobQueueError $e ) { + ++$failed; + MWExceptionHandler::logException( $e ); + $job = false; + } + if ( $job ) { + $job->metadata['QueuePartition'] = $partition; + + return $job; + } else { + unset( $partitionsTry[$partition] ); // blacklist partition + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + + $key = $this->getCacheKey( 'empty' ); + $this->cache->set( $key, 'true', self::CACHE_TTL_LONG ); + + return false; + } + + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['QueuePartition'] ) ) { + throw new MWException( "The given job has no defined partition name." ); + } + + return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job ); + } + + protected function doIsRootJobOldDuplicate( Job $job ) { + $params = $job->getRootJobParams(); + $sigature = $params['rootJobSignature']; + $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + try { + return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); + } catch ( JobQueueError $e ) { + if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) { + $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); + } + } + + return false; + } + + protected function doDeduplicateRootJob( Job $job ) { + $params = $job->getRootJobParams(); + $sigature = $params['rootJobSignature']; + $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + try { + return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); + } catch ( JobQueueError $e ) { + if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) { + $partition = $this->partitionPushRing->getLiveLocation( $sigature ); + return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); + } + } + + return false; + } + + protected function doDelete() { + $failed = 0; + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + try { + $queue->doDelete(); + } catch ( JobQueueError $e ) { + ++$failed; + MWExceptionHandler::logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + return true; + } + + protected function doWaitForBackups() { + $failed = 0; + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + try { + $queue->waitForBackups(); + } catch ( JobQueueError $e ) { + ++$failed; + MWExceptionHandler::logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + } + + protected function doGetPeriodicTasks() { + $tasks = array(); + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $partition => $queue ) { + foreach ( $queue->getPeriodicTasks() as $task => $def ) { + $tasks["{$partition}:{$task}"] = $def; + } + } + + return $tasks; + } + + protected function doFlushCaches() { + static $types = array( + 'empty', + 'size', + 'acquiredcount', + 'delayedcount', + 'abandonedcount' + ); + + foreach ( $types as $type ) { + $this->cache->delete( $this->getCacheKey( $type ) ); + } + + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $queue->doFlushCaches(); + } + } + + public function getAllQueuedJobs() { + $iterator = new AppendIterator(); + + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllQueuedJobs() ); + } + + return $iterator; + } + + public function getAllDelayedJobs() { + $iterator = new AppendIterator(); + + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllDelayedJobs() ); + } + + return $iterator; + } + + public function getCoalesceLocationInternal() { + return "JobQueueFederated:wiki:{$this->wiki}" . + sha1( serialize( array_keys( $this->partitionQueues ) ) ); + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + $result = array(); + + $failed = 0; + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + try { + $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types ); + if ( is_array( $nonEmpty ) ) { + $result = array_unique( array_merge( $result, $nonEmpty ) ); + } else { + return null; // not supported on all partitions; bail + } + if ( count( $result ) == count( $types ) ) { + break; // short-circuit + } + } catch ( JobQueueError $e ) { + ++$failed; + MWExceptionHandler::logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + + return array_values( $result ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $result = array(); + $failed = 0; + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + try { + $sizes = $queue->doGetSiblingQueueSizes( $types ); + if ( is_array( $sizes ) ) { + foreach ( $sizes as $type => $size ) { + $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size; + } + } else { + return null; // not supported on all partitions; bail + } + } catch ( JobQueueError $e ) { + ++$failed; + MWExceptionHandler::logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + + return $result; + } + + /** + * Throw an error if no partitions available + * + * @param int $down The number of up partitions down + * @return void + * @throws JobQueueError + */ + protected function throwErrorIfAllPartitionsDown( $down ) { + if ( $down >= count( $this->partitionQueues ) ) { + throw new JobQueueError( 'No queue partitions available.' ); + } + } + + public function setTestingPrefix( $key ) { + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $queue->setTestingPrefix( $key ); + } + } + + /** + * @param string $property + * @return string + */ + private function getCacheKey( $property ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); + } +} diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php new file mode 100644 index 00000000..98a78c5e --- /dev/null +++ b/includes/jobqueue/JobQueueGroup.php @@ -0,0 +1,440 @@ +<?php +/** + * Job queue base code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle enqueueing of background jobs + * + * @ingroup JobQueue + * @since 1.21 + */ +class JobQueueGroup { + /** @var array */ + protected static $instances = array(); + + /** @var ProcessCacheLRU */ + protected $cache; + + /** @var string Wiki ID */ + protected $wiki; + + /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */ + protected $coalescedQueues; + + const TYPE_DEFAULT = 1; // integer; jobs popped by default + const TYPE_ANY = 2; // integer; any job + + const USE_CACHE = 1; // integer; use process or persistent cache + + const PROC_CACHE_TTL = 15; // integer; seconds + + const CACHE_VERSION = 1; // integer; cache version + + /** + * @param string $wiki Wiki ID + */ + protected function __construct( $wiki ) { + $this->wiki = $wiki; + $this->cache = new ProcessCacheLRU( 10 ); + } + + /** + * @param bool|string $wiki Wiki ID + * @return JobQueueGroup + */ + public static function singleton( $wiki = false ) { + $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; + if ( !isset( self::$instances[$wiki] ) ) { + self::$instances[$wiki] = new self( $wiki ); + } + + return self::$instances[$wiki]; + } + + /** + * Destroy the singleton instances + * + * @return void + */ + public static function destroySingletons() { + self::$instances = array(); + } + + /** + * Get the job queue object for a given queue type + * + * @param string $type + * @return JobQueue + */ + public function get( $type ) { + global $wgJobTypeConf; + + $conf = array( 'wiki' => $this->wiki, 'type' => $type ); + if ( isset( $wgJobTypeConf[$type] ) ) { + $conf = $conf + $wgJobTypeConf[$type]; + } else { + $conf = $conf + $wgJobTypeConf['default']; + } + + return JobQueue::factory( $conf ); + } + + /** + * Insert jobs into the respective queues of with the belong. + * + * This inserts the jobs into the queue specified by $wgJobTypeConf + * and updates the aggregate job queue information cache as needed. + * + * @param Job|array $jobs A single Job or a list of Jobs + * @throws MWException + * @return void + */ + public function push( $jobs ) { + $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); + if ( !count( $jobs ) ) { + return; + } + + $jobsByType = array(); // (job type => list of jobs) + foreach ( $jobs as $job ) { + if ( $job instanceof IJobSpecification ) { + $jobsByType[$job->getType()][] = $job; + } else { + throw new MWException( "Attempted to push a non-Job object into a queue." ); + } + } + + foreach ( $jobsByType as $type => $jobs ) { + $this->get( $type )->push( $jobs ); + JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); + } + + if ( $this->cache->has( 'queues-ready', 'list' ) ) { + $list = $this->cache->get( 'queues-ready', 'list' ); + if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) { + $this->cache->clear( 'queues-ready' ); + } + } + } + + /** + * Pop a job off one of the job queues + * + * This pops a job off a queue as specified by $wgJobTypeConf and + * updates the aggregate job queue information cache as needed. + * + * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string + * @param int $flags Bitfield of JobQueueGroup::USE_* constants + * @param array $blacklist List of job types to ignore + * @return Job|bool Returns false on failure + */ + public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = array() ) { + $job = false; + + if ( is_string( $qtype ) ) { // specific job type + if ( !in_array( $qtype, $blacklist ) ) { + $job = $this->get( $qtype )->pop(); + if ( !$job ) { + JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); + } + } + } else { // any job in the "default" jobs types + if ( $flags & self::USE_CACHE ) { + if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { + $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() ); + } + $types = $this->cache->get( 'queues-ready', 'list' ); + } else { + $types = $this->getQueuesWithJobs(); + } + + if ( $qtype == self::TYPE_DEFAULT ) { + $types = array_intersect( $types, $this->getDefaultQueueTypes() ); + } + + $types = array_diff( $types, $blacklist ); // avoid selected types + shuffle( $types ); // avoid starvation + + foreach ( $types as $type ) { // for each queue... + $job = $this->get( $type )->pop(); + if ( $job ) { // found + break; + } else { // not found + JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type ); + $this->cache->clear( 'queues-ready' ); + } + } + } + + return $job; + } + + /** + * Acknowledge that a job was completed + * + * @param Job $job + * @return bool + */ + public function ack( Job $job ) { + return $this->get( $job->getType() )->ack( $job ); + } + + /** + * Register the "root job" of a given job into the queue for de-duplication. + * This should only be called right *after* all the new jobs have been inserted. + * + * @param Job $job + * @return bool + */ + public function deduplicateRootJob( Job $job ) { + return $this->get( $job->getType() )->deduplicateRootJob( $job ); + } + + /** + * Wait for any slaves or backup queue servers to catch up. + * + * This does nothing for certain queue classes. + * + * @return void + * @throws MWException + */ + public function waitForBackups() { + global $wgJobTypeConf; + + wfProfileIn( __METHOD__ ); + // Try to avoid doing this more than once per queue storage medium + foreach ( $wgJobTypeConf as $type => $conf ) { + $this->get( $type )->waitForBackups(); + } + wfProfileOut( __METHOD__ ); + } + + /** + * Get the list of queue types + * + * @return array List of strings + */ + public function getQueueTypes() { + return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) ); + } + + /** + * Get the list of default queue types + * + * @return array List of strings + */ + public function getDefaultQueueTypes() { + global $wgJobTypesExcludedFromDefaultQueue; + + return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); + } + + /** + * Check if there are any queues with jobs (this is cached) + * + * @param int $type JobQueueGroup::TYPE_* constant + * @return bool + * @since 1.23 + */ + public function queuesHaveJobs( $type = self::TYPE_ANY ) { + global $wgMemc; + + $key = wfMemcKey( 'jobqueue', 'queueshavejobs', $type ); + + $value = $wgMemc->get( $key ); + if ( $value === false ) { + $queues = $this->getQueuesWithJobs(); + if ( $type == self::TYPE_DEFAULT ) { + $queues = array_intersect( $queues, $this->getDefaultQueueTypes() ); + } + $value = count( $queues ) ? 'true' : 'false'; + $wgMemc->add( $key, $value, 15 ); + } + + return ( $value === 'true' ); + } + + /** + * Get the list of job types that have non-empty queues + * + * @return array List of job types that have non-empty queues + */ + public function getQueuesWithJobs() { + $types = array(); + foreach ( $this->getCoalescedQueues() as $info ) { + $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() ); + if ( is_array( $nonEmpty ) ) { // batching features supported + $types = array_merge( $types, $nonEmpty ); + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } + } + } + + return $types; + } + + /** + * Get the size of the queus for a list of job types + * + * @return array Map of (job type => size) + */ + public function getQueueSizes() { + $sizeMap = array(); + foreach ( $this->getCoalescedQueues() as $info ) { + $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() ); + if ( is_array( $sizes ) ) { // batching features supported + $sizeMap = $sizeMap + $sizes; + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + $sizeMap[$type] = $this->get( $type )->getSize(); + } + } + } + + return $sizeMap; + } + + /** + * @return array + */ + protected function getCoalescedQueues() { + global $wgJobTypeConf; + + if ( $this->coalescedQueues === null ) { + $this->coalescedQueues = array(); + foreach ( $wgJobTypeConf as $type => $conf ) { + $queue = JobQueue::factory( + array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf ); + $loc = $queue->getCoalesceLocationInternal(); + if ( !isset( $this->coalescedQueues[$loc] ) ) { + $this->coalescedQueues[$loc]['queue'] = $queue; + $this->coalescedQueues[$loc]['types'] = array(); + } + if ( $type === 'default' ) { + $this->coalescedQueues[$loc]['types'] = array_merge( + $this->coalescedQueues[$loc]['types'], + array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) ) + ); + } else { + $this->coalescedQueues[$loc]['types'][] = $type; + } + } + } + + return $this->coalescedQueues; + } + + /** + * Execute any due periodic queue maintenance tasks for all queues. + * + * A task is "due" if the time ellapsed since the last run is greater than + * the defined run period. Concurrent calls to this function will cause tasks + * to be attempted twice, so they may need their own methods of mutual exclusion. + * + * @return int Number of tasks run + */ + public function executeReadyPeriodicTasks() { + global $wgMemc; + + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' ); + $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp) + + $count = 0; + $tasksRun = array(); // (queue => task => UNIX timestamp) + foreach ( $this->getQueueTypes() as $type ) { + $queue = $this->get( $type ); + foreach ( $queue->getPeriodicTasks() as $task => $definition ) { + if ( $definition['period'] <= 0 ) { + continue; // disabled + } elseif ( !isset( $lastRuns[$type][$task] ) + || $lastRuns[$type][$task] < ( time() - $definition['period'] ) + ) { + try { + if ( call_user_func( $definition['callback'] ) !== null ) { + $tasksRun[$type][$task] = time(); + ++$count; + } + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + } + // The tasks may have recycled jobs or release delayed jobs into the queue + if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) { + JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); + } + } + + if ( $count === 0 ) { + return $count; // nothing to update + } + + $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) { + if ( is_array( $lastRuns ) ) { + foreach ( $tasksRun as $type => $tasks ) { + foreach ( $tasks as $task => $timestamp ) { + if ( !isset( $lastRuns[$type][$task] ) + || $timestamp > $lastRuns[$type][$task] + ) { + $lastRuns[$type][$task] = $timestamp; + } + } + } + } else { + $lastRuns = $tasksRun; + } + + return $lastRuns; + } ); + + return $count; + } + + /** + * @param string $name + * @return mixed + */ + private function getCachedConfigVar( $name ) { + global $wgConf, $wgMemc; + + if ( $this->wiki === wfWikiID() ) { + return $GLOBALS[$name]; // common case + } else { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name ); + $value = $wgMemc->get( $key ); // ('v' => ...) or false + if ( is_array( $value ) ) { + return $value['v']; + } else { + $value = $wgConf->getConfig( $this->wiki, $name ); + $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) ); + + return $value; + } + } + } +} diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php new file mode 100644 index 00000000..3519eac8 --- /dev/null +++ b/includes/jobqueue/JobQueueRedis.php @@ -0,0 +1,865 @@ +<?php +/** + * Redis-backed job queue code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle job queues stored in Redis + * + * This is faster, less resource intensive, queue that JobQueueDB. + * All data for a queue using this class is placed into one redis server. + * + * There are eight main redis keys used to track jobs: + * - l-unclaimed : A list of job IDs used for ready unclaimed jobs + * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries + * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs + * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs + * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication + * - h-sha1ById : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication + * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries + * - h-data : A hash of (job ID => serialized blobs) for job storage + * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned. + * If an ID appears in any of those lists, it should have a h-data entry for its ID. + * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then + * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById + * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its + * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID. + * + * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication. + * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run. + * All the keys are prefixed with the relevant wiki ID information. + * + * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations. + * Additionally, it should be noted that redis has different persistence modes, such + * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be + * made on the servers based on what queues are using it and what tolerance they have. + * + * @ingroup JobQueue + * @ingroup Redis + * @since 1.22 + */ +class JobQueueRedis extends JobQueue { + /** @var RedisConnectionPool */ + protected $redisPool; + + /** @var string Server address */ + protected $server; + /** @var string Compression method to use */ + protected $compression; + /** @var bool */ + protected $daemonized; + + const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) + + /** @var string Key to prefix the queue keys with (used for testing) */ + protected $key; + + /** + * @param array $params Possible keys: + * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). + * Note that the serializer option is ignored as "none" is always used. + * - redisServer : A hostname/port combination or the absolute path of a UNIX socket. + * If a hostname is specified but no port, the standard port number + * 6379 will be used. Required. + * - compression : The type of compression to use; one of (none,gzip). + * - daemonized : Set to true if the redisJobRunnerService runs in the background. + * This will disable job recycling/undelaying from the MediaWiki side + * to avoid redundance and out-of-sync configuration. + */ + public function __construct( array $params ) { + parent::__construct( $params ); + $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua + $this->server = $params['redisServer']; + $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; + $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); + $this->daemonized = !empty( $params['daemonized'] ); + } + + protected function supportedOrders() { + return array( 'timestamp', 'fifo' ); + } + + protected function optimalOrder() { + return 'fifo'; + } + + protected function supportsDelayedJobs() { + return true; + } + + /** + * @see JobQueue::doIsEmpty() + * @return bool + * @throws MWException + */ + protected function doIsEmpty() { + return $this->doGetSize() == 0; + } + + /** + * @see JobQueue::doGetSize() + * @return int + * @throws MWException + */ + protected function doGetSize() { + $conn = $this->getConnection(); + try { + return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::doGetAcquiredCount() + * @return int + * @throws JobQueueError + */ + protected function doGetAcquiredCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + $conn = $this->getConnection(); + try { + $conn->multi( Redis::PIPELINE ); + $conn->zSize( $this->getQueueKey( 'z-claimed' ) ); + $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); + + return array_sum( $conn->exec() ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::doGetDelayedCount() + * @return int + * @throws JobQueueError + */ + protected function doGetDelayedCount() { + if ( !$this->checkDelay ) { + return 0; // no delayed jobs + } + $conn = $this->getConnection(); + try { + return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::doGetAbandonedCount() + * @return int + * @throws JobQueueError + */ + protected function doGetAbandonedCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + $conn = $this->getConnection(); + try { + return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::doBatchPush() + * @param array $jobs + * @param int $flags + * @return void + * @throws JobQueueError + */ + protected function doBatchPush( array $jobs, $flags ) { + // Convert the jobs into field maps (de-duplicated against each other) + $items = array(); // (job ID => job fields map) + foreach ( $jobs as $job ) { + $item = $this->getNewJobFields( $job ); + if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate + $items[$item['sha1']] = $item; + } else { + $items[$item['uuid']] = $item; + } + } + + if ( !count( $items ) ) { + return; // nothing to do + } + + $conn = $this->getConnection(); + try { + // Actually push the non-duplicate jobs into the queue... + if ( $flags & self::QOS_ATOMIC ) { + $batches = array( $items ); // all or nothing + } else { + $batches = array_chunk( $items, 500 ); // avoid tying up the server + } + $failed = 0; + $pushed = 0; + foreach ( $batches as $itemBatch ) { + $added = $this->pushBlobs( $conn, $itemBatch ); + if ( is_int( $added ) ) { + $pushed += $added; + } else { + $failed += count( $itemBatch ); + } + } + if ( $failed > 0 ) { + wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." ); + + throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." ); + } + JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki ); + JobQueue::incrStats( 'job-insert-duplicate', $this->type, + count( $items ) - $failed - $pushed, $this->wiki ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @param RedisConnRef $conn + * @param array $items List of results from JobQueueRedis::getNewJobFields() + * @return int Number of jobs inserted (duplicates are ignored) + * @throws RedisException + */ + protected function pushBlobs( RedisConnRef $conn, array $items ) { + $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] ) + foreach ( $items as $item ) { + $args[] = (string)$item['uuid']; + $args[] = (string)$item['sha1']; + $args[] = (string)$item['rtimestamp']; + $args[] = (string)$this->serialize( $item ); + } + static $script = +<<<LUA + local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS) + if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end + local pushed = 0 + for i = 1,#ARGV,4 do + local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3] + if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then + if 1*rtimestamp > 0 then + -- Insert into delayed queue (release time as score) + redis.call('zAdd',kDelayed,rtimestamp,id) + else + -- Insert into unclaimed queue + redis.call('lPush',kUnclaimed,id) + end + if sha1 ~= '' then + redis.call('hSet',kSha1ById,id,sha1) + redis.call('hSet',kIdBySha1,sha1,id) + end + redis.call('hSet',kData,id,blob) + pushed = pushed + 1 + end + end + return pushed +LUA; + return $conn->luaEval( $script, + array_merge( + array( + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'z-delayed' ), # KEYS[4] + $this->getQueueKey( 'h-data' ), # KEYS[5] + ), + $args + ), + 5 # number of first argument(s) that are keys + ); + } + + /** + * @see JobQueue::doPop() + * @return Job|bool + * @throws JobQueueError + */ + protected function doPop() { + $job = false; + + // Push ready delayed jobs into the queue every 10 jobs to spread the load. + // This is also done as a periodic task, but we don't want too much done at once. + if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { + $this->recyclePruneAndUndelayJobs(); + } + + $conn = $this->getConnection(); + try { + do { + if ( $this->claimTTL > 0 ) { + // Keep the claimed job list down for high-traffic queues + if ( mt_rand( 0, 99 ) == 0 ) { + $this->recyclePruneAndUndelayJobs(); + } + $blob = $this->popAndAcquireBlob( $conn ); + } else { + $blob = $this->popAndDeleteBlob( $conn ); + } + if ( !is_string( $blob ) ) { + break; // no jobs; nothing to do + } + + JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki ); + $item = $this->unserialize( $blob ); + if ( $item === false ) { + wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." ); + continue; + } + + // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed + $job = $this->getJobFromFields( $item ); // may be false + } while ( !$job ); // job may be false if invalid + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $job; + } + + /** + * @param RedisConnRef $conn + * @return array Serialized string or false + * @throws RedisException + */ + protected function popAndDeleteBlob( RedisConnRef $conn ) { + static $script = +<<<LUA + local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS) + -- Pop an item off the queue + local id = redis.call('rpop',kUnclaimed) + if not id then return false end + -- Get the job data and remove it + local item = redis.call('hGet',kData,id) + redis.call('hDel',kData,id) + -- Allow new duplicates of this job + local sha1 = redis.call('hGet',kSha1ById,id) + if sha1 then redis.call('hDel',kIdBySha1,sha1) end + redis.call('hDel',kSha1ById,id) + -- Return the job data + return item +LUA; + return $conn->luaEval( $script, + array( + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'h-data' ), # KEYS[4] + ), + 4 # number of first argument(s) that are keys + ); + } + + /** + * @param RedisConnRef $conn + * @return array Serialized string or false + * @throws RedisException + */ + protected function popAndAcquireBlob( RedisConnRef $conn ) { + static $script = +<<<LUA + local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS) + -- Pop an item off the queue + local id = redis.call('rPop',kUnclaimed) + if not id then return false end + -- Allow new duplicates of this job + local sha1 = redis.call('hGet',kSha1ById,id) + if sha1 then redis.call('hDel',kIdBySha1,sha1) end + redis.call('hDel',kSha1ById,id) + -- Mark the jobs as claimed and return it + redis.call('zAdd',kClaimed,ARGV[1],id) + redis.call('hIncrBy',kAttempts,id,1) + return redis.call('hGet',kData,id) +LUA; + return $conn->luaEval( $script, + array( + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'z-claimed' ), # KEYS[4] + $this->getQueueKey( 'h-attempts' ), # KEYS[5] + $this->getQueueKey( 'h-data' ), # KEYS[6] + time(), # ARGV[1] (injected to be replication-safe) + ), + 6 # number of first argument(s) that are keys + ); + } + + /** + * @see JobQueue::doAck() + * @param Job $job + * @return Job|bool + * @throws MWException|JobQueueError + */ + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['uuid'] ) ) { + throw new MWException( "Job of type '{$job->getType()}' has no UUID." ); + } + if ( $this->claimTTL > 0 ) { + $conn = $this->getConnection(); + try { + static $script = +<<<LUA + local kClaimed, kAttempts, kData = unpack(KEYS) + -- Unmark the job as claimed + redis.call('zRem',kClaimed,ARGV[1]) + redis.call('hDel',kAttempts,ARGV[1]) + -- Delete the job data itself + return redis.call('hDel',kData,ARGV[1]) +LUA; + $res = $conn->luaEval( $script, + array( + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'h-data' ), # KEYS[3] + $job->metadata['uuid'] # ARGV[1] + ), + 3 # number of first argument(s) that are keys + ); + + if ( !$res ) { + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); + + return false; + } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + return true; + } + + /** + * @see JobQueue::doDeduplicateRootJob() + * @param Job $job + * @return bool + * @throws MWException|JobQueueError + */ + protected function doDeduplicateRootJob( Job $job ) { + if ( !$job->hasRootJobParams() ) { + throw new MWException( "Cannot register root job; missing parameters." ); + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + + $conn = $this->getConnection(); + try { + $timestamp = $conn->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::doIsRootJobOldDuplicate() + * @param Job $job + * @return bool + * @throws JobQueueError + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + if ( !$job->hasRootJobParams() ) { + return false; // job has no de-deplication info + } + $params = $job->getRootJobParams(); + + $conn = $this->getConnection(); + try { + // Get the last time this root job was enqueued + $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @see JobQueue::doDelete() + * @return bool + * @throws JobQueueError + */ + protected function doDelete() { + static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned', + 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ); + + $conn = $this->getConnection(); + try { + $keys = array(); + foreach ( $props as $prop ) { + $keys[] = $this->getQueueKey( $prop ); + } + + return ( $conn->delete( $keys ) !== false ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllQueuedJobs() { + $conn = $this->getConnection(); + try { + $that = $this; + + return new MappedIterator( + $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ), + function ( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { + return is_object( $job ); + } ) + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllDelayedJobs() { + $conn = $this->getConnection(); + try { + $that = $this; + + return new MappedIterator( // delayed jobs + $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), + function ( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { + return is_object( $job ); + } ) + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + public function getCoalesceLocationInternal() { + return "RedisServer:" . $this->server; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $sizes = array(); // (type => size) + $types = array_values( $types ); // reindex + $conn = $this->getConnection(); + try { + $conn->multi( Redis::PIPELINE ); + foreach ( $types as $type ) { + $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) ); + } + $res = $conn->exec(); + if ( is_array( $res ) ) { + foreach ( $res as $i => $size ) { + $sizes[$types[$i]] = $size; + } + } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $sizes; + } + + /** + * This function should not be called outside JobQueueRedis + * + * @param string $uid + * @param RedisConnRef $conn + * @return Job|bool Returns false if the job does not exist + * @throws MWException|JobQueueError + */ + public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { + try { + $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ); + if ( $data === false ) { + return false; // not found + } + $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) ); + if ( !is_array( $item ) ) { // this shouldn't happen + throw new MWException( "Could not find job with ID '$uid'." ); + } + $title = Title::makeTitle( $item['namespace'], $item['title'] ); + $job = Job::factory( $item['type'], $title, $item['params'] ); + $job->metadata['uuid'] = $item['uuid']; + + return $job; + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * Recycle or destroy any jobs that have been claimed for too long + * and release any ready delayed jobs into the queue + * + * @return int Number of jobs recycled/deleted/undelayed + * @throws MWException|JobQueueError + */ + public function recyclePruneAndUndelayJobs() { + $count = 0; + // For each job item that can be retried, we need to add it back to the + // main queue and remove it from the list of currenty claimed job items. + // For those that cannot, they are marked as dead and kept around for + // investigation and manual job restoration but are eventually deleted. + $conn = $this->getConnection(); + try { + $now = time(); + static $script = +<<<LUA + local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS) + local released,abandoned,pruned,undelayed = 0,0,0,0 + -- Get all non-dead jobs that have an expired claim on them. + -- The score for each item is the last claim timestamp (UNIX). + local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1]) + for k,id in ipairs(staleClaims) do + local timestamp = redis.call('zScore',kClaimed,id) + local attempts = redis.call('hGet',kAttempts,id) + if attempts < ARGV[3] then + -- Claim expired and retries left: re-enqueue the job + redis.call('lPush',kUnclaimed,id) + redis.call('hIncrBy',kAttempts,id,1) + released = released + 1 + else + -- Claim expired and no retries left: mark the job as dead + redis.call('zAdd',kAbandoned,timestamp,id) + abandoned = abandoned + 1 + end + redis.call('zRem',kClaimed,id) + end + -- Get all of the dead jobs that have been marked as dead for too long. + -- The score for each item is the last claim timestamp (UNIX). + local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2]) + for k,id in ipairs(deadClaims) do + -- Stale and out of retries: remove any traces of the job + redis.call('zRem',kAbandoned,id) + redis.call('hDel',kAttempts,id) + redis.call('hDel',kData,id) + pruned = pruned + 1 + end + -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp) + local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4]) + -- Migrate the jobs from the "delayed" set to the "unclaimed" list + for k,id in ipairs(ids) do + redis.call('lPush',kUnclaimed,id) + redis.call('zRem',kDelayed,id) + end + undelayed = #ids + return {released,abandoned,pruned,undelayed} +LUA; + $res = $conn->luaEval( $script, + array( + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'l-unclaimed' ), # KEYS[3] + $this->getQueueKey( 'h-data' ), # KEYS[4] + $this->getQueueKey( 'z-abandoned' ), # KEYS[5] + $this->getQueueKey( 'z-delayed' ), # KEYS[6] + $now - $this->claimTTL, # ARGV[1] + $now - self::MAX_AGE_PRUNE, # ARGV[2] + $this->maxTries, # ARGV[3] + $now # ARGV[4] + ), + 6 # number of first argument(s) that are keys + ); + if ( $res ) { + list( $released, $abandoned, $pruned, $undelayed ) = $res; + $count += $released + $pruned + $undelayed; + JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki ); + JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki ); + JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki ); + } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $count; + } + + /** + * @return array + */ + protected function doGetPeriodicTasks() { + if ( $this->daemonized ) { + return array(); // managed in the runner loop + } + $periods = array( 3600 ); // standard cleanup (useful on config change) + if ( $this->claimTTL > 0 ) { + $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing + } + if ( $this->checkDelay ) { + $periods[] = 300; // 5 minutes + } + $period = min( $periods ); + $period = max( $period, 30 ); // sanity + + return array( + 'recyclePruneAndUndelayJobs' => array( + 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ), + 'period' => $period, + ) + ); + } + + /** + * @param IJobSpecification $job + * @return array + */ + protected function getNewJobFields( IJobSpecification $job ) { + return array( + // Fields that describe the nature of the job + 'type' => $job->getType(), + 'namespace' => $job->getTitle()->getNamespace(), + 'title' => $job->getTitle()->getDBkey(), + 'params' => $job->getParams(), + // Some jobs cannot run until a "release timestamp" + 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, + // Additional job metadata + 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ), + 'sha1' => $job->ignoreDuplicates() + ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) + : '', + 'timestamp' => time() // UNIX timestamp + ); + } + + /** + * @param array $fields + * @return Job|bool + */ + protected function getJobFromFields( array $fields ) { + $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] ); + if ( $title ) { + $job = Job::factory( $fields['type'], $title, $fields['params'] ); + $job->metadata['uuid'] = $fields['uuid']; + + return $job; + } + + return false; + } + + /** + * @param array $fields + * @return string Serialized and possibly compressed version of $fields + */ + protected function serialize( array $fields ) { + $blob = serialize( $fields ); + if ( $this->compression === 'gzip' + && strlen( $blob ) >= 1024 + && function_exists( 'gzdeflate' ) + ) { + $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ); + $blobz = serialize( $object ); + + return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob; + } else { + return $blob; + } + } + + /** + * @param string $blob + * @return array|bool Unserialized version of $blob or false + */ + protected function unserialize( $blob ) { + $fields = unserialize( $blob ); + if ( is_object( $fields ) ) { + if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) { + $fields = unserialize( gzinflate( $fields->blob ) ); + } else { + $fields = false; + } + } + + return is_array( $fields ) ? $fields : false; + } + + /** + * Get a connection to the server that handles all sub-queues for this queue + * + * @return RedisConnRef + * @throws JobQueueConnectionError + */ + protected function getConnection() { + $conn = $this->redisPool->getConnection( $this->server ); + if ( !$conn ) { + throw new JobQueueConnectionError( "Unable to connect to redis server." ); + } + + return $conn; + } + + /** + * @param RedisConnRef $conn + * @param RedisException $e + * @throws JobQueueError + */ + protected function throwRedisException( RedisConnRef $conn, $e ) { + $this->redisPool->handleError( $conn, $e ); + throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); + } + + /** + * @param string $prop + * @param string|null $type + * @return string + */ + private function getQueueKey( $prop, $type = null ) { + $type = is_string( $type ) ? $type : $this->type; + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + if ( strlen( $this->key ) ) { // namespaced queue (for testing) + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop ); + } else { + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop ); + } + } + + /** + * @param string $key + * @return void + */ + public function setTestingPrefix( $key ) { + $this->key = $key; + } +} diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php new file mode 100644 index 00000000..8cccedaf --- /dev/null +++ b/includes/jobqueue/JobRunner.php @@ -0,0 +1,350 @@ +<?php +/** + * Job queue runner utility methods + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job queue runner utility methods + * + * @ingroup JobQueue + * @since 1.24 + */ +class JobRunner { + /** @var callable|null Debug output handler */ + protected $debug; + + /** + * @param callable $debug Optional debug output handler + */ + public function setDebugHandler( $debug ) { + $this->debug = $debug; + } + + /** + * Run jobs of the specified number/type for the specified time + * + * The response map has a 'job' field that lists status of each job, including: + * - type : the job type + * - status : ok/failed + * - error : any error message string + * - time : the job run time in ms + * The response map also has: + * - backoffs : the (job type => seconds) map of backoff times + * - elapsed : the total time spent running tasks in ms + * - reached : the reason the script finished, one of (none-ready, job-limit, time-limit) + * + * This method outputs status information only if a debug handler was set. + * Any exceptions are caught and logged, but are not reported as output. + * + * @param array $options Map of parameters: + * - type : the job type (or false for the default types) + * - maxJobs : maximum number of jobs to run + * - maxTime : maximum time in seconds before stopping + * - throttle : whether to respect job backoff configuration + * @return array Summary response that can easily be JSON serialized + */ + public function run( array $options ) { + $response = array( 'jobs' => array(), 'reached' => 'none-ready' ); + + $type = isset( $options['type'] ) ? $options['type'] : false; + $maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : false; + $maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false; + $noThrottle = isset( $options['throttle'] ) && !$options['throttle']; + + $group = JobQueueGroup::singleton(); + // Handle any required periodic queue maintenance + $count = $group->executeReadyPeriodicTasks(); + if ( $count > 0 ) { + $this->runJobsLog( "Executed $count periodic queue task(s)." ); + } + + // Flush any pending DB writes for sanity + wfGetLBFactory()->commitMasterChanges(); + + // Some jobs types should not run until a certain timestamp + $backoffs = array(); // map of (type => UNIX expiry) + $backoffDeltas = array(); // map of (type => seconds) + $wait = 'wait'; // block to read backoffs the first time + + $jobsRun = 0; + $timeMsTotal = 0; + $flags = JobQueueGroup::USE_CACHE; + $startTime = microtime( true ); // time since jobs started running + $lastTime = microtime( true ); // time since last slave check + do { + // Sync the persistent backoffs with concurrent runners + $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); + $blacklist = $noThrottle ? array() : array_keys( $backoffs ); + $wait = 'nowait'; // less important now + + if ( $type === false ) { + $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist ); + } elseif ( in_array( $type, $blacklist ) ) { + $job = false; // requested queue in backoff state + } else { + $job = $group->pop( $type ); // job from a single queue + } + + if ( $job ) { // found a job + $jType = $job->getType(); + + // Back off of certain jobs for a while (for throttling and for errors) + $ttw = $this->getBackoffTimeToWait( $job ); + if ( $ttw > 0 ) { + // Always add the delta for other runners in case the time running the + // job negated the backoff for each individually but not collectively. + $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] ) + ? $backoffDeltas[$jType] + $ttw + : $ttw; + $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); + } + + $this->runJobsLog( $job->toString() . " STARTING" ); + + // Run the job... + wfProfileIn( __METHOD__ . '-' . get_class( $job ) ); + $jobStartTime = microtime( true ); + try { + ++$jobsRun; + $status = $job->run(); + $error = $job->getLastError(); + wfGetLBFactory()->commitMasterChanges(); + } catch ( MWException $e ) { + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); + $status = false; + $error = get_class( $e ) . ': ' . $e->getMessage(); + MWExceptionHandler::logException( $e ); + } + $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 ); + wfProfileOut( __METHOD__ . '-' . get_class( $job ) ); + $timeMsTotal += $timeMs; + + // Mark the job as done on success or when the job cannot be retried + if ( $status !== false || !$job->allowRetries() ) { + $group->ack( $job ); // done + } + + // Back off of certain jobs for a while (for throttling and for errors) + if ( $status === false && mt_rand( 0, 49 ) == 0 ) { + $ttw = max( $ttw, 30 ); // too many errors + $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] ) + ? $backoffDeltas[$jType] + $ttw + : $ttw; + } + + if ( $status === false ) { + $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" ); + } else { + $this->runJobsLog( $job->toString() . " t=$timeMs good" ); + } + + $response['jobs'][] = array( + 'type' => $jType, + 'status' => ( $status === false ) ? 'failed' : 'ok', + 'error' => $error, + 'time' => $timeMs + ); + + // Break out if we hit the job count or wall time limits... + if ( $maxJobs && $jobsRun >= $maxJobs ) { + $response['reached'] = 'job-limit'; + break; + } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) { + $response['reached'] = 'time-limit'; + break; + } + + // Don't let any of the main DB slaves get backed up + $timePassed = microtime( true ) - $lastTime; + if ( $timePassed >= 5 || $timePassed < 0 ) { + wfWaitForSlaves( $lastTime ); + $lastTime = microtime( true ); + } + // Don't let any queue slaves/backups fall behind + if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) { + $group->waitForBackups(); + } + + // Bail if near-OOM instead of in a job + $this->assertMemoryOK(); + } + } while ( $job ); // stop when there are no jobs + + // Sync the persistent backoffs for the next runJobs.php pass + if ( $backoffDeltas ) { + $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' ); + } + + $response['backoffs'] = $backoffs; + $response['elapsed'] = $timeMsTotal; + + return $response; + } + + /** + * @param Job $job + * @return int Seconds for this runner to avoid doing more jobs of this type + * @see $wgJobBackoffThrottling + */ + private function getBackoffTimeToWait( Job $job ) { + global $wgJobBackoffThrottling; + + if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) || + $job instanceof DuplicateJob // no work was done + ) { + return 0; // not throttled + } + + $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()]; + if ( $itemsPerSecond <= 0 ) { + return 0; // not throttled + } + + $seconds = 0; + if ( $job->workItemCount() > 0 ) { + $exactSeconds = $job->workItemCount() / $itemsPerSecond; + // use randomized rounding + $seconds = floor( $exactSeconds ); + $remainder = $exactSeconds - $seconds; + $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0; + } + + return (int)$seconds; + } + + /** + * Get the previous backoff expiries from persistent storage + * On I/O or lock acquisition failure this returns the original $backoffs. + * + * @param array $backoffs Map of (job type => UNIX timestamp) + * @param string $mode Lock wait mode - "wait" or "nowait" + * @return array Map of (job type => backoff expiry timestamp) + */ + private function loadBackoffs( array $backoffs, $mode = 'wait' ) { + $section = new ProfileSection( __METHOD__ ); + + $file = wfTempDir() . '/mw-runJobs-backoffs.json'; + if ( is_file( $file ) ) { + $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0; + $handle = fopen( $file, 'rb' ); + if ( !flock( $handle, LOCK_SH | $noblock ) ) { + fclose( $handle ); + return $backoffs; // don't wait on lock + } + $content = stream_get_contents( $handle ); + flock( $handle, LOCK_UN ); + fclose( $handle ); + $ctime = microtime( true ); + $cBackoffs = json_decode( $content, true ) ?: array(); + foreach ( $cBackoffs as $type => $timestamp ) { + if ( $timestamp < $ctime ) { + unset( $cBackoffs[$type] ); + } + } + } else { + $cBackoffs = array(); + } + + return $cBackoffs; + } + + /** + * Merge the current backoff expiries from persistent storage + * + * The $deltas map is set to an empty array on success. + * On I/O or lock acquisition failure this returns the original $backoffs. + * + * @param array $backoffs Map of (job type => UNIX timestamp) + * @param array $deltas Map of (job type => seconds) + * @param string $mode Lock wait mode - "wait" or "nowait" + * @return array The new backoffs account for $backoffs and the latest file data + */ + private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) { + $section = new ProfileSection( __METHOD__ ); + + if ( !$deltas ) { + return $this->loadBackoffs( $backoffs, $mode ); + } + + $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0; + $file = wfTempDir() . '/mw-runJobs-backoffs.json'; + $handle = fopen( $file, 'wb+' ); + if ( !flock( $handle, LOCK_EX | $noblock ) ) { + fclose( $handle ); + return $backoffs; // don't wait on lock + } + $ctime = microtime( true ); + $content = stream_get_contents( $handle ); + $cBackoffs = json_decode( $content, true ) ?: array(); + foreach ( $deltas as $type => $seconds ) { + $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime + ? $cBackoffs[$type] + $seconds + : $ctime + $seconds; + } + foreach ( $cBackoffs as $type => $timestamp ) { + if ( $timestamp < $ctime ) { + unset( $cBackoffs[$type] ); + } + } + ftruncate( $handle, 0 ); + fwrite( $handle, json_encode( $cBackoffs ) ); + flock( $handle, LOCK_UN ); + fclose( $handle ); + + $deltas = array(); + + return $cBackoffs; + } + + /** + * Make sure that this script is not too close to the memory usage limit. + * It is better to die in between jobs than OOM right in the middle of one. + * @throws MWException + */ + private function assertMemoryOK() { + static $maxBytes = null; + if ( $maxBytes === null ) { + $m = array(); + if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) { + list( , $num, $unit ) = $m; + $conv = array( 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ); + $maxBytes = $num * $conv[strtolower( $unit )]; + } else { + $maxBytes = 0; + } + } + $usedBytes = memory_get_usage(); + if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) { + throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." ); + } + } + + /** + * Log the job message + * @param string $msg The message to log + */ + private function runJobsLog( $msg ) { + if ( $this->debug ) { + call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) ); + } + wfDebugLog( 'runJobs', $msg ); + } +} diff --git a/includes/jobqueue/JobSpecification.php b/includes/jobqueue/JobSpecification.php new file mode 100644 index 00000000..9fa7747f --- /dev/null +++ b/includes/jobqueue/JobSpecification.php @@ -0,0 +1,189 @@ +<?php +/** + * Job queue task description base code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job queue task description interface + * + * @ingroup JobQueue + * @since 1.23 + */ +interface IJobSpecification { + /** + * @return string Job type + */ + public function getType(); + + /** + * @return array + */ + public function getParams(); + + /** + * @return int|null UNIX timestamp to delay running this job until, otherwise null + */ + public function getReleaseTimestamp(); + + /** + * @return bool Whether only one of each identical set of jobs should be run + */ + public function ignoreDuplicates(); + + /** + * Subclasses may need to override this to make duplication detection work. + * The resulting map conveys everything that makes the job unique. This is + * only checked if ignoreDuplicates() returns true, meaning that duplicate + * jobs are supposed to be ignored. + * + * @return array Map of key/values + */ + public function getDeduplicationInfo(); + + /** + * @return Title Descriptive title (this can simply be informative) + */ + public function getTitle(); +} + +/** + * Job queue task description base code + * + * Example usage: + * <code> + * $job = new JobSpecification( + * 'null', + * array( 'lives' => 1, 'usleep' => 100, 'pi' => 3.141569 ), + * array( 'removeDuplicates' => 1 ), + * Title::makeTitle( NS_SPECIAL, 'nullity' ) + * ); + * JobQueueGroup::singleton()->push( $job ) + * </code> + * + * @ingroup JobQueue + * @since 1.23 + */ +class JobSpecification implements IJobSpecification { + /** @var string */ + protected $type; + + /** @var array Array of job parameters or false if none */ + protected $params; + + /** @var Title */ + protected $title; + + /** @var bool Expensive jobs may set this to true */ + protected $ignoreDuplicates; + + /** + * @param string $type + * @param array $params Map of key/values + * @param array $opts Map of key/values + * @param Title $title Optional descriptive title + */ + public function __construct( + $type, array $params, array $opts = array(), Title $title = null + ) { + $this->validateParams( $params ); + + $this->type = $type; + $this->params = $params; + $this->title = $title ?: Title::newMainPage(); + $this->ignoreDuplicates = !empty( $opts['removeDuplicates'] ); + } + + /** + * @param array $params + */ + protected function validateParams( array $params ) { + foreach ( $params as $p => $v ) { + if ( is_array( $v ) ) { + $this->validateParams( $v ); + } elseif ( !is_scalar( $v ) && $v !== null ) { + throw new UnexpectedValueException( "Job parameter $p is not JSON serializable." ); + } + } + } + + /** + * @return string + */ + public function getType() { + return $this->type; + } + + /** + * @return Title + */ + public function getTitle() { + return $this->title; + } + + /** + * @return array + */ + public function getParams() { + return $this->params; + } + + /** + * @return int|null UNIX timestamp to delay running this job until, otherwise null + */ + public function getReleaseTimestamp() { + return isset( $this->params['jobReleaseTimestamp'] ) + ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] ) + : null; + } + + /** + * @return bool Whether only one of each identical set of jobs should be run + */ + public function ignoreDuplicates() { + return $this->ignoreDuplicates; + } + + /** + * Subclasses may need to override this to make duplication detection work. + * The resulting map conveys everything that makes the job unique. This is + * only checked if ignoreDuplicates() returns true, meaning that duplicate + * jobs are supposed to be ignored. + * + * @return array Map of key/values + */ + public function getDeduplicationInfo() { + $info = array( + 'type' => $this->getType(), + 'namespace' => $this->getTitle()->getNamespace(), + 'title' => $this->getTitle()->getDBkey(), + 'params' => $this->getParams() + ); + if ( is_array( $info['params'] ) ) { + // Identical jobs with different "root" jobs should count as duplicates + unset( $info['params']['rootJobSignature'] ); + unset( $info['params']['rootJobTimestamp'] ); + // Likewise for jobs with different delay times + unset( $info['params']['jobReleaseTimestamp'] ); + } + + return $info; + } +} diff --git a/includes/jobqueue/README b/includes/jobqueue/README new file mode 100644 index 00000000..c11d5a78 --- /dev/null +++ b/includes/jobqueue/README @@ -0,0 +1,81 @@ +/*! +\ingroup JobQueue +\page jobqueue_design Job queue design + +Notes on the Job queuing system architecture. + +\section intro Introduction + +The data model consist of the following main components: +* The Job object represents a particular deferred task that happens in the + background. All jobs subclass the Job object and put the main logic in the + function called run(). +* The JobQueue object represents a particular queue of jobs of a certain type. + For example there may be a queue for email jobs and a queue for squid purge + jobs. + +\section jobqueue Job queues + +Each job type has its own queue and is associated to a storage medium. One +queue might save its jobs in redis while another one uses would use a database. + +Storage medium are defined in a queue class. Before using it, you must +define in $wgJobTypeConf a mapping of the job type to a queue class. + +The factory class JobQueueGroup provides helper functions: +- getting the queue for a given job +- route new job insertions to the proper queue + +The following queue classes are available: +* JobQueueDB (stores jobs in the `job` table in a database) +* JobQueueRedis (stores jobs in a redis server) + +All queue classes support some basic operations (though some may be no-ops): +* enqueueing a batch of jobs +* dequeueing a single job +* acknowledging a job is completed +* checking if the queue is empty + +Some queue classes (like JobQueueDB) may dequeue jobs in random order while other +queues might dequeue jobs in exact FIFO order. Callers should thus not assume jobs +are executed in FIFO order. + +Also note that not all queue classes will have the same reliability guarantees. +In-memory queues may lose data when restarted depending on snapshot and journal +settings (including journal fsync() frequency). Some queue types may totally remove +jobs when dequeued while leaving the ack() function as a no-op; if a job is +dequeued by a job runner, which crashes before completion, the job will be +lost. Some jobs, like purging squid caches after a template change, may not +require durable queues, whereas other jobs might be more important. + +\section aggregator Job queue aggregator + +The aggregators are used by nextJobDB.php, which is a script that will return a +random ready queue (on any wiki in the farm) that can be used with runJobs.php. +This can be used in conjunction with any scripts that handle wiki farm job queues. +Note that $wgLocalDatabases defines what wikis are in the wiki farm. + +Since each job type has its own queue, and wiki-farms may have many wikis, +there might be a large number of queues to keep track of. To avoid wasting +large amounts of time polling empty queues, aggregators exists to keep track +of which queues are ready. + +The following queue aggregator classes are available: +* JobQueueAggregatorMemc (uses $wgMemc to track ready queues) +* JobQueueAggregatorRedis (uses a redis server to track ready queues) + +Some aggregators cache data for a few minutes while others may be always up to date. +This can be an important factor for jobs that need a low pickup time (or latency). + +\section jobs Jobs + +Callers should also try to make jobs maintain correctness when executed twice. +This is useful for queues that actually implement ack(), since they may recycle +dequeued but un-acknowledged jobs back into the queue to be attempted again. If +a runner dequeues a job, runs it, but then crashes before calling ack(), the +job may be returned to the queue and run a second time. Jobs like cache purging can +happen several times without any correctness problems. However, a pathological case +would be if a bug causes the problem to systematically keep repeating. For example, +a job may always throw a DB error at the end of run(). This problem is trickier to +solve and more obnoxious for things like email jobs, for example. For such jobs, +it might be useful to use a queue that does not retry jobs. diff --git a/includes/jobqueue/aggregator/JobQueueAggregator.php b/includes/jobqueue/aggregator/JobQueueAggregator.php new file mode 100644 index 00000000..8600eed9 --- /dev/null +++ b/includes/jobqueue/aggregator/JobQueueAggregator.php @@ -0,0 +1,162 @@ +<?php +/** + * Job queue aggregator code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle tracking information about all queues + * + * @ingroup JobQueue + * @since 1.21 + */ +abstract class JobQueueAggregator { + /** @var JobQueueAggregator */ + protected static $instance = null; + + /** + * @param array $params + */ + protected function __construct( array $params ) { + } + + /** + * @throws MWException + * @return JobQueueAggregator + */ + final public static function singleton() { + global $wgJobQueueAggregator; + + if ( !isset( self::$instance ) ) { + $class = $wgJobQueueAggregator['class']; + $obj = new $class( $wgJobQueueAggregator ); + if ( !( $obj instanceof JobQueueAggregator ) ) { + throw new MWException( "Class '$class' is not a JobQueueAggregator class." ); + } + self::$instance = $obj; + } + + return self::$instance; + } + + /** + * Destroy the singleton instance + * + * @return void + */ + final public static function destroySingleton() { + self::$instance = null; + } + + /** + * Mark a queue as being empty + * + * @param string $wiki + * @param string $type + * @return bool Success + */ + final public function notifyQueueEmpty( $wiki, $type ) { + wfProfileIn( __METHOD__ ); + $ok = $this->doNotifyQueueEmpty( $wiki, $type ); + wfProfileOut( __METHOD__ ); + + return $ok; + } + + /** + * @see JobQueueAggregator::notifyQueueEmpty() + */ + abstract protected function doNotifyQueueEmpty( $wiki, $type ); + + /** + * Mark a queue as being non-empty + * + * @param string $wiki + * @param string $type + * @return bool Success + */ + final public function notifyQueueNonEmpty( $wiki, $type ) { + wfProfileIn( __METHOD__ ); + $ok = $this->doNotifyQueueNonEmpty( $wiki, $type ); + wfProfileOut( __METHOD__ ); + + return $ok; + } + + /** + * @see JobQueueAggregator::notifyQueueNonEmpty() + */ + abstract protected function doNotifyQueueNonEmpty( $wiki, $type ); + + /** + * Get the list of all of the queues with jobs + * + * @return array (job type => (list of wiki IDs)) + */ + final public function getAllReadyWikiQueues() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAllReadyWikiQueues(); + wfProfileOut( __METHOD__ ); + + return $res; + } + + /** + * @see JobQueueAggregator::getAllReadyWikiQueues() + */ + abstract protected function doGetAllReadyWikiQueues(); + + /** + * Purge all of the aggregator information + * + * @return bool Success + */ + final public function purge() { + wfProfileIn( __METHOD__ ); + $res = $this->doPurge(); + wfProfileOut( __METHOD__ ); + + return $res; + } + + /** + * @see JobQueueAggregator::purge() + */ + abstract protected function doPurge(); + + /** + * Get all databases that have a pending job. + * This poll all the queues and is this expensive. + * + * @return array (job type => (list of wiki IDs)) + */ + protected function findPendingWikiQueues() { + global $wgLocalDatabases; + + $pendingDBs = array(); // (job type => (db list)) + foreach ( $wgLocalDatabases as $db ) { + foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) { + $pendingDBs[$type][] = $db; + } + } + + return $pendingDBs; + } +} diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php b/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php new file mode 100644 index 00000000..ae266ef3 --- /dev/null +++ b/includes/jobqueue/aggregator/JobQueueAggregatorMemc.php @@ -0,0 +1,125 @@ +<?php +/** + * Job queue aggregator code that uses BagOStuff. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle tracking information about all queues using BagOStuff + * + * @ingroup JobQueue + * @since 1.21 + */ +class JobQueueAggregatorMemc extends JobQueueAggregator { + /** @var BagOStuff */ + protected $cache; + + protected $cacheTTL; // integer; seconds + + /** + * @param array $params Possible keys: + * - objectCache : Name of an object cache registered in $wgObjectCaches. + * This defaults to the one specified by $wgMainCacheType. + * - cacheTTL : Seconds to cache the aggregate data before regenerating. + */ + protected function __construct( array $params ) { + parent::__construct( $params ); + $this->cache = isset( $params['objectCache'] ) + ? wfGetCache( $params['objectCache'] ) + : wfGetMainCache(); + $this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min + } + + /** + * @see JobQueueAggregator::doNotifyQueueEmpty() + */ + protected function doNotifyQueueEmpty( $wiki, $type ) { + $key = $this->getReadyQueueCacheKey(); + // Delist the queue from the "ready queue" list + if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock + $curInfo = $this->cache->get( $key ); + if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) { + if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) { + $curInfo['pendingDBs'][$type] = array_diff( + $curInfo['pendingDBs'][$type], array( $wiki ) ); + $this->cache->set( $key, $curInfo ); + } + } + $this->cache->delete( "$key:lock" ); // unlock + } + + return true; + } + + /** + * @see JobQueueAggregator::doNotifyQueueNonEmpty() + */ + protected function doNotifyQueueNonEmpty( $wiki, $type ) { + return true; // updated periodically + } + + /** + * @see JobQueueAggregator::doAllGetReadyWikiQueues() + */ + protected function doGetAllReadyWikiQueues() { + $key = $this->getReadyQueueCacheKey(); + // If the cache entry wasn't present, is stale, or in .1% of cases otherwise, + // regenerate the cache. Use any available stale cache if another process is + // currently regenerating the pending DB information. + $pendingDbInfo = $this->cache->get( $key ); + if ( !is_array( $pendingDbInfo ) + || ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL + || mt_rand( 0, 999 ) == 0 + ) { + if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock + $pendingDbInfo = array( + 'pendingDBs' => $this->findPendingWikiQueues(), + 'timestamp' => time() + ); + for ( $attempts = 1; $attempts <= 25; ++$attempts ) { + if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock + $this->cache->set( $key, $pendingDbInfo ); + $this->cache->delete( "$key:lock" ); // unlock + break; + } + } + $this->cache->delete( "$key:rebuild" ); // unlock + } + } + + return is_array( $pendingDbInfo ) + ? $pendingDbInfo['pendingDBs'] + : array(); // cache is both empty and locked + } + + /** + * @see JobQueueAggregator::doPurge() + */ + protected function doPurge() { + return $this->cache->delete( $this->getReadyQueueCacheKey() ); + } + + /** + * @return string + */ + private function getReadyQueueCacheKey() { + return "jobqueue:aggregator:ready-queues:v1"; // global + } +} diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php new file mode 100644 index 00000000..db9e764c --- /dev/null +++ b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php @@ -0,0 +1,218 @@ +<?php +/** + * Job queue aggregator code that uses PhpRedis. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle tracking information about all queues using PhpRedis + * + * @ingroup JobQueue + * @ingroup Redis + * @since 1.21 + */ +class JobQueueAggregatorRedis extends JobQueueAggregator { + /** @var RedisConnectionPool */ + protected $redisPool; + + /** @var array List of Redis server addresses */ + protected $servers; + + /** + * @param array $params Possible keys: + * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). + * - redisServers : Array of server entries, the first being the primary and the + * others being fallback servers. Each entry is either a hostname/port + * combination or the absolute path of a UNIX socket. + * If a hostname is specified but no port, the standard port number + * 6379 will be used. Required. + */ + protected function __construct( array $params ) { + parent::__construct( $params ); + $this->servers = isset( $params['redisServers'] ) + ? $params['redisServers'] + : array( $params['redisServer'] ); // b/c + $params['redisConfig']['serializer'] = 'none'; + $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); + } + + protected function doNotifyQueueEmpty( $wiki, $type ) { + $conn = $this->getConnection(); + if ( !$conn ) { + return false; + } + try { + $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) ); + + return true; + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + + return false; + } + } + + protected function doNotifyQueueNonEmpty( $wiki, $type ) { + $conn = $this->getConnection(); + if ( !$conn ) { + return false; + } + try { + $conn->multi( Redis::PIPELINE ); + $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' ); + $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() ); + $conn->exec(); + + return true; + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + + return false; + } + } + + protected function doGetAllReadyWikiQueues() { + $conn = $this->getConnection(); + if ( !$conn ) { + return array(); + } + try { + $map = $conn->hGetAll( $this->getReadyQueueKey() ); + + if ( is_array( $map ) && isset( $map['_epoch'] ) ) { + unset( $map['_epoch'] ); // ignore + $pendingDBs = array(); // (type => list of wikis) + foreach ( $map as $key => $time ) { + list( $type, $wiki ) = $this->dencQueueName( $key ); + $pendingDBs[$type][] = $wiki; + } + } else { + // Avoid duplicated effort + $rand = wfRandomString( 32 ); + $conn->multi( Redis::MULTI ); + $conn->setex( "{$rand}:lock", 3600, 1 ); + $conn->renamenx( "{$rand}:lock", $this->getReadyQueueKey() . ":lock" ); + if ( $conn->exec() !== array( true, true ) ) { // lock + $conn->delete( "{$rand}:lock" ); + return array(); // already in progress + } + + $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis) + + $conn->multi( Redis::PIPELINE ); + $now = time(); + $map = array( '_epoch' => time() ); // dummy key for empty Redis collections + foreach ( $pendingDBs as $type => $wikis ) { + $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' ); + foreach ( $wikis as $wiki ) { + $map[$this->encQueueName( $type, $wiki )] = $now; + } + } + $conn->hMSet( $this->getReadyQueueKey(), $map ); + $conn->exec(); + + $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock + } + + return $pendingDBs; + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + + return array(); + } + } + + protected function doPurge() { + $conn = $this->getConnection(); + if ( !$conn ) { + return false; + } + try { + $conn->delete( $this->getReadyQueueKey() ); + // leave key at getQueueTypesKey() alone + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + + return false; + } + + return true; + } + + /** + * Get a connection to the server that handles all sub-queues for this queue + * + * @return RedisConnRef|bool Returns false on failure + * @throws MWException + */ + protected function getConnection() { + $conn = false; + foreach ( $this->servers as $server ) { + $conn = $this->redisPool->getConnection( $server ); + if ( $conn ) { + break; + } + } + + return $conn; + } + + /** + * @param RedisConnRef $conn + * @param RedisException $e + * @return void + */ + protected function handleException( RedisConnRef $conn, $e ) { + $this->redisPool->handleError( $conn, $e ); + } + + /** + * @return string + */ + private function getReadyQueueKey() { + return "jobqueue:aggregator:h-ready-queues:v2"; // global + } + + /** + * @return string + */ + private function getQueueTypesKey() { + return "jobqueue:aggregator:h-queue-types:v2"; // global + } + + /** + * @param string $type + * @param string $wiki + * @return string + */ + private function encQueueName( $type, $wiki ) { + return rawurlencode( $type ) . '/' . rawurlencode( $wiki ); + } + + /** + * @param string $name + * @return string + */ + private function dencQueueName( $name ) { + list( $type, $wiki ) = explode( '/', $name, 2 ); + + return array( rawurldecode( $type ), rawurldecode( $wiki ) ); + } +} diff --git a/includes/jobqueue/jobs/AssembleUploadChunksJob.php b/includes/jobqueue/jobs/AssembleUploadChunksJob.php new file mode 100644 index 00000000..9e9bda6f --- /dev/null +++ b/includes/jobqueue/jobs/AssembleUploadChunksJob.php @@ -0,0 +1,136 @@ +<?php +/** + * Assemble the segments of a chunked upload. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Upload + */ + +/** + * Assemble the segments of a chunked upload. + * + * @ingroup Upload + */ +class AssembleUploadChunksJob extends Job { + public function __construct( $title, $params ) { + parent::__construct( 'AssembleUploadChunks', $title, $params ); + $this->removeDuplicates = true; + } + + public function run() { + $scope = RequestContext::importScopedSession( $this->params['session'] ); + $context = RequestContext::getMain(); + try { + $user = $context->getUser(); + if ( !$user->isLoggedIn() ) { + $this->setLastError( "Could not load the author user from session." ); + + return false; + } + + if ( count( $_SESSION ) === 0 ) { + // Empty session probably indicates that we didn't associate + // with the session correctly. Note that being able to load + // the user does not necessarily mean the session was loaded. + // Most likely cause by suhosin.session.encrypt = On. + $this->setLastError( "Error associating with user session. " . + "Try setting suhosin.session.encrypt = Off" ); + + return false; + } + + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() ) + ); + + $upload = new UploadFromChunks( $user ); + $upload->continueChunks( + $this->params['filename'], + $this->params['filekey'], + $context->getRequest() + ); + + // Combine all of the chunks into a local file and upload that to a new stash file + $status = $upload->concatenateChunks(); + if ( !$status->isGood() ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status ) + ); + $this->setLastError( $status->getWikiText() ); + + return false; + } + + // We have a new filekey for the fully concatenated file + $newFileKey = $upload->getLocalFile()->getFileKey(); + + // Remove the old stash file row and first chunk file + $upload->stash->removeFileNoAuth( $this->params['filekey'] ); + + // Build the image info array while we have the local reference handy + $apiMain = new ApiMain(); // dummy object (XXX) + $imageInfo = $upload->getImageInfo( $apiMain->getResult() ); + + // Cleanup any temporary local file + $upload->cleanupTempFile(); + + // Cache the info so the user doesn't have to wait forever to get the final info + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Success', + 'stage' => 'assembling', + 'filekey' => $newFileKey, + 'imageinfo' => $imageInfo, + 'status' => Status::newGood() + ) + ); + } catch ( MWException $e ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Failure', + 'stage' => 'assembling', + 'status' => Status::newFatal( 'api-error-stashfailed' ) + ) + ); + $this->setLastError( get_class( $e ) . ": " . $e->getText() ); + // To be extra robust. + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); + + return false; + } + + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + $info['params'] = array( 'filekey' => $info['params']['filekey'] ); + } + + return $info; + } + + public function allowRetries() { + return false; + } +} diff --git a/includes/jobqueue/jobs/DoubleRedirectJob.php b/includes/jobqueue/jobs/DoubleRedirectJob.php new file mode 100644 index 00000000..2561f2f1 --- /dev/null +++ b/includes/jobqueue/jobs/DoubleRedirectJob.php @@ -0,0 +1,250 @@ +<?php +/** + * Job to fix double redirects after moving a page. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job to fix double redirects after moving a page + * + * @ingroup JobQueue + */ +class DoubleRedirectJob extends Job { + /** @var string Reason for the change, 'maintenance' or 'move'. Suffix fo + * message key 'double-redirect-fixed-'. + */ + private $reason; + + /** @var Title The title which has changed, redirects pointing to this + * title are fixed + */ + private $redirTitle; + + /** @var User */ + private static $user; + + /** + * Insert jobs into the job queue to fix redirects to the given title + * @param string $reason The reason for the fix, see message + * "double-redirect-fixed-<reason>" + * @param Title $redirTitle The title which has changed, redirects + * pointing to this title are fixed + * @param bool $destTitle Not used + */ + public static function fixRedirects( $reason, $redirTitle, $destTitle = false ) { + # Need to use the master to get the redirect table updated in the same transaction + $dbw = wfGetDB( DB_MASTER ); + $res = $dbw->select( + array( 'redirect', 'page' ), + array( 'page_namespace', 'page_title' ), + array( + 'page_id = rd_from', + 'rd_namespace' => $redirTitle->getNamespace(), + 'rd_title' => $redirTitle->getDBkey() + ), __METHOD__ ); + if ( !$res->numRows() ) { + return; + } + $jobs = array(); + foreach ( $res as $row ) { + $title = Title::makeTitle( $row->page_namespace, $row->page_title ); + if ( !$title ) { + continue; + } + + $jobs[] = new self( $title, array( + 'reason' => $reason, + 'redirTitle' => $redirTitle->getPrefixedDBkey() ) ); + # Avoid excessive memory usage + if ( count( $jobs ) > 10000 ) { + JobQueueGroup::singleton()->push( $jobs ); + $jobs = array(); + } + } + JobQueueGroup::singleton()->push( $jobs ); + } + + /** + * @param Title $title + * @param array|bool $params + */ + function __construct( $title, $params = false ) { + parent::__construct( 'fixDoubleRedirect', $title, $params ); + $this->reason = $params['reason']; + $this->redirTitle = Title::newFromText( $params['redirTitle'] ); + } + + /** + * @return bool + */ + function run() { + if ( !$this->redirTitle ) { + $this->setLastError( 'Invalid title' ); + + return false; + } + + $targetRev = Revision::newFromTitle( $this->title, false, Revision::READ_LATEST ); + if ( !$targetRev ) { + wfDebug( __METHOD__ . ": target redirect already deleted, ignoring\n" ); + + return true; + } + $content = $targetRev->getContent(); + $currentDest = $content ? $content->getRedirectTarget() : null; + if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) { + wfDebug( __METHOD__ . ": Redirect has changed since the job was queued\n" ); + + return true; + } + + // Check for a suppression tag (used e.g. in periodically archived discussions) + $mw = MagicWord::get( 'staticredirect' ); + if ( $content->matchMagicWord( $mw ) ) { + wfDebug( __METHOD__ . ": skipping: suppressed with __STATICREDIRECT__\n" ); + + return true; + } + + // Find the current final destination + $newTitle = self::getFinalDestination( $this->redirTitle ); + if ( !$newTitle ) { + wfDebug( __METHOD__ . + ": skipping: single redirect, circular redirect or invalid redirect destination\n" ); + + return true; + } + if ( $newTitle->equals( $this->redirTitle ) ) { + // The redirect is already right, no need to change it + // This can happen if the page was moved back (say after vandalism) + wfDebug( __METHOD__ . " : skipping, already good\n" ); + } + + // Preserve fragment (bug 14904) + $newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(), + $currentDest->getFragment(), $newTitle->getInterwiki() ); + + // Fix the text + $newContent = $content->updateRedirect( $newTitle ); + + if ( $newContent->equals( $content ) ) { + $this->setLastError( 'Content unchanged???' ); + + return false; + } + + $user = $this->getUser(); + if ( !$user ) { + $this->setLastError( 'Invalid user' ); + + return false; + } + + // Save it + global $wgUser; + $oldUser = $wgUser; + $wgUser = $user; + $article = WikiPage::factory( $this->title ); + + // Messages: double-redirect-fixed-move, double-redirect-fixed-maintenance + $reason = wfMessage( 'double-redirect-fixed-' . $this->reason, + $this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText() + )->inContentLanguage()->text(); + $article->doEditContent( $newContent, $reason, EDIT_UPDATE | EDIT_SUPPRESS_RC, false, $user ); + $wgUser = $oldUser; + + return true; + } + + /** + * Get the final destination of a redirect + * + * @param Title $title + * + * @return bool If the specified title is not a redirect, or if it is a circular redirect + */ + public static function getFinalDestination( $title ) { + $dbw = wfGetDB( DB_MASTER ); + + // Circular redirect check + $seenTitles = array(); + $dest = false; + + while ( true ) { + $titleText = $title->getPrefixedDBkey(); + if ( isset( $seenTitles[$titleText] ) ) { + wfDebug( __METHOD__, "Circular redirect detected, aborting\n" ); + + return false; + } + $seenTitles[$titleText] = true; + + if ( $title->isExternal() ) { + // If the target is interwiki, we have to break early (bug 40352). + // Otherwise it will look up a row in the local page table + // with the namespace/page of the interwiki target which can cause + // unexpected results (e.g. X -> foo:Bar -> Bar -> .. ) + break; + } + + $row = $dbw->selectRow( + array( 'redirect', 'page' ), + array( 'rd_namespace', 'rd_title', 'rd_interwiki' ), + array( + 'rd_from=page_id', + 'page_namespace' => $title->getNamespace(), + 'page_title' => $title->getDBkey() + ), __METHOD__ ); + if ( !$row ) { + # No redirect from here, chain terminates + break; + } else { + $dest = $title = Title::makeTitle( + $row->rd_namespace, + $row->rd_title, + '', + $row->rd_interwiki + ); + } + } + + return $dest; + } + + /** + * Get a user object for doing edits, from a request-lifetime cache + * False will be returned if the user name specified in the + * 'double-redirect-fixer' message is invalid. + * + * @return User|bool + */ + function getUser() { + if ( !self::$user ) { + $username = wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text(); + self::$user = User::newFromName( $username ); + # User::newFromName() can return false on a badly configured wiki. + if ( self::$user && !self::$user->isLoggedIn() ) { + self::$user->addToDatabase(); + } + } + + return self::$user; + } +} diff --git a/includes/jobqueue/jobs/DuplicateJob.php b/includes/jobqueue/jobs/DuplicateJob.php new file mode 100644 index 00000000..1fa6cefe --- /dev/null +++ b/includes/jobqueue/jobs/DuplicateJob.php @@ -0,0 +1,59 @@ +<?php +/** + * No-op job that does nothing. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Cache + */ + +/** + * No-op job that does nothing. Used to represent duplicates. + * + * @ingroup JobQueue + */ +final class DuplicateJob extends Job { + /** + * Callers should use DuplicateJob::newFromJob() instead + * + * @param Title $title + * @param array $params Job parameters + */ + function __construct( $title, $params ) { + parent::__construct( 'duplicate', $title, $params ); + } + + /** + * Get a duplicate no-op version of a job + * + * @param Job $job + * @return Job + */ + public static function newFromJob( Job $job ) { + $djob = new self( $job->getTitle(), $job->getParams() ); + $djob->command = $job->getType(); + $djob->params = is_array( $djob->params ) ? $djob->params : array(); + $djob->params = array( 'isDuplicate' => true ) + $djob->params; + $djob->metadata = $job->metadata; + + return $djob; + } + + public function run() { + return true; + } +} diff --git a/includes/jobqueue/jobs/EmaillingJob.php b/includes/jobqueue/jobs/EmaillingJob.php new file mode 100644 index 00000000..df8ae63e --- /dev/null +++ b/includes/jobqueue/jobs/EmaillingJob.php @@ -0,0 +1,46 @@ +<?php +/** + * Old job for notification emails. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Old job used for sending single notification emails; + * kept for backwards-compatibility + * + * @ingroup JobQueue + */ +class EmaillingJob extends Job { + function __construct( $title, $params ) { + parent::__construct( 'sendMail', Title::newMainPage(), $params ); + } + + function run() { + $status = UserMailer::send( + $this->params['to'], + $this->params['from'], + $this->params['subj'], + $this->params['body'], + $this->params['replyto'] + ); + + return $status->isOK(); + } +} diff --git a/includes/jobqueue/jobs/EnotifNotifyJob.php b/includes/jobqueue/jobs/EnotifNotifyJob.php new file mode 100644 index 00000000..1ed99a58 --- /dev/null +++ b/includes/jobqueue/jobs/EnotifNotifyJob.php @@ -0,0 +1,57 @@ +<?php +/** + * Job for notification emails. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job for email notification mails + * + * @ingroup JobQueue + */ +class EnotifNotifyJob extends Job { + function __construct( $title, $params ) { + parent::__construct( 'enotifNotify', $title, $params ); + } + + function run() { + $enotif = new EmailNotification(); + // Get the user from ID (rename safe). Anons are 0, so defer to name. + if ( isset( $this->params['editorID'] ) && $this->params['editorID'] ) { + $editor = User::newFromId( $this->params['editorID'] ); + // B/C, only the name might be given. + } else { + # @todo FIXME: newFromName could return false on a badly configured wiki. + $editor = User::newFromName( $this->params['editor'], false ); + } + $enotif->actuallyNotifyOnPageChange( + $editor, + $this->title, + $this->params['timestamp'], + $this->params['summary'], + $this->params['minorEdit'], + $this->params['oldid'], + $this->params['watchers'], + $this->params['pageStatus'] + ); + + return true; + } +} diff --git a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php new file mode 100644 index 00000000..4d1e72c9 --- /dev/null +++ b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php @@ -0,0 +1,162 @@ +<?php +/** + * HTML cache invalidation of all pages linking to a given title. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Cache + */ + +/** + * Job to purge the cache for all pages that link to or use another page or file + * + * This job comes in a few variants: + * - a) Recursive jobs to purge caches for backlink pages for a given title. + * These jobs have have (recursive:true,table:<table>) set. + * - b) Jobs to purge caches for a set of titles (the job title is ignored). + * These jobs have have (pages:(<page ID>:(<namespace>,<title>),...) set. + * + * @ingroup JobQueue + */ +class HTMLCacheUpdateJob extends Job { + function __construct( $title, $params = '' ) { + parent::__construct( 'htmlCacheUpdate', $title, $params ); + // Base backlink purge jobs can be de-duplicated + $this->removeDuplicates = ( !isset( $params['range'] ) && !isset( $params['pages'] ) ); + } + + function run() { + global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery; + + static $expected = array( 'recursive', 'pages' ); // new jobs have one of these + + $oldRangeJob = false; + if ( !array_intersect( array_keys( $this->params ), $expected ) ) { + // B/C for older job params formats that lack these fields: + // a) base jobs with just ("table") and b) range jobs with ("table","start","end") + if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) { + $oldRangeJob = true; + } else { + $this->params['recursive'] = true; // base job + } + } + + // Job to purge all (or a range of) backlink pages for a page + if ( !empty( $this->params['recursive'] ) ) { + // Convert this into no more than $wgUpdateRowsPerJob HTMLCacheUpdateJob per-title + // jobs and possibly a recursive HTMLCacheUpdateJob job for the rest of the backlinks + $jobs = BacklinkJobUtils::partitionBacklinkJob( + $this, + $wgUpdateRowsPerJob, + $wgUpdateRowsPerQuery, // jobs-per-title + // Carry over information for de-duplication + array( 'params' => $this->getRootJobParams() ) + ); + JobQueueGroup::singleton()->push( $jobs ); + // Job to purge pages for for a set of titles + } elseif ( isset( $this->params['pages'] ) ) { + $this->invalidateTitles( $this->params['pages'] ); + // B/C for job to purge a range of backlink pages for a given page + } elseif ( $oldRangeJob ) { + $titleArray = $this->title->getBacklinkCache()->getLinks( + $this->params['table'], $this->params['start'], $this->params['end'] ); + + $pages = array(); // same format BacklinkJobUtils uses + foreach ( $titleArray as $tl ) { + $pages[$tl->getArticleId()] = array( $tl->getNamespace(), $tl->getDbKey() ); + } + + $jobs = array(); + foreach ( array_chunk( $pages, $wgUpdateRowsPerJob ) as $pageChunk ) { + $jobs[] = new HTMLCacheUpdateJob( $this->title, + array( + 'table' => $this->params['table'], + 'pages' => $pageChunk + ) + $this->getRootJobParams() // carry over information for de-duplication + ); + } + JobQueueGroup::singleton()->push( $jobs ); + } + + return true; + } + + /** + * @param array $pages Map of (page ID => (namespace, DB key)) entries + */ + protected function invalidateTitles( array $pages ) { + global $wgUpdateRowsPerQuery, $wgUseFileCache, $wgUseSquid; + + // Get all page IDs in this query into an array + $pageIds = array_keys( $pages ); + if ( !$pageIds ) { + return; + } + + $dbw = wfGetDB( DB_MASTER ); + + // The page_touched field will need to be bumped for these pages. + // Only bump it to the present time if no "rootJobTimestamp" was known. + // If it is known, it can be used instead, which avoids invalidating output + // that was in fact generated *after* the relevant dependency change time + // (e.g. template edit). This is particularily useful since refreshLinks jobs + // save back parser output and usually run along side htmlCacheUpdate jobs; + // their saved output would be invalidated by using the current timestamp. + if ( isset( $this->params['rootJobTimestamp'] ) ) { + $touchTimestamp = $this->params['rootJobTimestamp']; + } else { + $touchTimestamp = wfTimestampNow(); + } + + // Update page_touched (skipping pages already touched since the root job). + // Check $wgUpdateRowsPerQuery for sanity; batch jobs are sized by that already. + foreach ( array_chunk( $pageIds, $wgUpdateRowsPerQuery ) as $batch ) { + $dbw->update( 'page', + array( 'page_touched' => $dbw->timestamp( $touchTimestamp ) ), + array( 'page_id' => $batch, + // don't invalidated pages that were already invalidated + "page_touched < " . $dbw->addQuotes( $dbw->timestamp( $touchTimestamp ) ) + ), + __METHOD__ + ); + } + // Get the list of affected pages (races only mean something else did the purge) + $titleArray = TitleArray::newFromResult( $dbw->select( + 'page', + array( 'page_namespace', 'page_title' ), + array( 'page_id' => $pageIds, 'page_touched' => $dbw->timestamp( $touchTimestamp ) ), + __METHOD__ + ) ); + + // Update squid + if ( $wgUseSquid ) { + $u = SquidUpdate::newFromTitles( $titleArray ); + $u->doUpdate(); + } + + // Update file cache + if ( $wgUseFileCache ) { + foreach ( $titleArray as $title ) { + HTMLFileCache::clearFileCache( $title ); + } + } + } + + public function workItemCount() { + return isset( $this->params['pages'] ) ? count( $this->params['pages'] ) : 1; + } +} diff --git a/includes/jobqueue/jobs/NullJob.php b/includes/jobqueue/jobs/NullJob.php new file mode 100644 index 00000000..66291e9d --- /dev/null +++ b/includes/jobqueue/jobs/NullJob.php @@ -0,0 +1,76 @@ +<?php +/** + * Degenerate job that does nothing. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Cache + */ + +/** + * Degenerate job that does nothing, but can optionally replace itself + * in the queue and/or sleep for a brief time period. These can be used + * to represent "no-op" jobs or test lock contention and performance. + * + * @par Example: + * Inserting a null job in the configured job queue: + * @code + * $ php maintenance/eval.php + * > $queue = JobQueueGroup::singleton(); + * > $job = new NullJob( Title::newMainPage(), array( 'lives' => 10 ) ); + * > $queue->push( $job ); + * @endcode + * You can then confirm the job has been enqueued by using the showJobs.php + * maintenance utility: + * @code + * $ php maintenance/showJobs.php --group + * null: 1 queue; 0 claimed (0 active, 0 abandoned) + * $ + * @endcode + * + * @ingroup JobQueue + */ +class NullJob extends Job { + /** + * @param Title $title + * @param array $params Job parameters (lives, usleep) + */ + function __construct( $title, $params ) { + parent::__construct( 'null', $title, $params ); + if ( !isset( $this->params['lives'] ) ) { + $this->params['lives'] = 1; + } + if ( !isset( $this->params['usleep'] ) ) { + $this->params['usleep'] = 0; + } + $this->removeDuplicates = !empty( $this->params['removeDuplicates'] ); + } + + public function run() { + if ( $this->params['usleep'] > 0 ) { + usleep( $this->params['usleep'] ); + } + if ( $this->params['lives'] > 1 ) { + $params = $this->params; + $params['lives']--; + $job = new self( $this->title, $params ); + JobQueueGroup::singleton()->push( $job ); + } + + return true; + } +} diff --git a/includes/jobqueue/jobs/PublishStashedFileJob.php b/includes/jobqueue/jobs/PublishStashedFileJob.php new file mode 100644 index 00000000..918a392d --- /dev/null +++ b/includes/jobqueue/jobs/PublishStashedFileJob.php @@ -0,0 +1,150 @@ +<?php +/** + * Upload a file from the upload stash into the local file repo. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Upload + */ + +/** + * Upload a file from the upload stash into the local file repo. + * + * @ingroup Upload + */ +class PublishStashedFileJob extends Job { + public function __construct( $title, $params ) { + parent::__construct( 'PublishStashedFile', $title, $params ); + $this->removeDuplicates = true; + } + + public function run() { + $scope = RequestContext::importScopedSession( $this->params['session'] ); + $context = RequestContext::getMain(); + try { + $user = $context->getUser(); + if ( !$user->isLoggedIn() ) { + $this->setLastError( "Could not load the author user from session." ); + + return false; + } + + if ( count( $_SESSION ) === 0 ) { + // Empty session probably indicates that we didn't associate + // with the session correctly. Note that being able to load + // the user does not necessarily mean the session was loaded. + // Most likely cause by suhosin.session.encrypt = On. + $this->setLastError( "Error associating with user session. " . + "Try setting suhosin.session.encrypt = Off" ); + + return false; + } + + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() ) + ); + + $upload = new UploadFromStash( $user ); + // @todo initialize() causes a GET, ideally we could frontload the antivirus + // checks and anything else to the stash stage (which includes concatenation and + // the local file is thus already there). That way, instead of GET+PUT, there could + // just be a COPY operation from the stash to the public zone. + $upload->initialize( $this->params['filekey'], $this->params['filename'] ); + + // Check if the local file checks out (this is generally a no-op) + $verification = $upload->verifyUpload(); + if ( $verification['status'] !== UploadBase::OK ) { + $status = Status::newFatal( 'verification-error' ); + $status->value = array( 'verification' => $verification ); + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) + ); + $this->setLastError( "Could not verify upload." ); + + return false; + } + + // Upload the stashed file to a permanent location + $status = $upload->performUpload( + $this->params['comment'], + $this->params['text'], + $this->params['watch'], + $user + ); + if ( !$status->isGood() ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) + ); + $this->setLastError( $status->getWikiText() ); + + return false; + } + + // Build the image info array while we have the local reference handy + $apiMain = new ApiMain(); // dummy object (XXX) + $imageInfo = $upload->getImageInfo( $apiMain->getResult() ); + + // Cleanup any temporary local file + $upload->cleanupTempFile(); + + // Cache the info so the user doesn't have to wait forever to get the final info + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Success', + 'stage' => 'publish', + 'filename' => $upload->getLocalFile()->getName(), + 'imageinfo' => $imageInfo, + 'status' => Status::newGood() + ) + ); + } catch ( MWException $e ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Failure', + 'stage' => 'publish', + 'status' => Status::newFatal( 'api-error-publishfailed' ) + ) + ); + $this->setLastError( get_class( $e ) . ": " . $e->getText() ); + // To prevent potential database referential integrity issues. + // See bug 32551. + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); + + return false; + } + + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + $info['params'] = array( 'filekey' => $info['params']['filekey'] ); + } + + return $info; + } + + public function allowRetries() { + return false; + } +} diff --git a/includes/jobqueue/jobs/RefreshLinksJob.php b/includes/jobqueue/jobs/RefreshLinksJob.php new file mode 100644 index 00000000..f82af273 --- /dev/null +++ b/includes/jobqueue/jobs/RefreshLinksJob.php @@ -0,0 +1,199 @@ +<?php +/** + * Job to update link tables for pages + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job to update link tables for pages + * + * This job comes in a few variants: + * - a) Recursive jobs to update links for backlink pages for a given title. + * These jobs have have (recursive:true,table:<table>) set. + * - b) Jobs to update links for a set of pages (the job title is ignored). + * These jobs have have (pages:(<page ID>:(<namespace>,<title>),...) set. + * - c) Jobs to update links for a single page (the job title) + * These jobs need no extra fields set. + * + * @ingroup JobQueue + */ +class RefreshLinksJob extends Job { + const PARSE_THRESHOLD_SEC = 1.0; + + function __construct( $title, $params = '' ) { + parent::__construct( 'refreshLinks', $title, $params ); + // Base backlink update jobs and per-title update jobs can be de-duplicated. + // If template A changes twice before any jobs run, a clean queue will have: + // (A base, A base) + // The second job is ignored by the queue on insertion. + // Suppose, many pages use template A, and that template itself uses template B. + // An edit to both will first create two base jobs. A clean FIFO queue will have: + // (A base, B base) + // When these jobs run, the queue will have per-title and remnant partition jobs: + // (titleX,titleY,titleZ,...,A remnant,titleM,titleN,titleO,...,B remnant) + // Some these jobs will be the same, and will automatically be ignored by + // the queue upon insertion. Some title jobs will run before the duplicate is + // inserted, so the work will still be done twice in those cases. More titles + // can be de-duplicated as the remnant jobs continue to be broken down. This + // works best when $wgUpdateRowsPerJob, and either the pages have few backlinks + // and/or the backlink sets for pages A and B are almost identical. + $this->removeDuplicates = !isset( $params['range'] ) + && ( !isset( $params['pages'] ) || count( $params['pages'] ) == 1 ); + } + + function run() { + global $wgUpdateRowsPerJob; + + // Job to update all (or a range of) backlink pages for a page + if ( !empty( $this->params['recursive'] ) ) { + // Carry over information for de-duplication + $extraParams = $this->getRootJobParams(); + // Avoid slave lag when fetching templates. + // When the outermost job is run, we know that the caller that enqueued it must have + // committed the relevant changes to the DB by now. At that point, record the master + // position and pass it along as the job recursively breaks into smaller range jobs. + // Hopefully, when leaf jobs are popped, the slaves will have reached that position. + if ( isset( $this->params['masterPos'] ) ) { + $extraParams['masterPos'] = $this->params['masterPos']; + } elseif ( wfGetLB()->getServerCount() > 1 ) { + $extraParams['masterPos'] = wfGetLB()->getMasterPos(); + } else { + $extraParams['masterPos'] = false; + } + // Convert this into no more than $wgUpdateRowsPerJob RefreshLinks per-title + // jobs and possibly a recursive RefreshLinks job for the rest of the backlinks + $jobs = BacklinkJobUtils::partitionBacklinkJob( + $this, + $wgUpdateRowsPerJob, + 1, // job-per-title + array( 'params' => $extraParams ) + ); + JobQueueGroup::singleton()->push( $jobs ); + // Job to update link tables for for a set of titles + } elseif ( isset( $this->params['pages'] ) ) { + foreach ( $this->params['pages'] as $pageId => $nsAndKey ) { + list( $ns, $dbKey ) = $nsAndKey; + $this->runForTitle( Title::makeTitleSafe( $ns, $dbKey ) ); + } + // Job to update link tables for a given title + } else { + $this->runForTitle( $this->title ); + } + + return true; + } + + protected function runForTitle( Title $title = null ) { + $linkCache = LinkCache::singleton(); + $linkCache->clear(); + + if ( is_null( $title ) ) { + $this->setLastError( "refreshLinks: Invalid title" ); + return false; + } + + // Wait for the DB of the current/next slave DB handle to catch up to the master. + // This way, we get the correct page_latest for templates or files that just changed + // milliseconds ago, having triggered this job to begin with. + if ( isset( $this->params['masterPos'] ) && $this->params['masterPos'] !== false ) { + wfGetLB()->waitFor( $this->params['masterPos'] ); + } + + $page = WikiPage::factory( $title ); + + // Fetch the current revision... + $revision = Revision::newFromTitle( $title, false, Revision::READ_NORMAL ); + if ( !$revision ) { + $this->setLastError( "refreshLinks: Article not found {$title->getPrefixedDBkey()}" ); + return false; // XXX: what if it was just deleted? + } + $content = $revision->getContent( Revision::RAW ); + if ( !$content ) { + // If there is no content, pretend the content is empty + $content = $revision->getContentHandler()->makeEmptyContent(); + } + + $parserOutput = false; + $parserOptions = $page->makeParserOptions( 'canonical' ); + // If page_touched changed after this root job (with a good slave lag skew factor), + // then it is likely that any views of the pages already resulted in re-parses which + // are now in cache. This can be reused to avoid expensive parsing in some cases. + if ( isset( $this->params['rootJobTimestamp'] ) ) { + $skewedTimestamp = wfTimestamp( TS_UNIX, $this->params['rootJobTimestamp'] ) + 5; + if ( $page->getLinksTimestamp() > wfTimestamp( TS_MW, $skewedTimestamp ) ) { + // Something already updated the backlinks since this job was made + return true; + } + if ( $page->getTouched() > wfTimestamp( TS_MW, $skewedTimestamp ) ) { + $parserOutput = ParserCache::singleton()->getDirty( $page, $parserOptions ); + if ( $parserOutput && $parserOutput->getCacheTime() <= $skewedTimestamp ) { + $parserOutput = false; // too stale + } + } + } + // Fetch the current revision and parse it if necessary... + if ( $parserOutput == false ) { + $start = microtime( true ); + // Revision ID must be passed to the parser output to get revision variables correct + $parserOutput = $content->getParserOutput( + $title, $revision->getId(), $parserOptions, false ); + $ellapsed = microtime( true ) - $start; + // If it took a long time to render, then save this back to the cache to avoid + // wasted CPU by other apaches or job runners. We don't want to always save to + // cache as this cause cause high cache I/O and LRU churn when a template changes. + if ( $ellapsed >= self::PARSE_THRESHOLD_SEC + && $page->isParserCacheUsed( $parserOptions, $revision->getId() ) + && $parserOutput->isCacheable() + ) { + $ctime = wfTimestamp( TS_MW, (int)$start ); // cache time + ParserCache::singleton()->save( + $parserOutput, $page, $parserOptions, $ctime, $revision->getId() + ); + } + } + + $updates = $content->getSecondaryDataUpdates( $title, null, false, $parserOutput ); + DataUpdate::runUpdates( $updates ); + + InfoAction::invalidateCache( $title ); + + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + // Don't let highly unique "masterPos" values ruin duplicate detection + unset( $info['params']['masterPos'] ); + // For per-pages jobs, the job title is that of the template that changed + // (or similar), so remove that since it ruins duplicate detection + if ( isset( $info['pages'] ) ) { + unset( $info['namespace'] ); + unset( $info['title'] ); + } + } + + return $info; + } + + public function workItemCount() { + return isset( $this->params['pages'] ) ? count( $this->params['pages'] ) : 1; + } +} diff --git a/includes/jobqueue/jobs/RefreshLinksJob2.php b/includes/jobqueue/jobs/RefreshLinksJob2.php new file mode 100644 index 00000000..97405aeb --- /dev/null +++ b/includes/jobqueue/jobs/RefreshLinksJob2.php @@ -0,0 +1,141 @@ +<?php +/** + * Job to update links for a given title. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Background job to update links for titles in certain backlink range by page ID. + * Newer version for high use templates. This is deprecated by RefreshLinksPartitionJob. + * + * @ingroup JobQueue + * @deprecated since 1.23 + */ +class RefreshLinksJob2 extends Job { + function __construct( $title, $params ) { + parent::__construct( 'refreshLinks2', $title, $params ); + // Base jobs for large templates can easily be de-duplicated + $this->removeDuplicates = !isset( $params['start'] ) && !isset( $params['end'] ); + } + + /** + * Run a refreshLinks2 job + * @return bool Success + */ + function run() { + global $wgUpdateRowsPerJob; + + $linkCache = LinkCache::singleton(); + $linkCache->clear(); + + if ( is_null( $this->title ) ) { + $this->error = "refreshLinks2: Invalid title"; + return false; + } + + // Back compat for pre-r94435 jobs + $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks'; + + // Avoid slave lag when fetching templates. + // When the outermost job is run, we know that the caller that enqueued it must have + // committed the relevant changes to the DB by now. At that point, record the master + // position and pass it along as the job recursively breaks into smaller range jobs. + // Hopefully, when leaf jobs are popped, the slaves will have reached that position. + if ( isset( $this->params['masterPos'] ) ) { + $masterPos = $this->params['masterPos']; + } elseif ( wfGetLB()->getServerCount() > 1 ) { + $masterPos = wfGetLB()->getMasterPos(); + } else { + $masterPos = false; + } + + $tbc = $this->title->getBacklinkCache(); + + $jobs = array(); // jobs to insert + if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) { + # This is a partition job to trigger the insertion of leaf jobs... + $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); + } else { + # This is a base job to trigger the insertion of partitioned jobs... + if ( $tbc->getNumLinks( $table, $wgUpdateRowsPerJob + 1 ) <= $wgUpdateRowsPerJob ) { + # Just directly insert the single per-title jobs + $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); + } else { + # Insert the partition jobs to make per-title jobs + foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) { + list( $start, $end ) = $batch; + $jobs[] = new RefreshLinksJob2( $this->title, + array( + 'table' => $table, + 'start' => $start, + 'end' => $end, + 'masterPos' => $masterPos, + ) + $this->getRootJobParams() // carry over information for de-duplication + ); + } + } + } + + if ( count( $jobs ) ) { + JobQueueGroup::singleton()->push( $jobs ); + } + + return true; + } + + /** + * @param string $table + * @param mixed $masterPos + * @return array + */ + protected function getSingleTitleJobs( $table, $masterPos ) { + # The "start"/"end" fields are not set for the base jobs + $start = isset( $this->params['start'] ) ? $this->params['start'] : false; + $end = isset( $this->params['end'] ) ? $this->params['end'] : false; + $titles = $this->title->getBacklinkCache()->getLinks( $table, $start, $end ); + # Convert into single page refresh links jobs. + # This handles well when in sapi mode and is useful in any case for job + # de-duplication. If many pages use template A, and that template itself + # uses template B, then an edit to both will create many duplicate jobs. + # Roughly speaking, for each page, one of the "RefreshLinksJob" jobs will + # get run first, and when it does, it will remove the duplicates. Of course, + # one page could have its job popped when the other page's job is still + # buried within the logic of a refreshLinks2 job. + $jobs = array(); + foreach ( $titles as $title ) { + $jobs[] = new RefreshLinksJob( $title, + array( 'masterPos' => $masterPos ) + $this->getRootJobParams() + ); // carry over information for de-duplication + } + return $jobs; + } + + /** + * @return array + */ + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + // Don't let highly unique "masterPos" values ruin duplicate detection + if ( is_array( $info['params'] ) ) { + unset( $info['params']['masterPos'] ); + } + return $info; + } +} diff --git a/includes/jobqueue/jobs/UploadFromUrlJob.php b/includes/jobqueue/jobs/UploadFromUrlJob.php new file mode 100644 index 00000000..a09db15a --- /dev/null +++ b/includes/jobqueue/jobs/UploadFromUrlJob.php @@ -0,0 +1,187 @@ +<?php +/** + * Job for asynchronous upload-by-url. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job for asynchronous upload-by-url. + * + * This job is in fact an interface to UploadFromUrl, which is designed such + * that it does not require any globals. If it does, fix it elsewhere, do not + * add globals in here. + * + * @ingroup JobQueue + */ +class UploadFromUrlJob extends Job { + const SESSION_KEYNAME = 'wsUploadFromUrlJobData'; + + /** @var UploadFromUrl */ + public $upload; + + /** @var User */ + protected $user; + + public function __construct( $title, $params ) { + parent::__construct( 'uploadFromUrl', $title, $params ); + } + + public function run() { + global $wgCopyUploadAsyncTimeout; + # Initialize this object and the upload object + $this->upload = new UploadFromUrl(); + $this->upload->initialize( + $this->title->getText(), + $this->params['url'], + false + ); + $this->user = User::newFromName( $this->params['userName'] ); + + # Fetch the file + $opts = array(); + if ( $wgCopyUploadAsyncTimeout ) { + $opts['timeout'] = $wgCopyUploadAsyncTimeout; + } + $status = $this->upload->fetchFile( $opts ); + if ( !$status->isOk() ) { + $this->leaveMessage( $status ); + + return true; + } + + # Verify upload + $result = $this->upload->verifyUpload(); + if ( $result['status'] != UploadBase::OK ) { + $status = $this->upload->convertVerifyErrorToStatus( $result ); + $this->leaveMessage( $status ); + + return true; + } + + # Check warnings + if ( !$this->params['ignoreWarnings'] ) { + $warnings = $this->upload->checkWarnings(); + if ( $warnings ) { + + # Stash the upload + $key = $this->upload->stashFile(); + + // @todo FIXME: This has been broken for a while. + // User::leaveUserMessage() does not exist. + if ( $this->params['leaveMessage'] ) { + $this->user->leaveUserMessage( + wfMessage( 'upload-warning-subj' )->text(), + wfMessage( 'upload-warning-msg', + $key, + $this->params['url'] )->text() + ); + } else { + wfSetupSession( $this->params['sessionId'] ); + $this->storeResultInSession( 'Warning', + 'warnings', $warnings ); + session_write_close(); + } + + return true; + } + } + + # Perform the upload + $status = $this->upload->performUpload( + $this->params['comment'], + $this->params['pageText'], + $this->params['watch'], + $this->user + ); + $this->leaveMessage( $status ); + + return true; + } + + /** + * Leave a message on the user talk page or in the session according to + * $params['leaveMessage']. + * + * @param Status $status + */ + protected function leaveMessage( $status ) { + if ( $this->params['leaveMessage'] ) { + if ( $status->isGood() ) { + // @todo FIXME: user->leaveUserMessage does not exist. + $this->user->leaveUserMessage( wfMessage( 'upload-success-subj' )->text(), + wfMessage( 'upload-success-msg', + $this->upload->getTitle()->getText(), + $this->params['url'] + )->text() ); + } else { + // @todo FIXME: user->leaveUserMessage does not exist. + $this->user->leaveUserMessage( wfMessage( 'upload-failure-subj' )->text(), + wfMessage( 'upload-failure-msg', + $status->getWikiText(), + $this->params['url'] + )->text() ); + } + } else { + wfSetupSession( $this->params['sessionId'] ); + if ( $status->isOk() ) { + $this->storeResultInSession( 'Success', + 'filename', $this->upload->getLocalFile()->getName() ); + } else { + $this->storeResultInSession( 'Failure', + 'errors', $status->getErrorsArray() ); + } + session_write_close(); + } + } + + /** + * Store a result in the session data. Note that the caller is responsible + * for appropriate session_start and session_write_close calls. + * + * @param string $result The result (Success|Warning|Failure) + * @param string $dataKey The key of the extra data + * @param mixed $dataValue The extra data itself + */ + protected function storeResultInSession( $result, $dataKey, $dataValue ) { + $session =& self::getSessionData( $this->params['sessionKey'] ); + $session['result'] = $result; + $session[$dataKey] = $dataValue; + } + + /** + * Initialize the session data. Sets the intial result to queued. + */ + public function initializeSessionData() { + $session =& self::getSessionData( $this->params['sessionKey'] ); + $$session['result'] = 'Queued'; + } + + /** + * @param string $key + * @return mixed + */ + public static function &getSessionData( $key ) { + if ( !isset( $_SESSION[self::SESSION_KEYNAME][$key] ) ) { + $_SESSION[self::SESSION_KEYNAME][$key] = array(); + } + + return $_SESSION[self::SESSION_KEYNAME][$key]; + } +} diff --git a/includes/jobqueue/utils/BacklinkJobUtils.php b/includes/jobqueue/utils/BacklinkJobUtils.php new file mode 100644 index 00000000..c8e5df66 --- /dev/null +++ b/includes/jobqueue/utils/BacklinkJobUtils.php @@ -0,0 +1,122 @@ +<?php +/** + * Job to update links for a given title. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + * @author Aaron Schulz + */ + +/** + * Class with Backlink related Job helper methods + * + * @ingroup JobQueue + * @since 1.23 + */ +class BacklinkJobUtils { + /** + * Break down $job into approximately ($bSize/$cSize) leaf jobs and a single partition + * job that covers the remaining backlink range (if needed). Jobs for the first $bSize + * titles are collated ($cSize per job) into leaf jobs to do actual work. All the + * resulting jobs are of the same class as $job. No partition job is returned if the + * range covered by $job was less than $bSize, as the leaf jobs have full coverage. + * + * The leaf jobs have the 'pages' param set to a (<page ID>:(<namespace>,<DB key>),...) + * map so that the run() function knows what pages to act on. The leaf jobs will keep + * the same job title as the parent job (e.g. $job). + * + * The partition jobs have the 'range' parameter set to a map of the format + * (start:<integer>, end:<integer>, batchSize:<integer>, subranges:((<start>,<end>),...)), + * the 'table' parameter set to that of $job, and the 'recursive' parameter set to true. + * This method can be called on the resulting job to repeat the process again. + * + * The job provided ($job) must have the 'recursive' parameter set to true and the 'table' + * parameter must be set to a backlink table. The job title will be used as the title to + * find backlinks for. Any 'range' parameter must follow the same format as mentioned above. + * This should be managed by recursive calls to this method. + * + * The first jobs return are always the leaf jobs. This lets the caller use push() to + * put them directly into the queue and works well if the queue is FIFO. In such a queue, + * the leaf jobs have to get finished first before anything can resolve the next partition + * job, which keeps the queue very small. + * + * $opts includes: + * - params : extra job parameters to include in each job + * + * @param Job $job + * @param int $bSize BacklinkCache partition size; usually $wgUpdateRowsPerJob + * @param int $cSize Max titles per leaf job; Usually 1 or a modest value + * @param array $opts Optional parameter map + * @return Job[] List of Job objects + */ + public static function partitionBacklinkJob( Job $job, $bSize, $cSize, $opts = array() ) { + $class = get_class( $job ); + $title = $job->getTitle(); + $params = $job->getParams(); + + if ( isset( $params['pages'] ) || empty( $params['recursive'] ) ) { + $ranges = array(); // sanity; this is a leaf node + wfWarn( __METHOD__ . " called on {$job->getType()} leaf job (explosive recursion)." ); + } elseif ( isset( $params['range'] ) ) { + // This is a range job to trigger the insertion of partitioned/title jobs... + $ranges = $params['range']['subranges']; + $realBSize = $params['range']['batchSize']; + } else { + // This is a base job to trigger the insertion of partitioned jobs... + $ranges = $title->getBacklinkCache()->partition( $params['table'], $bSize ); + $realBSize = $bSize; + } + + $extraParams = isset( $opts['params'] ) ? $opts['params'] : array(); + + $jobs = array(); + // Combine the first range (of size $bSize) backlinks into leaf jobs + if ( isset( $ranges[0] ) ) { + list( $start, $end ) = $ranges[0]; + $titles = $title->getBacklinkCache()->getLinks( $params['table'], $start, $end ); + foreach ( array_chunk( iterator_to_array( $titles ), $cSize ) as $titleBatch ) { + $pages = array(); + foreach ( $titleBatch as $tl ) { + $pages[$tl->getArticleId()] = array( $tl->getNamespace(), $tl->getDBKey() ); + } + $jobs[] = new $class( + $title, // maintain parent job title + array( 'pages' => $pages ) + $extraParams + ); + } + } + // Take all of the remaining ranges and build a partition job from it + if ( isset( $ranges[1] ) ) { + $jobs[] = new $class( + $title, // maintain parent job title + array( + 'recursive' => true, + 'table' => $params['table'], + 'range' => array( + 'start' => $ranges[1][0], + 'end' => $ranges[count( $ranges ) - 1][1], + 'batchSize' => $realBSize, + 'subranges' => array_slice( $ranges, 1 ) + ), + ) + $extraParams + ); + } + + return $jobs; + } +} |