diff options
Diffstat (limited to 'includes/job')
21 files changed, 4948 insertions, 536 deletions
diff --git a/includes/job/Job.php b/includes/job/Job.php index d777a5d4..ab7df5d2 100644 --- a/includes/job/Job.php +++ b/includes/job/Job.php @@ -23,11 +23,11 @@ /** * Class to both describe a background job and handle jobs. + * The queue aspects of this class are now deprecated. * * @ingroup JobQueue */ abstract class Job { - /** * @var Title */ @@ -39,6 +39,9 @@ abstract class Job { $removeDuplicates, $error; + /** @var Array Additional queue metadata */ + public $metadata = array(); + /*------------------------------------------------------------------------- * Abstract functions *------------------------------------------------------------------------*/ @@ -47,178 +50,25 @@ abstract class Job { * Run the job * @return boolean success */ - abstract function run(); + abstract public function run(); /*------------------------------------------------------------------------- * Static functions *------------------------------------------------------------------------*/ /** - * Pop a job of a certain type. This tries less hard than pop() to - * actually find a job; it may be adversely affected by concurrent job - * runners. - * - * @param $type string - * - * @return Job - */ - static function pop_type( $type ) { - wfProfilein( __METHOD__ ); - - $dbw = wfGetDB( DB_MASTER ); - - $dbw->begin( __METHOD__ ); - - $row = $dbw->selectRow( - 'job', - '*', - array( 'job_cmd' => $type ), - __METHOD__, - array( 'LIMIT' => 1, 'FOR UPDATE' ) - ); - - if ( $row === false ) { - $dbw->commit( __METHOD__ ); - wfProfileOut( __METHOD__ ); - return false; - } - - /* Ensure we "own" this row */ - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - $affected = $dbw->affectedRows(); - $dbw->commit( __METHOD__ ); - - if ( $affected == 0 ) { - wfProfileOut( __METHOD__ ); - return false; - } - - wfIncrStats( 'job-pop' ); - $namespace = $row->job_namespace; - $dbkey = $row->job_title; - $title = Title::makeTitleSafe( $namespace, $dbkey ); - $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), - $row->job_id ); - - $job->removeDuplicates(); - - wfProfileOut( __METHOD__ ); - return $job; - } - - /** - * Pop a job off the front of the queue - * - * @param $offset Integer: Number of jobs to skip - * @return Job or false if there's no jobs - */ - static function pop( $offset = 0 ) { - wfProfileIn( __METHOD__ ); - - $dbr = wfGetDB( DB_SLAVE ); - - /* Get a job from the slave, start with an offset, - scan full set afterwards, avoid hitting purged rows - - NB: If random fetch previously was used, offset - will always be ahead of few entries - */ - - $conditions = self::defaultQueueConditions(); - - $offset = intval( $offset ); - $options = array( 'ORDER BY' => 'job_id', 'USE INDEX' => 'PRIMARY' ); - - $row = $dbr->selectRow( 'job', '*', - array_merge( $conditions, array( "job_id >= $offset" ) ), - __METHOD__, - $options - ); - - // Refetching without offset is needed as some of job IDs could have had delayed commits - // and have lower IDs than jobs already executed, blame concurrency :) - // - if ( $row === false ) { - if ( $offset != 0 ) { - $row = $dbr->selectRow( 'job', '*', $conditions, __METHOD__, $options ); - } - - if ( $row === false ) { - wfProfileOut( __METHOD__ ); - return false; - } - } - - // Try to delete it from the master - $dbw = wfGetDB( DB_MASTER ); - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - $affected = $dbw->affectedRows(); - - if ( !$affected ) { - // Failed, someone else beat us to it - // Try getting a random row - $row = $dbw->selectRow( 'job', array( 'minjob' => 'MIN(job_id)', - 'maxjob' => 'MAX(job_id)' ), '1=1', __METHOD__ ); - if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) { - // No jobs to get - wfProfileOut( __METHOD__ ); - return false; - } - // Get the random row - $row = $dbw->selectRow( 'job', '*', - 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ ); - if ( $row === false ) { - // Random job gone before we got the chance to select it - // Give up - wfProfileOut( __METHOD__ ); - return false; - } - // Delete the random row - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - $affected = $dbw->affectedRows(); - - if ( !$affected ) { - // Random job gone before we exclusively deleted it - // Give up - wfProfileOut( __METHOD__ ); - return false; - } - } - - // If execution got to here, there's a row in $row that has been deleted from the database - // by this thread. Hence the concurrent pop was successful. - wfIncrStats( 'job-pop' ); - $namespace = $row->job_namespace; - $dbkey = $row->job_title; - $title = Title::makeTitleSafe( $namespace, $dbkey ); - - if ( is_null( $title ) ) { - wfProfileOut( __METHOD__ ); - return false; - } - - $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id ); - - // Remove any duplicates it may have later in the queue - $job->removeDuplicates(); - - wfProfileOut( __METHOD__ ); - return $job; - } - - /** * Create the appropriate object to handle a specific job * - * @param $command String: Job command + * @param string $command Job command * @param $title Title: Associated title - * @param $params Array|bool: Job parameters - * @param $id Int: Job identifier + * @param array|bool $params Job parameters + * @param int $id Job identifier * @throws MWException * @return Job */ - static function factory( $command, Title $title, $params = false, $id = 0 ) { + public static function factory( $command, Title $title, $params = false, $id = 0 ) { global $wgJobClasses; - if( isset( $wgJobClasses[$command] ) ) { + if ( isset( $wgJobClasses[$command] ) ) { $class = $wgJobClasses[$command]; return new $class( $title, $params, $id ); } @@ -226,64 +76,18 @@ abstract class Job { } /** - * @param $params - * @return string - */ - static function makeBlob( $params ) { - if ( $params !== false ) { - return serialize( $params ); - } else { - return ''; - } - } - - /** - * @param $blob - * @return bool|mixed - */ - static function extractBlob( $blob ) { - if ( (string)$blob !== '' ) { - return unserialize( $blob ); - } else { - return false; - } - } - - /** * Batch-insert a group of jobs into the queue. * This will be wrapped in a transaction with a forced commit. * * This may add duplicate at insert time, but they will be * removed later on, when the first one is popped. * - * @param $jobs array of Job objects + * @param array $jobs of Job objects + * @return bool + * @deprecated since 1.21 */ - static function batchInsert( $jobs ) { - if ( !count( $jobs ) ) { - return; - } - $dbw = wfGetDB( DB_MASTER ); - $rows = array(); - - /** - * @var $job Job - */ - foreach ( $jobs as $job ) { - $rows[] = $job->insertFields(); - if ( count( $rows ) >= 50 ) { - # Do a small transaction to avoid slave lag - $dbw->begin( __METHOD__ ); - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - $dbw->commit( __METHOD__ ); - $rows = array(); - } - } - if ( $rows ) { // last chunk - $dbw->begin( __METHOD__ ); - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - $dbw->commit( __METHOD__ ); - } - wfIncrStats( 'job-insert', count( $jobs ) ); + public static function batchInsert( $jobs ) { + return JobQueueGroup::singleton()->push( $jobs ); } /** @@ -293,46 +97,36 @@ abstract class Job { * be rolled-back as part of a larger transaction. However, * large batches of jobs can cause slave lag. * - * @param $jobs array of Job objects + * @param array $jobs of Job objects + * @return bool + * @deprecated since 1.21 */ - static function safeBatchInsert( $jobs ) { - if ( !count( $jobs ) ) { - return; - } - $dbw = wfGetDB( DB_MASTER ); - $rows = array(); - foreach ( $jobs as $job ) { - $rows[] = $job->insertFields(); - if ( count( $rows ) >= 500 ) { - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - $rows = array(); - } - } - if ( $rows ) { // last chunk - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - } - wfIncrStats( 'job-insert', count( $jobs ) ); + public static function safeBatchInsert( $jobs ) { + return JobQueueGroup::singleton()->push( $jobs, JobQueue::QOS_ATOMIC ); } - /** - * SQL conditions to apply on most JobQueue queries + * Pop a job of a certain type. This tries less hard than pop() to + * actually find a job; it may be adversely affected by concurrent job + * runners. * - * Whenever we exclude jobs types from the default queue, we want to make - * sure that queries to the job queue actually ignore them. + * @param $type string + * @return Job|bool Returns false if there are no jobs + * @deprecated since 1.21 + */ + public static function pop_type( $type ) { + return JobQueueGroup::singleton()->get( $type )->pop(); + } + + /** + * Pop a job off the front of the queue. + * This is subject to $wgJobTypesExcludedFromDefaultQueue. * - * @return array SQL conditions suitable for Database:: methods + * @return Job or false if there's no jobs + * @deprecated since 1.21 */ - static function defaultQueueConditions( ) { - global $wgJobTypesExcludedFromDefaultQueue; - $conditions = array(); - if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) { - $dbr = wfGetDB( DB_SLAVE ); - foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) { - $conditions[] = "job_cmd != " . $dbr->addQuotes( $cmdType ); - } - } - return $conditions; + public static function pop() { + return JobQueueGroup::singleton()->pop(); } /*------------------------------------------------------------------------- @@ -345,83 +139,163 @@ abstract class Job { * @param $params array|bool * @param $id int */ - function __construct( $command, $title, $params = false, $id = 0 ) { + public function __construct( $command, $title, $params = false, $id = 0 ) { $this->command = $command; $this->title = $title; $this->params = $params; $this->id = $id; - // A bit of premature generalisation - // Oh well, the whole class is premature generalisation really - $this->removeDuplicates = true; + $this->removeDuplicates = false; // expensive jobs may set this to true } /** - * Insert a single job into the queue. - * @return bool true on success + * @return integer May be 0 for jobs stored outside the DB + * @deprecated since 1.22 */ - function insert() { - $fields = $this->insertFields(); + public function getId() { + return $this->id; + } - $dbw = wfGetDB( DB_MASTER ); + /** + * @return string + */ + public function getType() { + return $this->command; + } - if ( $this->removeDuplicates ) { - $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ ); - if ( $dbw->numRows( $res ) ) { - return true; - } - } - wfIncrStats( 'job-insert' ); - return $dbw->insert( 'job', $fields, __METHOD__ ); + /** + * @return Title + */ + public function getTitle() { + return $this->title; } /** * @return array */ - protected function insertFields() { - $dbw = wfGetDB( DB_MASTER ); + public function getParams() { + return $this->params; + } + + /** + * @return integer|null UNIX timestamp to delay running this job until, otherwise null + * @since 1.22 + */ + public function getReleaseTimestamp() { + return isset( $this->params['jobReleaseTimestamp'] ) + ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] ) + : null; + } + + /** + * @return bool Whether only one of each identical set of jobs should be run + */ + public function ignoreDuplicates() { + return $this->removeDuplicates; + } + + /** + * @return bool Whether this job can be retried on failure by job runners + * @since 1.21 + */ + public function allowRetries() { + return true; + } + + /** + * Subclasses may need to override this to make duplication detection work. + * The resulting map conveys everything that makes the job unique. This is + * only checked if ignoreDuplicates() returns true, meaning that duplicate + * jobs are supposed to be ignored. + * + * @return Array Map of key/values + * @since 1.21 + */ + public function getDeduplicationInfo() { + $info = array( + 'type' => $this->getType(), + 'namespace' => $this->getTitle()->getNamespace(), + 'title' => $this->getTitle()->getDBkey(), + 'params' => $this->getParams() + ); + if ( is_array( $info['params'] ) ) { + // Identical jobs with different "root" jobs should count as duplicates + unset( $info['params']['rootJobSignature'] ); + unset( $info['params']['rootJobTimestamp'] ); + // Likewise for jobs with different delay times + unset( $info['params']['jobReleaseTimestamp'] ); + } + return $info; + } + + /** + * @see JobQueue::deduplicateRootJob() + * @param string $key A key that identifies the task + * @return Array + * @since 1.21 + */ + public static function newRootJobParams( $key ) { return array( - 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), - 'job_cmd' => $this->command, - 'job_namespace' => $this->title->getNamespace(), - 'job_title' => $this->title->getDBkey(), - 'job_timestamp' => $dbw->timestamp(), - 'job_params' => Job::makeBlob( $this->params ) + 'rootJobSignature' => sha1( $key ), + 'rootJobTimestamp' => wfTimestampNow() ); } /** - * Remove jobs in the job queue which are duplicates of this job. - * This is deadlock-prone and so starts its own transaction. + * @see JobQueue::deduplicateRootJob() + * @return Array + * @since 1.21 */ - function removeDuplicates() { - if ( !$this->removeDuplicates ) { - return; - } + public function getRootJobParams() { + return array( + 'rootJobSignature' => isset( $this->params['rootJobSignature'] ) + ? $this->params['rootJobSignature'] + : null, + 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] ) + ? $this->params['rootJobTimestamp'] + : null + ); + } - $fields = $this->insertFields(); - unset( $fields['job_id'] ); - unset( $fields['job_timestamp'] ); - $dbw = wfGetDB( DB_MASTER ); - $dbw->begin( __METHOD__ ); - $dbw->delete( 'job', $fields, __METHOD__ ); - $affected = $dbw->affectedRows(); - $dbw->commit( __METHOD__ ); - if ( $affected ) { - wfIncrStats( 'job-dup-delete', $affected ); - } + /** + * @see JobQueue::deduplicateRootJob() + * @return bool + * @since 1.22 + */ + public function hasRootJobParams() { + return isset( $this->params['rootJobSignature'] ) + && isset( $this->params['rootJobTimestamp'] ); + } + + /** + * Insert a single job into the queue. + * @return bool true on success + * @deprecated since 1.21 + */ + public function insert() { + return JobQueueGroup::singleton()->push( $this ); } /** * @return string */ - function toString() { + public function toString() { $paramString = ''; if ( $this->params ) { foreach ( $this->params as $key => $value ) { if ( $paramString != '' ) { $paramString .= ' '; } + if ( is_array( $value ) ) { + $value = "array(" . count( $value ) . ")"; + } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) { + $value = "object(" . get_class( $value ) . ")"; + } + $value = (string)$value; + if ( mb_strlen( $value ) > 1024 ) { + $value = "string(" . mb_strlen( $value ) . ")"; + } + $paramString .= "$key=$value"; } } @@ -441,7 +315,7 @@ abstract class Job { $this->error = $error; } - function getLastError() { + public function getLastError() { return $this->error; } } diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php new file mode 100644 index 00000000..6556ee85 --- /dev/null +++ b/includes/job/JobQueue.php @@ -0,0 +1,706 @@ +<?php +/** + * Job queue base code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @defgroup JobQueue JobQueue + * @author Aaron Schulz + */ + +/** + * Class to handle enqueueing and running of background jobs + * + * @ingroup JobQueue + * @since 1.21 + */ +abstract class JobQueue { + protected $wiki; // string; wiki ID + protected $type; // string; job type + protected $order; // string; job priority for pop() + protected $claimTTL; // integer; seconds + protected $maxTries; // integer; maximum number of times to try a job + protected $checkDelay; // boolean; allow delayed jobs + + /** @var BagOStuff */ + protected $dupCache; + + const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions + + const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days) + + /** + * @param $params array + */ + protected function __construct( array $params ) { + $this->wiki = $params['wiki']; + $this->type = $params['type']; + $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0; + $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3; + if ( isset( $params['order'] ) && $params['order'] !== 'any' ) { + $this->order = $params['order']; + } else { + $this->order = $this->optimalOrder(); + } + if ( !in_array( $this->order, $this->supportedOrders() ) ) { + throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); + } + $this->checkDelay = !empty( $params['checkDelay'] ); + if ( $this->checkDelay && !$this->supportsDelayedJobs() ) { + throw new MWException( __CLASS__ . " does not support delayed jobs." ); + } + $this->dupCache = wfGetCache( CACHE_ANYTHING ); + } + + /** + * Get a job queue object of the specified type. + * $params includes: + * - class : What job class to use (determines job type) + * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * - type : The name of the job types this queue handles + * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". + * If "fifo" is used, the queue will effectively be FIFO. Note that job + * completion will not appear to be exactly FIFO if there are multiple + * job runners since jobs can take different times to finish once popped. + * If "timestamp" is used, the queue will at least be loosely ordered + * by timestamp, allowing for some jobs to be popped off out of order. + * If "random" is used, pop() will pick jobs in random order. + * Note that it may only be weakly random (e.g. a lottery of the oldest X). + * If "any" is choosen, the queue will use whatever order is the fastest. + * This might be useful for improving concurrency for job acquisition. + * - claimTTL : If supported, the queue will recycle jobs that have been popped + * but not acknowledged as completed after this many seconds. Recycling + * of jobs simple means re-inserting them into the queue. Jobs can be + * attempted up to three times before being discarded. + * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions. + * This lets delayed jobs wait in a staging area until a given timestamp is + * reached, at which point they will enter the queue. If this is not enabled + * or not supported, an exception will be thrown on delayed job insertion. + * + * Queue classes should throw an exception if they do not support the options given. + * + * @param $params array + * @return JobQueue + * @throws MWException + */ + final public static function factory( array $params ) { + $class = $params['class']; + if ( !class_exists( $class ) ) { + throw new MWException( "Invalid job queue class '$class'." ); + } + $obj = new $class( $params ); + if ( !( $obj instanceof self ) ) { + throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." ); + } + return $obj; + } + + /** + * @return string Wiki ID + */ + final public function getWiki() { + return $this->wiki; + } + + /** + * @return string Job type that this queue handles + */ + final public function getType() { + return $this->type; + } + + /** + * @return string One of (random, timestamp, fifo, undefined) + */ + final public function getOrder() { + return $this->order; + } + + /** + * @return bool Whether delayed jobs are enabled + * @since 1.22 + */ + final public function delayedJobsEnabled() { + return $this->checkDelay; + } + + /** + * Get the allowed queue orders for configuration validation + * + * @return Array Subset of (random, timestamp, fifo, undefined) + */ + abstract protected function supportedOrders(); + + /** + * Get the default queue order to use if configuration does not specify one + * + * @return string One of (random, timestamp, fifo, undefined) + */ + abstract protected function optimalOrder(); + + /** + * Find out if delayed jobs are supported for configuration validation + * + * @return boolean Whether delayed jobs are supported + */ + protected function supportsDelayedJobs() { + return false; // not implemented + } + + /** + * Quickly check if the queue has no available (unacquired, non-delayed) jobs. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this might return false when there are actually no jobs. + * If pop() is called and returns false then it should correct the cache. Also, + * calling flushCaches() first prevents this. However, this affect is typically + * not distinguishable from the race condition between isEmpty() and pop(). + * + * @return bool + * @throws JobQueueError + */ + final public function isEmpty() { + wfProfileIn( __METHOD__ ); + $res = $this->doIsEmpty(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::isEmpty() + * @return bool + */ + abstract protected function doIsEmpty(); + + /** + * Get the number of available (unacquired, non-delayed) jobs in the queue. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws JobQueueError + */ + final public function getSize() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetSize(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getSize() + * @return integer + */ + abstract protected function doGetSize(); + + /** + * Get the number of acquired jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws JobQueueError + */ + final public function getAcquiredCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAcquiredCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getAcquiredCount() + * @return integer + */ + abstract protected function doGetAcquiredCount(); + + /** + * Get the number of delayed jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws JobQueueError + * @since 1.22 + */ + final public function getDelayedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetDelayedCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getDelayedCount() + * @return integer + */ + protected function doGetDelayedCount() { + return 0; // not implemented + } + + /** + * Get the number of acquired jobs that can no longer be attempted. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws JobQueueError + */ + final public function getAbandonedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAbandonedCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getAbandonedCount() + * @return integer + */ + protected function doGetAbandonedCount() { + return 0; // not implemented + } + + /** + * Push a single jobs into the queue. + * This does not require $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::push() instead of this function. + * + * @param $jobs Job|Array + * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) + * @return bool Returns false on failure + * @throws JobQueueError + */ + final public function push( $jobs, $flags = 0 ) { + return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); + } + + /** + * Push a batch of jobs into the queue. + * This does not require $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::push() instead of this function. + * + * @param array $jobs List of Jobs + * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) + * @return bool Returns false on failure + * @throws JobQueueError + */ + final public function batchPush( array $jobs, $flags = 0 ) { + if ( !count( $jobs ) ) { + return true; // nothing to do + } + + foreach ( $jobs as $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( + "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); + } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) { + throw new MWException( + "Got delayed '{$job->getType()}' job; delays are not supported." ); + } + } + + wfProfileIn( __METHOD__ ); + $ok = $this->doBatchPush( $jobs, $flags ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueue::batchPush() + * @return bool + */ + abstract protected function doBatchPush( array $jobs, $flags ); + + /** + * Pop a job off of the queue. + * This requires $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::pop() instead of this function. + * + * @return Job|bool Returns false if there are no jobs + * @throws JobQueueError + */ + final public function pop() { + global $wgJobClasses; + + if ( $this->wiki !== wfWikiID() ) { + throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." ); + } elseif ( !isset( $wgJobClasses[$this->type] ) ) { + // Do not pop jobs if there is no class for the queue type + throw new MWException( "Unrecognized job type '{$this->type}'." ); + } + + wfProfileIn( __METHOD__ ); + $job = $this->doPop(); + wfProfileOut( __METHOD__ ); + + // Flag this job as an old duplicate based on its "root" job... + try { + if ( $job && $this->isRootJobOldDuplicate( $job ) ) { + JobQueue::incrStats( 'job-pop-duplicate', $this->type ); + $job = DuplicateJob::newFromJob( $job ); // convert to a no-op + } + } catch ( MWException $e ) {} // don't lose jobs over this + + return $job; + } + + /** + * @see JobQueue::pop() + * @return Job + */ + abstract protected function doPop(); + + /** + * Acknowledge that a job was completed. + * + * This does nothing for certain queue classes or if "claimTTL" is not set. + * Outside callers should use JobQueueGroup::ack() instead of this function. + * + * @param $job Job + * @return bool + * @throws JobQueueError + */ + final public function ack( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $ok = $this->doAck( $job ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueue::ack() + * @return bool + */ + abstract protected function doAck( Job $job ); + + /** + * Register the "root job" of a given job into the queue for de-duplication. + * This should only be called right *after* all the new jobs have been inserted. + * This is used to turn older, duplicate, job entries into no-ops. The root job + * information will remain in the registry until it simply falls out of cache. + * + * This requires that $job has two special fields in the "params" array: + * - rootJobSignature : hash (e.g. SHA1) that identifies the task + * - rootJobTimestamp : TS_MW timestamp of this instance of the task + * + * A "root job" is a conceptual job that consist of potentially many smaller jobs + * that are actually inserted into the queue. For example, "refreshLinks" jobs are + * spawned when a template is edited. One can think of the task as "update links + * of pages that use template X" and an instance of that task as a "root job". + * However, what actually goes into the queue are potentially many refreshLinks2 jobs. + * Since these jobs include things like page ID ranges and DB master positions, and morph + * into smaller refreshLinks2 jobs recursively, simple duplicate detection (like job_sha1) + * for individual jobs being identical is not useful. + * + * In the case of "refreshLinks", if these jobs are still in the queue when the template + * is edited again, we want all of these old refreshLinks jobs for that template to become + * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing. + * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a + * previous "root job" for the same task of "update links of pages that use template X". + * + * This does nothing for certain queue classes. + * + * @param $job Job + * @return bool + * @throws JobQueueError + */ + final public function deduplicateRootJob( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $ok = $this->doDeduplicateRootJob( $job ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueue::deduplicateRootJob() + * @param $job Job + * @return bool + */ + protected function doDeduplicateRootJob( Job $job ) { + if ( !$job->hasRootJobParams() ) { + throw new MWException( "Cannot register root job; missing parameters." ); + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } + + /** + * Check if the "root" job of a given job has been superseded by a newer one + * + * @param $job Job + * @return bool + * @throws JobQueueError + */ + final protected function isRootJobOldDuplicate( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); + wfProfileOut( __METHOD__ ); + return $isDuplicate; + } + + /** + * @see JobQueue::isRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + if ( !$job->hasRootJobParams() ) { + return false; // job has no de-deplication info + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Get the last time this root job was enqueued + $timestamp = $this->dupCache->get( $key ); + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + protected function getRootJobCacheKey( $signature ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); + } + + /** + * Deleted all unclaimed and delayed jobs from the queue + * + * @return bool Success + * @throws JobQueueError + * @since 1.22 + */ + final public function delete() { + wfProfileIn( __METHOD__ ); + $res = $this->doDelete(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::delete() + * @return bool Success + */ + protected function doDelete() { + throw new MWException( "This method is not implemented." ); + } + + /** + * Wait for any slaves or backup servers to catch up. + * + * This does nothing for certain queue classes. + * + * @return void + * @throws JobQueueError + */ + final public function waitForBackups() { + wfProfileIn( __METHOD__ ); + $this->doWaitForBackups(); + wfProfileOut( __METHOD__ ); + } + + /** + * @see JobQueue::waitForBackups() + * @return void + */ + protected function doWaitForBackups() {} + + /** + * Return a map of task names to task definition maps. + * A "task" is a fast periodic queue maintenance action. + * Mutually exclusive tasks must implement their own locking in the callback. + * + * Each task value is an associative array with: + * - name : the name of the task + * - callback : a PHP callable that performs the task + * - period : the period in seconds corresponding to the task frequency + * + * @return Array + */ + final public function getPeriodicTasks() { + $tasks = $this->doGetPeriodicTasks(); + foreach ( $tasks as $name => &$def ) { + $def['name'] = $name; + } + return $tasks; + } + + /** + * @see JobQueue::getPeriodicTasks() + * @return Array + */ + protected function doGetPeriodicTasks() { + return array(); + } + + /** + * Clear any process and persistent caches + * + * @return void + */ + final public function flushCaches() { + wfProfileIn( __METHOD__ ); + $this->doFlushCaches(); + wfProfileOut( __METHOD__ ); + } + + /** + * @see JobQueue::flushCaches() + * @return void + */ + protected function doFlushCaches() {} + + /** + * Get an iterator to traverse over all available jobs in this queue. + * This does not include jobs that are currently acquired or delayed. + * Note: results may be stale if the queue is concurrently modified. + * + * @return Iterator + * @throws JobQueueError + */ + abstract public function getAllQueuedJobs(); + + /** + * Get an iterator to traverse over all delayed jobs in this queue. + * Note: results may be stale if the queue is concurrently modified. + * + * @return Iterator + * @throws JobQueueError + * @since 1.22 + */ + public function getAllDelayedJobs() { + return new ArrayIterator( array() ); // not implemented + } + + /** + * Do not use this function outside of JobQueue/JobQueueGroup + * + * @return string + * @since 1.22 + */ + public function getCoalesceLocationInternal() { + return null; + } + + /** + * Check whether each of the given queues are empty. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (list of non-empty queue types) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueuesWithJobs( array $types ) { + $section = new ProfileSection( __METHOD__ ); + return $this->doGetSiblingQueuesWithJobs( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesWithJobs() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueuesWithJobs( array $types ) { + return null; // not supported + } + + /** + * Check the size of each of the given queues. + * For queues not served by the same store as this one, 0 is returned. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (job type => whether queue is empty) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueueSizes( array $types ) { + $section = new ProfileSection( __METHOD__ ); + return $this->doGetSiblingQueueSizes( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesSize() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueueSizes( array $types ) { + return null; // not supported + } + + /** + * Call wfIncrStats() for the queue overall and for the queue type + * + * @param string $key Event type + * @param string $type Job type + * @param integer $delta + * @since 1.22 + */ + public static function incrStats( $key, $type, $delta = 1 ) { + wfIncrStats( $key, $delta ); + wfIncrStats( "{$key}-{$type}", $delta ); + } + + /** + * Namespace the queue with a key to isolate it for testing + * + * @param $key string + * @return void + * @throws MWException + */ + public function setTestingPrefix( $key ) { + throw new MWException( "Queue namespacing not supported for this queue type." ); + } +} + +/** + * @ingroup JobQueue + * @since 1.22 + */ +class JobQueueError extends MWException {} +class JobQueueConnectionError extends JobQueueError {} diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php new file mode 100644 index 00000000..c39083df --- /dev/null +++ b/includes/job/JobQueueDB.php @@ -0,0 +1,816 @@ +<?php +/** + * Database-backed job queue code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle job queues stored in the DB + * + * @ingroup JobQueue + * @since 1.21 + */ +class JobQueueDB extends JobQueue { + const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating + const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date + const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed + const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random + const MAX_OFFSET = 255; // integer; maximum number of rows to skip + + /** @var BagOStuff */ + protected $cache; + + protected $cluster = false; // string; name of an external DB cluster + + /** + * Additional parameters include: + * - cluster : The name of an external cluster registered via LBFactory. + * If not specified, the primary DB cluster for the wiki will be used. + * This can be overridden with a custom cluster so that DB handles will + * be retrieved via LBFactory::getExternalLB() and getConnection(). + * @param $params array + */ + protected function __construct( array $params ) { + global $wgMemc; + + parent::__construct( $params ); + + $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false; + // Make sure that we don't use the SQL cache, which would be harmful + $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc; + } + + protected function supportedOrders() { + return array( 'random', 'timestamp', 'fifo' ); + } + + protected function optimalOrder() { + return 'random'; + } + + /** + * @see JobQueue::doIsEmpty() + * @return bool + */ + protected function doIsEmpty() { + $key = $this->getCacheKey( 'empty' ); + + $isEmpty = $this->cache->get( $key ); + if ( $isEmpty === 'true' ) { + return true; + } elseif ( $isEmpty === 'false' ) { + return false; + } + + $dbr = $this->getSlaveDB(); + try { + $found = $dbr->selectField( // unclaimed job + 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); + + return !$found; + } + + /** + * @see JobQueue::doGetSize() + * @return integer + */ + protected function doGetSize() { + $key = $this->getCacheKey( 'size' ); + + $size = $this->cache->get( $key ); + if ( is_int( $size ) ) { + return $size; + } + + try { + $dbr = $this->getSlaveDB(); + $size = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, 'job_token' => '' ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->set( $key, $size, self::CACHE_TTL_SHORT ); + + return $size; + } + + /** + * @see JobQueue::doGetAcquiredCount() + * @return integer + */ + protected function doGetAcquiredCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + + $key = $this->getCacheKey( 'acquiredcount' ); + + $count = $this->cache->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $dbr = $this->getSlaveDB(); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + + /** + * @see JobQueue::doGetAbandonedCount() + * @return integer + * @throws MWException + */ + protected function doGetAbandonedCount() { + global $wgMemc; + + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + + $key = $this->getCacheKey( 'abandonedcount' ); + + $count = $wgMemc->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $dbr = $this->getSlaveDB(); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( + 'job_cmd' => $this->type, + "job_token != {$dbr->addQuotes( '' )}", + "job_attempts >= " . $dbr->addQuotes( $this->maxTries ) + ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + + /** + * @see JobQueue::doBatchPush() + * @param array $jobs + * @param $flags + * @throws DBError|Exception + * @return bool + */ + protected function doBatchPush( array $jobs, $flags ) { + $dbw = $this->getMasterDB(); + + $that = $this; + $method = __METHOD__; + $dbw->onTransactionIdle( + function() use ( $dbw, $that, $jobs, $flags, $method ) { + $that->doBatchPushInternal( $dbw, $jobs, $flags, $method ); + } + ); + + return true; + } + + /** + * This function should *not* be called outside of JobQueueDB + * + * @param DatabaseBase $dbw + * @param array $jobs + * @param int $flags + * @param string $method + * @return boolean + * @throws type + */ + public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { + if ( !count( $jobs ) ) { + return true; + } + + $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated + $rowList = array(); // list of jobs for jobs that are are not de-duplicated + foreach ( $jobs as $job ) { + $row = $this->insertFields( $job ); + if ( $job->ignoreDuplicates() ) { + $rowSet[$row['job_sha1']] = $row; + } else { + $rowList[] = $row; + } + } + + if ( $flags & self::QOS_ATOMIC ) { + $dbw->begin( $method ); // wrap all the job additions in one transaction + } + try { + // Strip out any duplicate jobs that are already in the queue... + if ( count( $rowSet ) ) { + $res = $dbw->select( 'job', 'job_sha1', + array( + // No job_type condition since it's part of the job_sha1 hash + 'job_sha1' => array_keys( $rowSet ), + 'job_token' => '' // unclaimed + ), + $method + ); + foreach ( $res as $row ) { + wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" ); + unset( $rowSet[$row->job_sha1] ); // already enqueued + } + } + // Build the full list of job rows to insert + $rows = array_merge( $rowList, array_values( $rowSet ) ); + // Insert the job rows in chunks to avoid slave lag... + foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { + $dbw->insert( 'job', $rowBatch, $method ); + } + JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) ); + JobQueue::incrStats( 'job-insert-duplicate', $this->type, + count( $rowSet ) + count( $rowList ) - count( $rows ) ); + } catch ( DBError $e ) { + if ( $flags & self::QOS_ATOMIC ) { + $dbw->rollback( $method ); + } + throw $e; + } + if ( $flags & self::QOS_ATOMIC ) { + $dbw->commit( $method ); + } + + $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG ); + + return true; + } + + /** + * @see JobQueue::doPop() + * @return Job|bool + */ + protected function doPop() { + if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { + return false; // queue is empty + } + + $dbw = $this->getMasterDB(); + try { + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); + + $uuid = wfRandomString( 32 ); // pop attempt + $job = false; // job popped off + do { // retry when our row is invalid or deleted as a duplicate + // Try to reserve a row in the DB... + if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { + $row = $this->claimOldest( $uuid ); + } else { // random first + $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs + $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand + $row = $this->claimRandom( $uuid, $rand, $gte ); + } + // Check if we found a row to reserve... + if ( !$row ) { + $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); + break; // nothing to do + } + JobQueue::incrStats( 'job-pop', $this->type ); + // Get the job object from the row... + $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); + if ( !$title ) { + $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); + wfDebug( "Row has invalid title '{$row->job_title}'." ); + continue; // try again + } + $job = Job::factory( $row->job_cmd, $title, + self::extractBlob( $row->job_params ), $row->job_id ); + $job->metadata['id'] = $row->job_id; + $job->id = $row->job_id; // XXX: work around broken subclasses + break; // done + } while ( true ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return $job; + } + + /** + * Reserve a row with a single UPDATE without holding row locks over RTTs... + * + * @param string $uuid 32 char hex string + * @param $rand integer Random unsigned integer (31 bits) + * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random) + * @return Row|false + */ + protected function claimRandom( $uuid, $rand, $gte ) { + $dbw = $this->getMasterDB(); + // Check cache to see if the queue has <= OFFSET items + $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); + + $row = false; // the row acquired + $invertedDirection = false; // whether one job_random direction was already scanned + // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT + // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is + // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot + // be used here with MySQL. + do { + if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows + // For small queues, using OFFSET will overshoot and return no rows more often. + // Instead, this uses job_random to pick a row (possibly checking both directions). + $ineq = $gte ? '>=' : '<='; + $dir = $gte ? 'ASC' : 'DESC'; + $row = $dbw->selectRow( 'job', '*', // find a random job + array( + 'job_cmd' => $this->type, + 'job_token' => '', // unclaimed + "job_random {$ineq} {$dbw->addQuotes( $rand )}" ), + __METHOD__, + array( 'ORDER BY' => "job_random {$dir}" ) + ); + if ( !$row && !$invertedDirection ) { + $gte = !$gte; + $invertedDirection = true; + continue; // try the other direction + } + } else { // table *may* have >= MAX_OFFSET rows + // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU + // in MySQL if there are many rows for some reason. This uses a small OFFSET + // instead of job_random for reducing excess claim retries. + $row = $dbw->selectRow( 'job', '*', // find a random job + array( + 'job_cmd' => $this->type, + 'job_token' => '', // unclaimed + ), + __METHOD__, + array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ) + ); + if ( !$row ) { + $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows + $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 ); + continue; // use job_random + } + } + if ( $row ) { // claim the job + $dbw->update( 'job', // update by PK + array( + 'job_token' => $uuid, + 'job_token_timestamp' => $dbw->timestamp(), + 'job_attempts = job_attempts+1' ), + array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ), + __METHOD__ + ); + // This might get raced out by another runner when claiming the previously + // selected row. The use of job_random should minimize this problem, however. + if ( !$dbw->affectedRows() ) { + $row = false; // raced out + } + } else { + break; // nothing to do + } + } while ( !$row ); + + return $row; + } + + /** + * Reserve a row with a single UPDATE without holding row locks over RTTs... + * + * @param string $uuid 32 char hex string + * @return Row|false + */ + protected function claimOldest( $uuid ) { + $dbw = $this->getMasterDB(); + + $row = false; // the row acquired + do { + if ( $dbw->getType() === 'mysql' ) { + // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the + // same table being changed in an UPDATE query in MySQL (gives Error: 1093). + // Oracle and Postgre have no such limitation. However, MySQL offers an + // alternative here by supporting ORDER BY + LIMIT for UPDATE queries. + $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " . + "SET " . + "job_token = {$dbw->addQuotes( $uuid ) }, " . + "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " . + "job_attempts = job_attempts+1 " . + "WHERE ( " . + "job_cmd = {$dbw->addQuotes( $this->type )} " . + "AND job_token = {$dbw->addQuotes( '' )} " . + ") ORDER BY job_id ASC LIMIT 1", + __METHOD__ + ); + } else { + // Use a subquery to find the job, within an UPDATE to claim it. + // This uses as much of the DB wrapper functions as possible. + $dbw->update( 'job', + array( + 'job_token' => $uuid, + 'job_token_timestamp' => $dbw->timestamp(), + 'job_attempts = job_attempts+1' ), + array( 'job_id = (' . + $dbw->selectSQLText( 'job', 'job_id', + array( 'job_cmd' => $this->type, 'job_token' => '' ), + __METHOD__, + array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) . + ')' + ), + __METHOD__ + ); + } + // Fetch any row that we just reserved... + if ( $dbw->affectedRows() ) { + $row = $dbw->selectRow( 'job', '*', + array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ + ); + if ( !$row ) { // raced out by duplicate job removal + wfDebug( "Row deleted as duplicate by another process." ); + } + } else { + break; // nothing to do + } + } while ( !$row ); + + return $row; + } + + /** + * @see JobQueue::doAck() + * @param Job $job + * @throws MWException + * @return Job|bool + */ + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['id'] ) ) { + throw new MWException( "Job of type '{$job->getType()}' has no ID." ); + } + + $dbw = $this->getMasterDB(); + try { + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); + + // Delete a row with a single DELETE without holding row locks over RTTs... + $dbw->delete( 'job', + array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return true; + } + + /** + * @see JobQueue::doDeduplicateRootJob() + * @param Job $job + * @throws MWException + * @return bool + */ + protected function doDeduplicateRootJob( Job $job ) { + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); + } + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $dbw = $this->getMasterDB(); + $cache = $this->dupCache; + $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) { + $timestamp = $cache->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } ); + + return true; + } + + /** + * @see JobQueue::doDelete() + * @return bool + */ + protected function doDelete() { + $dbw = $this->getMasterDB(); + try { + $dbw->delete( 'job', array( 'job_cmd' => $this->type ) ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + return true; + } + + /** + * @see JobQueue::doWaitForBackups() + * @return void + */ + protected function doWaitForBackups() { + wfWaitForSlaves(); + } + + /** + * @return Array + */ + protected function doGetPeriodicTasks() { + return array( + 'recycleAndDeleteStaleJobs' => array( + 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), + 'period' => ceil( $this->claimTTL / 2 ) + ) + ); + } + + /** + * @return void + */ + protected function doFlushCaches() { + foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) { + $this->cache->delete( $this->getCacheKey( $type ) ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllQueuedJobs() { + $dbr = $this->getSlaveDB(); + try { + return new MappedIterator( + $dbr->select( 'job', '*', + array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), + function( $row ) use ( $dbr ) { + $job = Job::factory( + $row->job_cmd, + Title::makeTitle( $row->job_namespace, $row->job_title ), + strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, + $row->job_id + ); + $job->metadata['id'] = $row->job_id; + $job->id = $row->job_id; // XXX: work around broken subclasses + return $job; + } + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + } + + public function getCoalesceLocationInternal() { + return $this->cluster + ? "DBCluster:{$this->cluster}:{$this->wiki}" + : "LBFactory:{$this->wiki}"; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + $dbr = $this->getSlaveDB(); + $res = $dbr->select( 'job', 'DISTINCT job_cmd', + array( 'job_cmd' => $types ), __METHOD__ ); + + $types = array(); + foreach ( $res as $row ) { + $types[] = $row->job_cmd; + } + return $types; + } + + protected function doGetSiblingQueueSizes( array $types ) { + $dbr = $this->getSlaveDB(); + $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ), + array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) ); + + $sizes = array(); + foreach ( $res as $row ) { + $sizes[$row->job_cmd] = (int)$row->count; + } + return $sizes; + } + + /** + * Recycle or destroy any jobs that have been claimed for too long + * + * @return integer Number of jobs recycled/deleted + */ + public function recycleAndDeleteStaleJobs() { + $now = time(); + $count = 0; // affected rows + $dbw = $this->getMasterDB(); + + try { + if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { + return $count; // already in progress + } + + // Remove claims on jobs acquired for too long if enabled... + if ( $this->claimTTL > 0 ) { + $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); + // Get the IDs of jobs that have be claimed but not finished after too long. + // These jobs can be recycled into the queue by expiring the claim. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', + array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale + "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left + __METHOD__ + ); + $ids = array_map( + function( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + // Reset job_token for these jobs so that other runners will pick them up. + // Set the timestamp to the current time, as it is useful to now that the job + // was already tried before (the timestamp becomes the "released" time). + $dbw->update( 'job', + array( + 'job_token' => '', + 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release + array( + 'job_id' => $ids ), + __METHOD__ + ); + $count += $dbw->affectedRows(); + JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() ); + $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + } + } + + // Just destroy any stale jobs... + $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); + $conds = array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale + ); + if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... + $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; + } + // Get the IDs of jobs that are considered stale and should be removed. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); + $ids = array_map( + function( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); + $count += $dbw->affectedRows(); + JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() ); + } + + $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return $count; + } + + /** + * @param $job Job + * @return array + */ + protected function insertFields( Job $job ) { + $dbw = $this->getMasterDB(); + return array( + // Fields that describe the nature of the job + 'job_cmd' => $job->getType(), + 'job_namespace' => $job->getTitle()->getNamespace(), + 'job_title' => $job->getTitle()->getDBkey(), + 'job_params' => self::makeBlob( $job->getParams() ), + // Additional job metadata + 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), + 'job_timestamp' => $dbw->timestamp(), + 'job_sha1' => wfBaseConvert( + sha1( serialize( $job->getDeduplicationInfo() ) ), + 16, 36, 31 + ), + 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) + ); + } + + /** + * @return DBConnRef + */ + protected function getSlaveDB() { + try { + return $this->getDB( DB_SLAVE ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } + } + + /** + * @return DBConnRef + */ + protected function getMasterDB() { + try { + return $this->getDB( DB_MASTER ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } + } + + /** + * @param $index integer (DB_SLAVE/DB_MASTER) + * @return DBConnRef + */ + protected function getDB( $index ) { + $lb = ( $this->cluster !== false ) + ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) + : wfGetLB( $this->wiki ); + return $lb->getConnectionRef( $index, array(), $this->wiki ); + } + + /** + * @return string + */ + private function getCacheKey( $property ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property ); + } + + /** + * @param $params + * @return string + */ + protected static function makeBlob( $params ) { + if ( $params !== false ) { + return serialize( $params ); + } else { + return ''; + } + } + + /** + * @param $blob + * @return bool|mixed + */ + protected static function extractBlob( $blob ) { + if ( (string)$blob !== '' ) { + return unserialize( $blob ); + } else { + return false; + } + } + + /** + * @param DBError $e + * @throws JobQueueError + */ + protected function throwDBException( DBError $e ) { + throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() ); + } +} diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php new file mode 100644 index 00000000..d3ce164a --- /dev/null +++ b/includes/job/JobQueueFederated.php @@ -0,0 +1,473 @@ +<?php +/** + * Job queue code for federated queues. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle enqueueing and running of background jobs for federated queues + * + * This class allows for queues to be partitioned into smaller queues. + * A partition is defined by the configuration for a JobQueue instance. + * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a + * JobQueueFederated instance, which itself would consist of three JobQueueRedis + * instances, each using their own redis server. This would allow for the jobs + * to be split (evenly or based on weights) accross multiple servers if a single + * server becomes impractical or expensive. Different JobQueue classes can be mixed. + * + * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue + * is inherited by the partition queues. Additional configuration defines what + * section each wiki is in, what partition queues each section uses (and their weight), + * and the JobQueue configuration for each partition. Some sections might only need a + * single queue partition, like the sections for groups of small wikis. + * + * If used for performance, then $wgMainCacheType should be set to memcached/redis. + * Note that "fifo" cannot be used for the ordering, since the data is distributed. + * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also, + * queue classes used by this should ignore down servers (with TTL) to avoid slowness. + * + * @ingroup JobQueue + * @since 1.22 + */ +class JobQueueFederated extends JobQueue { + /** @var Array (partition name => weight) reverse sorted by weight */ + protected $partitionMap = array(); + /** @var Array (partition name => JobQueue) reverse sorted by weight */ + protected $partitionQueues = array(); + /** @var HashRing */ + protected $partitionPushRing; + /** @var BagOStuff */ + protected $cache; + + const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating + const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date + + /** + * @params include: + * - sectionsByWiki : A map of wiki IDs to section names. + * Wikis will default to using the section "default". + * - partitionsBySection : Map of section names to maps of (partition name => weight). + * A section called 'default' must be defined if not all wikis + * have explicitly defined sections. + * - configByPartition : Map of queue partition names to configuration arrays. + * These configuration arrays are passed to JobQueue::factory(). + * The options set here are overriden by those passed to this + * the federated queue itself (e.g. 'order' and 'claimTTL'). + * - partitionsNoPush : List of partition names that can handle pop() but not push(). + * This can be used to migrate away from a certain partition. + * @param array $params + */ + protected function __construct( array $params ) { + parent::__construct( $params ); + $section = isset( $params['sectionsByWiki'][$this->wiki] ) + ? $params['sectionsByWiki'][$this->wiki] + : 'default'; + if ( !isset( $params['partitionsBySection'][$section] ) ) { + throw new MWException( "No configuration for section '$section'." ); + } + // Get the full partition map + $this->partitionMap = $params['partitionsBySection'][$section]; + arsort( $this->partitionMap, SORT_NUMERIC ); + // Get the partitions jobs can actually be pushed to + $partitionPushMap = $this->partitionMap; + if ( isset( $params['partitionsNoPush'] ) ) { + foreach ( $params['partitionsNoPush'] as $partition ) { + unset( $partitionPushMap[$partition] ); + } + } + // Get the config to pass to merge into each partition queue config + $baseConfig = $params; + foreach ( array( 'class', 'sectionsByWiki', + 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o ) + { + unset( $baseConfig[$o] ); + } + // Get the partition queue objects + foreach ( $this->partitionMap as $partition => $w ) { + if ( !isset( $params['configByPartition'][$partition] ) ) { + throw new MWException( "No configuration for partition '$partition'." ); + } + $this->partitionQueues[$partition] = JobQueue::factory( + $baseConfig + $params['configByPartition'][$partition] ); + } + // Get the ring of partitions to push jobs into + $this->partitionPushRing = new HashRing( $partitionPushMap ); + // Aggregate cache some per-queue values if there are multiple partition queues + $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff(); + } + + protected function supportedOrders() { + // No FIFO due to partitioning, though "rough timestamp order" is supported + return array( 'undefined', 'random', 'timestamp' ); + } + + protected function optimalOrder() { + return 'undefined'; // defer to the partitions + } + + protected function supportsDelayedJobs() { + return true; // defer checks to the partitions + } + + protected function doIsEmpty() { + $key = $this->getCacheKey( 'empty' ); + + $isEmpty = $this->cache->get( $key ); + if ( $isEmpty === 'true' ) { + return true; + } elseif ( $isEmpty === 'false' ) { + return false; + } + + foreach ( $this->partitionQueues as $queue ) { + try { + if ( !$queue->doIsEmpty() ) { + $this->cache->add( $key, 'false', self::CACHE_TTL_LONG ); + return false; + } + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + + $this->cache->add( $key, 'true', self::CACHE_TTL_LONG ); + return true; + } + + protected function doGetSize() { + return $this->getCrossPartitionSum( 'size', 'doGetSize' ); + } + + protected function doGetAcquiredCount() { + return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' ); + } + + protected function doGetDelayedCount() { + return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' ); + } + + protected function doGetAbandonedCount() { + return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' ); + } + + /** + * @param string $type + * @param string $method + * @return integer + */ + protected function getCrossPartitionSum( $type, $method ) { + $key = $this->getCacheKey( $type ); + + $count = $this->cache->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $count = 0; + foreach ( $this->partitionQueues as $queue ) { + try { + $count += $queue->$method(); + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + + $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + return $count; + } + + protected function doBatchPush( array $jobs, $flags ) { + if ( !count( $jobs ) ) { + return true; // nothing to do + } + // Local ring variable that may be changed to point to a new ring on failure + $partitionRing = $this->partitionPushRing; + // Try to insert the jobs and update $partitionsTry on any failures + $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags ); + if ( count( $jobsLeft ) ) { // some jobs failed to insert? + // Try to insert the remaning jobs once more, ignoring the bad partitions + return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) ); + } + return true; + } + + /** + * @param array $jobs + * @param HashRing $partitionRing + * @param integer $flags + * @return array List of Job object that could not be inserted + */ + protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) { + $jobsLeft = array(); + + // Because jobs are spread across partitions, per-job de-duplication needs + // to use a consistent hash to avoid allowing duplicate jobs per partition. + // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded. + $uJobsByPartition = array(); // (partition name => job list) + foreach ( $jobs as $key => $job ) { + if ( $job->ignoreDuplicates() ) { + $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) ); + $uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job; + unset( $jobs[$key] ); + } + } + // Get the batches of jobs that are not de-duplicated + if ( $flags & self::QOS_ATOMIC ) { + $nuJobBatches = array( $jobs ); // all or nothing + } else { + // Split the jobs into batches and spread them out over servers if there + // are many jobs. This helps keep the partitions even. Otherwise, send all + // the jobs to a single partition queue to avoids the extra connections. + $nuJobBatches = array_chunk( $jobs, 300 ); + } + + // Insert the de-duplicated jobs into the queues... + foreach ( $uJobsByPartition as $partition => $jobBatch ) { + $queue = $this->partitionQueues[$partition]; + try { + $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); + } catch ( JobQueueError $e ) { + $ok = false; + MWExceptionHandler::logException( $e ); + } + if ( $ok ) { + $key = $this->getCacheKey( 'empty' ); + $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); + } else { + $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist + if ( !$partitionRing ) { + throw new JobQueueError( "Could not insert job(s), all partitions are down." ); + } + $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted + } + } + + // Insert the jobs that are not de-duplicated into the queues... + foreach ( $nuJobBatches as $jobBatch ) { + $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() ); + $queue = $this->partitionQueues[$partition]; + try { + $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); + } catch ( JobQueueError $e ) { + $ok = false; + MWExceptionHandler::logException( $e ); + } + if ( $ok ) { + $key = $this->getCacheKey( 'empty' ); + $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); + } else { + $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist + if ( !$partitionRing ) { + throw new JobQueueError( "Could not insert job(s), all partitions are down." ); + } + $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted + } + } + + return $jobsLeft; + } + + protected function doPop() { + $key = $this->getCacheKey( 'empty' ); + + $isEmpty = $this->cache->get( $key ); + if ( $isEmpty === 'true' ) { + return false; + } + + $partitionsTry = $this->partitionMap; // (partition => weight) + + while ( count( $partitionsTry ) ) { + $partition = ArrayUtils::pickRandom( $partitionsTry ); + if ( $partition === false ) { + break; // all partitions at 0 weight + } + $queue = $this->partitionQueues[$partition]; + try { + $job = $queue->pop(); + } catch ( JobQueueError $e ) { + $job = false; + MWExceptionHandler::logException( $e ); + } + if ( $job ) { + $job->metadata['QueuePartition'] = $partition; + return $job; + } else { + unset( $partitionsTry[$partition] ); // blacklist partition + } + } + + $this->cache->set( $key, 'true', JobQueueDB::CACHE_TTL_LONG ); + return false; + } + + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['QueuePartition'] ) ) { + throw new MWException( "The given job has no defined partition name." ); + } + return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job ); + } + + protected function doIsRootJobOldDuplicate( Job $job ) { + $params = $job->getRootJobParams(); + $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 ); + try { + return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job ); + } catch ( JobQueueError $e ) { + if ( isset( $partitions[1] ) ) { // check fallback partition + return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job ); + } + } + return false; + } + + protected function doDeduplicateRootJob( Job $job ) { + $params = $job->getRootJobParams(); + $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 ); + try { + return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job ); + } catch ( JobQueueError $e ) { + if ( isset( $partitions[1] ) ) { // check fallback partition + return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job ); + } + } + return false; + } + + protected function doDelete() { + foreach ( $this->partitionQueues as $queue ) { + try { + $queue->doDelete(); + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + } + + protected function doWaitForBackups() { + foreach ( $this->partitionQueues as $queue ) { + try { + $queue->waitForBackups(); + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + } + + protected function doGetPeriodicTasks() { + $tasks = array(); + foreach ( $this->partitionQueues as $partition => $queue ) { + foreach ( $queue->getPeriodicTasks() as $task => $def ) { + $tasks["{$partition}:{$task}"] = $def; + } + } + return $tasks; + } + + protected function doFlushCaches() { + static $types = array( + 'empty', + 'size', + 'acquiredcount', + 'delayedcount', + 'abandonedcount' + ); + foreach ( $types as $type ) { + $this->cache->delete( $this->getCacheKey( $type ) ); + } + foreach ( $this->partitionQueues as $queue ) { + $queue->doFlushCaches(); + } + } + + public function getAllQueuedJobs() { + $iterator = new AppendIterator(); + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllQueuedJobs() ); + } + return $iterator; + } + + public function getAllDelayedJobs() { + $iterator = new AppendIterator(); + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllDelayedJobs() ); + } + return $iterator; + } + + public function getCoalesceLocationInternal() { + return "JobQueueFederated:wiki:{$this->wiki}" . + sha1( serialize( array_keys( $this->partitionMap ) ) ); + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + $result = array(); + foreach ( $this->partitionQueues as $queue ) { + try { + $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types ); + if ( is_array( $nonEmpty ) ) { + $result = array_unique( array_merge( $result, $nonEmpty ) ); + } else { + return null; // not supported on all partitions; bail + } + if ( count( $result ) == count( $types ) ) { + break; // short-circuit + } + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + return array_values( $result ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $result = array(); + foreach ( $this->partitionQueues as $queue ) { + try { + $sizes = $queue->doGetSiblingQueueSizes( $types ); + if ( is_array( $sizes ) ) { + foreach ( $sizes as $type => $size ) { + $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size; + } + } else { + return null; // not supported on all partitions; bail + } + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + return $result; + } + + public function setTestingPrefix( $key ) { + foreach ( $this->partitionQueues as $queue ) { + $queue->setTestingPrefix( $key ); + } + } + + /** + * @return string + */ + private function getCacheKey( $property ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); + } +} diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php new file mode 100644 index 00000000..fa7fee5f --- /dev/null +++ b/includes/job/JobQueueGroup.php @@ -0,0 +1,427 @@ +<?php +/** + * Job queue base code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle enqueueing of background jobs + * + * @ingroup JobQueue + * @since 1.21 + */ +class JobQueueGroup { + /** @var Array */ + protected static $instances = array(); + + /** @var ProcessCacheLRU */ + protected $cache; + + protected $wiki; // string; wiki ID + + /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */ + protected $coalescedQueues; + + const TYPE_DEFAULT = 1; // integer; jobs popped by default + const TYPE_ANY = 2; // integer; any job + + const USE_CACHE = 1; // integer; use process or persistent cache + const USE_PRIORITY = 2; // integer; respect deprioritization + + const PROC_CACHE_TTL = 15; // integer; seconds + + const CACHE_VERSION = 1; // integer; cache version + + /** + * @param string $wiki Wiki ID + */ + protected function __construct( $wiki ) { + $this->wiki = $wiki; + $this->cache = new ProcessCacheLRU( 10 ); + } + + /** + * @param string $wiki Wiki ID + * @return JobQueueGroup + */ + public static function singleton( $wiki = false ) { + $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; + if ( !isset( self::$instances[$wiki] ) ) { + self::$instances[$wiki] = new self( $wiki ); + } + return self::$instances[$wiki]; + } + + /** + * Destroy the singleton instances + * + * @return void + */ + public static function destroySingletons() { + self::$instances = array(); + } + + /** + * Get the job queue object for a given queue type + * + * @param $type string + * @return JobQueue + */ + public function get( $type ) { + global $wgJobTypeConf; + + $conf = array( 'wiki' => $this->wiki, 'type' => $type ); + if ( isset( $wgJobTypeConf[$type] ) ) { + $conf = $conf + $wgJobTypeConf[$type]; + } else { + $conf = $conf + $wgJobTypeConf['default']; + } + + return JobQueue::factory( $conf ); + } + + /** + * Insert jobs into the respective queues of with the belong. + * + * This inserts the jobs into the queue specified by $wgJobTypeConf + * and updates the aggregate job queue information cache as needed. + * + * @param $jobs Job|array A single Job or a list of Jobs + * @throws MWException + * @return bool + */ + public function push( $jobs ) { + $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); + + $jobsByType = array(); // (job type => list of jobs) + foreach ( $jobs as $job ) { + if ( $job instanceof Job ) { + $jobsByType[$job->getType()][] = $job; + } else { + throw new MWException( "Attempted to push a non-Job object into a queue." ); + } + } + + $ok = true; + foreach ( $jobsByType as $type => $jobs ) { + if ( $this->get( $type )->push( $jobs ) ) { + JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); + } else { + $ok = false; + } + } + + if ( $this->cache->has( 'queues-ready', 'list' ) ) { + $list = $this->cache->get( 'queues-ready', 'list' ); + if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) { + $this->cache->clear( 'queues-ready' ); + } + } + + return $ok; + } + + /** + * Pop a job off one of the job queues + * + * This pops a job off a queue as specified by $wgJobTypeConf and + * updates the aggregate job queue information cache as needed. + * + * @param $qtype integer|string JobQueueGroup::TYPE_DEFAULT or type string + * @param $flags integer Bitfield of JobQueueGroup::USE_* constants + * @return Job|bool Returns false on failure + */ + public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) { + if ( is_string( $qtype ) ) { // specific job type + if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) { + return false; // back off + } + $job = $this->get( $qtype )->pop(); + if ( !$job ) { + JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); + } + return $job; + } else { // any job in the "default" jobs types + if ( $flags & self::USE_CACHE ) { + if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { + $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() ); + } + $types = $this->cache->get( 'queues-ready', 'list' ); + } else { + $types = $this->getQueuesWithJobs(); + } + + if ( $qtype == self::TYPE_DEFAULT ) { + $types = array_intersect( $types, $this->getDefaultQueueTypes() ); + } + shuffle( $types ); // avoid starvation + + foreach ( $types as $type ) { // for each queue... + if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) { + continue; // back off + } + $job = $this->get( $type )->pop(); + if ( $job ) { // found + return $job; + } else { // not found + JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type ); + $this->cache->clear( 'queues-ready' ); + } + } + + return false; // no jobs found + } + } + + /** + * Acknowledge that a job was completed + * + * @param $job Job + * @return bool + */ + public function ack( Job $job ) { + return $this->get( $job->getType() )->ack( $job ); + } + + /** + * Register the "root job" of a given job into the queue for de-duplication. + * This should only be called right *after* all the new jobs have been inserted. + * + * @param $job Job + * @return bool + */ + public function deduplicateRootJob( Job $job ) { + return $this->get( $job->getType() )->deduplicateRootJob( $job ); + } + + /** + * Wait for any slaves or backup queue servers to catch up. + * + * This does nothing for certain queue classes. + * + * @return void + * @throws MWException + */ + public function waitForBackups() { + global $wgJobTypeConf; + + wfProfileIn( __METHOD__ ); + // Try to avoid doing this more than once per queue storage medium + foreach ( $wgJobTypeConf as $type => $conf ) { + $this->get( $type )->waitForBackups(); + } + wfProfileOut( __METHOD__ ); + } + + /** + * Get the list of queue types + * + * @return array List of strings + */ + public function getQueueTypes() { + return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) ); + } + + /** + * Get the list of default queue types + * + * @return array List of strings + */ + public function getDefaultQueueTypes() { + global $wgJobTypesExcludedFromDefaultQueue; + + return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); + } + + /** + * Get the list of job types that have non-empty queues + * + * @return Array List of job types that have non-empty queues + */ + public function getQueuesWithJobs() { + $types = array(); + foreach ( $this->getCoalescedQueues() as $info ) { + $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() ); + if ( is_array( $nonEmpty ) ) { // batching features supported + $types = array_merge( $types, $nonEmpty ); + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } + } + } + return $types; + } + + /** + * Get the size of the queus for a list of job types + * + * @return Array Map of (job type => size) + */ + public function getQueueSizes() { + $sizeMap = array(); + foreach ( $this->getCoalescedQueues() as $info ) { + $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() ); + if ( is_array( $sizes ) ) { // batching features supported + $sizeMap = $sizeMap + $sizes; + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + $sizeMap[$type] = $this->get( $type )->getSize(); + } + } + } + return $sizeMap; + } + + /** + * @return array + */ + protected function getCoalescedQueues() { + global $wgJobTypeConf; + + if ( $this->coalescedQueues === null ) { + $this->coalescedQueues = array(); + foreach ( $wgJobTypeConf as $type => $conf ) { + $queue = JobQueue::factory( + array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf ); + $loc = $queue->getCoalesceLocationInternal(); + if ( !isset( $this->coalescedQueues[$loc] ) ) { + $this->coalescedQueues[$loc]['queue'] = $queue; + $this->coalescedQueues[$loc]['types'] = array(); + } + if ( $type === 'default' ) { + $this->coalescedQueues[$loc]['types'] = array_merge( + $this->coalescedQueues[$loc]['types'], + array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) ) + ); + } else { + $this->coalescedQueues[$loc]['types'][] = $type; + } + } + } + + return $this->coalescedQueues; + } + + /** + * Check if jobs should not be popped of a queue right now. + * This is only used for performance, such as to avoid spamming + * the queue with many sub-jobs before they actually get run. + * + * @param $type string + * @return bool + */ + public function isQueueDeprioritized( $type ) { + if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) { + return $this->cache->get( 'isDeprioritized', $type ); + } + if ( $type === 'refreshLinks2' ) { + // Don't keep converting refreshLinks2 => refreshLinks jobs if the + // later jobs have not been done yet. This helps throttle queue spam. + $deprioritized = !$this->get( 'refreshLinks' )->isEmpty(); + $this->cache->set( 'isDeprioritized', $type, $deprioritized ); + return $deprioritized; + } + return false; + } + + /** + * Execute any due periodic queue maintenance tasks for all queues. + * + * A task is "due" if the time ellapsed since the last run is greater than + * the defined run period. Concurrent calls to this function will cause tasks + * to be attempted twice, so they may need their own methods of mutual exclusion. + * + * @return integer Number of tasks run + */ + public function executeReadyPeriodicTasks() { + global $wgMemc; + + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' ); + $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp) + + $count = 0; + $tasksRun = array(); // (queue => task => UNIX timestamp) + foreach ( $this->getQueueTypes() as $type ) { + $queue = $this->get( $type ); + foreach ( $queue->getPeriodicTasks() as $task => $definition ) { + if ( $definition['period'] <= 0 ) { + continue; // disabled + } elseif ( !isset( $lastRuns[$type][$task] ) + || $lastRuns[$type][$task] < ( time() - $definition['period'] ) ) + { + try { + if ( call_user_func( $definition['callback'] ) !== null ) { + $tasksRun[$type][$task] = time(); + ++$count; + } + } catch ( JobQueueError $e ) { + MWExceptionHandler::logException( $e ); + } + } + } + } + + $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) { + if ( is_array( $lastRuns ) ) { + foreach ( $tasksRun as $type => $tasks ) { + foreach ( $tasks as $task => $timestamp ) { + if ( !isset( $lastRuns[$type][$task] ) + || $timestamp > $lastRuns[$type][$task] ) + { + $lastRuns[$type][$task] = $timestamp; + } + } + } + } else { + $lastRuns = $tasksRun; + } + return $lastRuns; + } ); + + return $count; + } + + /** + * @param $name string + * @return mixed + */ + private function getCachedConfigVar( $name ) { + global $wgConf, $wgMemc; + + if ( $this->wiki === wfWikiID() ) { + return $GLOBALS[$name]; // common case + } else { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name ); + $value = $wgMemc->get( $key ); // ('v' => ...) or false + if ( is_array( $value ) ) { + return $value['v']; + } else { + $value = $wgConf->getConfig( $this->wiki, $name ); + $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) ); + return $value; + } + } + } +} diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php new file mode 100644 index 00000000..378e1755 --- /dev/null +++ b/includes/job/JobQueueRedis.php @@ -0,0 +1,856 @@ +<?php +/** + * Redis-backed job queue code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle job queues stored in Redis + * + * This is faster, less resource intensive, queue that JobQueueDB. + * All data for a queue using this class is placed into one redis server. + * + * There are eight main redis keys used to track jobs: + * - l-unclaimed : A list of job IDs used for ready unclaimed jobs + * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries + * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs + * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs + * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication + * - h-sha1ById : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication + * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries + * - h-data : A hash of (job ID => serialized blobs) for job storage + * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned. + * If an ID appears in any of those lists, it should have a h-data entry for its ID. + * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then + * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById + * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its + * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID. + * + * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication. + * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run. + * All the keys are prefixed with the relevant wiki ID information. + * + * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations. + * Additionally, it should be noted that redis has different persistence modes, such + * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be + * made on the servers based on what queues are using it and what tolerance they have. + * + * @ingroup JobQueue + * @ingroup Redis + * @since 1.22 + */ +class JobQueueRedis extends JobQueue { + /** @var RedisConnectionPool */ + protected $redisPool; + + protected $server; // string; server address + protected $compression; // string; compression method to use + + const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) + + protected $key; // string; key to prefix the queue keys with (used for testing) + + /** + * @params include: + * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). + * Note that the serializer option is ignored "none" is always used. + * - redisServer : A hostname/port combination or the absolute path of a UNIX socket. + * If a hostname is specified but no port, the standard port number + * 6379 will be used. Required. + * - compression : The type of compression to use; one of (none,gzip). + * @param array $params + */ + public function __construct( array $params ) { + parent::__construct( $params ); + $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua + $this->server = $params['redisServer']; + $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; + $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); + } + + protected function supportedOrders() { + return array( 'timestamp', 'fifo' ); + } + + protected function optimalOrder() { + return 'fifo'; + } + + protected function supportsDelayedJobs() { + return true; + } + + /** + * @see JobQueue::doIsEmpty() + * @return bool + * @throws MWException + */ + protected function doIsEmpty() { + return $this->doGetSize() == 0; + } + + /** + * @see JobQueue::doGetSize() + * @return integer + * @throws MWException + */ + protected function doGetSize() { + $conn = $this->getConnection(); + try { + return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doGetAcquiredCount() + * @return integer + * @throws MWException + */ + protected function doGetAcquiredCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + $conn = $this->getConnection(); + try { + $conn->multi( Redis::PIPELINE ); + $conn->zSize( $this->getQueueKey( 'z-claimed' ) ); + $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); + return array_sum( $conn->exec() ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doGetDelayedCount() + * @return integer + * @throws MWException + */ + protected function doGetDelayedCount() { + if ( !$this->checkDelay ) { + return 0; // no delayed jobs + } + $conn = $this->getConnection(); + try { + return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doGetAbandonedCount() + * @return integer + * @throws MWException + */ + protected function doGetAbandonedCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + $conn = $this->getConnection(); + try { + return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doBatchPush() + * @param array $jobs + * @param $flags + * @return bool + * @throws MWException + */ + protected function doBatchPush( array $jobs, $flags ) { + // Convert the jobs into field maps (de-duplicated against each other) + $items = array(); // (job ID => job fields map) + foreach ( $jobs as $job ) { + $item = $this->getNewJobFields( $job ); + if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate + $items[$item['sha1']] = $item; + } else { + $items[$item['uuid']] = $item; + } + } + + if ( !count( $items ) ) { + return true; // nothing to do + } + + $conn = $this->getConnection(); + try { + // Actually push the non-duplicate jobs into the queue... + if ( $flags & self::QOS_ATOMIC ) { + $batches = array( $items ); // all or nothing + } else { + $batches = array_chunk( $items, 500 ); // avoid tying up the server + } + $failed = 0; + $pushed = 0; + foreach ( $batches as $itemBatch ) { + $added = $this->pushBlobs( $conn, $itemBatch ); + if ( is_int( $added ) ) { + $pushed += $added; + } else { + $failed += count( $itemBatch ); + } + } + if ( $failed > 0 ) { + wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." ); + return false; + } + JobQueue::incrStats( 'job-insert', $this->type, count( $items ) ); + JobQueue::incrStats( 'job-insert-duplicate', $this->type, + count( $items ) - $failed - $pushed ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return true; + } + + /** + * @param RedisConnRef $conn + * @param array $items List of results from JobQueueRedis::getNewJobFields() + * @return integer Number of jobs inserted (duplicates are ignored) + * @throws RedisException + */ + protected function pushBlobs( RedisConnRef $conn, array $items ) { + $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] ) + foreach ( $items as $item ) { + $args[] = (string)$item['uuid']; + $args[] = (string)$item['sha1']; + $args[] = (string)$item['rtimestamp']; + $args[] = (string)$this->serialize( $item ); + } + static $script = +<<<LUA + if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end + local pushed = 0 + for i = 1,#ARGV,4 do + local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3] + if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then + if 1*rtimestamp > 0 then + -- Insert into delayed queue (release time as score) + redis.call('zAdd',KEYS[4],rtimestamp,id) + else + -- Insert into unclaimed queue + redis.call('lPush',KEYS[1],id) + end + if sha1 ~= '' then + redis.call('hSet',KEYS[2],id,sha1) + redis.call('hSet',KEYS[3],sha1,id) + end + redis.call('hSet',KEYS[5],id,blob) + pushed = pushed + 1 + end + end + return pushed +LUA; + return $conn->luaEval( $script, + array_merge( + array( + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'z-delayed' ), # KEYS[4] + $this->getQueueKey( 'h-data' ), # KEYS[5] + ), + $args + ), + 5 # number of first argument(s) that are keys + ); + } + + /** + * @see JobQueue::doPop() + * @return Job|bool + * @throws MWException + */ + protected function doPop() { + $job = false; + + // Push ready delayed jobs into the queue every 10 jobs to spread the load. + // This is also done as a periodic task, but we don't want too much done at once. + if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { + $this->releaseReadyDelayedJobs(); + } + + $conn = $this->getConnection(); + try { + do { + if ( $this->claimTTL > 0 ) { + // Keep the claimed job list down for high-traffic queues + if ( mt_rand( 0, 99 ) == 0 ) { + $this->recycleAndDeleteStaleJobs(); + } + $blob = $this->popAndAcquireBlob( $conn ); + } else { + $blob = $this->popAndDeleteBlob( $conn ); + } + if ( $blob === false ) { + break; // no jobs; nothing to do + } + + JobQueue::incrStats( 'job-pop', $this->type ); + $item = $this->unserialize( $blob ); + if ( $item === false ) { + wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." ); + continue; + } + + // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed + $job = $this->getJobFromFields( $item ); // may be false + } while ( !$job ); // job may be false if invalid + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return $job; + } + + /** + * @param RedisConnRef $conn + * @return array serialized string or false + * @throws RedisException + */ + protected function popAndDeleteBlob( RedisConnRef $conn ) { + static $script = +<<<LUA + -- Pop an item off the queue + local id = redis.call('rpop',KEYS[1]) + if not id then return false end + -- Get the job data and remove it + local item = redis.call('hGet',KEYS[4],id) + redis.call('hDel',KEYS[4],id) + -- Allow new duplicates of this job + local sha1 = redis.call('hGet',KEYS[2],id) + if sha1 then redis.call('hDel',KEYS[3],sha1) end + redis.call('hDel',KEYS[2],id) + -- Return the job data + return item +LUA; + return $conn->luaEval( $script, + array( + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'h-data' ), # KEYS[4] + ), + 4 # number of first argument(s) that are keys + ); + } + + /** + * @param RedisConnRef $conn + * @return array serialized string or false + * @throws RedisException + */ + protected function popAndAcquireBlob( RedisConnRef $conn ) { + static $script = +<<<LUA + -- Pop an item off the queue + local id = redis.call('rPop',KEYS[1]) + if not id then return false end + -- Allow new duplicates of this job + local sha1 = redis.call('hGet',KEYS[2],id) + if sha1 then redis.call('hDel',KEYS[3],sha1) end + redis.call('hDel',KEYS[2],id) + -- Mark the jobs as claimed and return it + redis.call('zAdd',KEYS[4],ARGV[1],id) + redis.call('hIncrBy',KEYS[5],id,1) + return redis.call('hGet',KEYS[6],id) +LUA; + return $conn->luaEval( $script, + array( + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'z-claimed' ), # KEYS[4] + $this->getQueueKey( 'h-attempts' ), # KEYS[5] + $this->getQueueKey( 'h-data' ), # KEYS[6] + time(), # ARGV[1] (injected to be replication-safe) + ), + 6 # number of first argument(s) that are keys + ); + } + + /** + * @see JobQueue::doAck() + * @param Job $job + * @return Job|bool + * @throws MWException + */ + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['uuid'] ) ) { + throw new MWException( "Job of type '{$job->getType()}' has no UUID." ); + } + if ( $this->claimTTL > 0 ) { + $conn = $this->getConnection(); + try { + static $script = +<<<LUA + -- Unmark the job as claimed + redis.call('zRem',KEYS[1],ARGV[1]) + redis.call('hDel',KEYS[2],ARGV[1]) + -- Delete the job data itself + return redis.call('hDel',KEYS[3],ARGV[1]) +LUA; + $res = $conn->luaEval( $script, + array( + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'h-data' ), # KEYS[3] + $job->metadata['uuid'] # ARGV[1] + ), + 3 # number of first argument(s) that are keys + ); + + if ( !$res ) { + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); + return false; + } + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + return true; + } + + /** + * @see JobQueue::doDeduplicateRootJob() + * @param Job $job + * @return bool + * @throws MWException + */ + protected function doDeduplicateRootJob( Job $job ) { + if ( !$job->hasRootJobParams() ) { + throw new MWException( "Cannot register root job; missing parameters." ); + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + + $conn = $this->getConnection(); + try { + $timestamp = $conn->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + // Update the timestamp of the last root job started at the location... + return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doIsRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + if ( !$job->hasRootJobParams() ) { + return false; // job has no de-deplication info + } + $params = $job->getRootJobParams(); + + $conn = $this->getConnection(); + try { + // Get the last time this root job was enqueued + $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @see JobQueue::doDelete() + * @return bool + */ + protected function doDelete() { + static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned', + 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ); + + $conn = $this->getConnection(); + try { + $keys = array(); + foreach ( $props as $prop ) { + $keys[] = $this->getQueueKey( $prop ); + } + return ( $conn->delete( $keys ) !== false ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllQueuedJobs() { + $conn = $this->getConnection(); + try { + $that = $this; + return new MappedIterator( + $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ), + function( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { return is_object( $job ); } ) + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllDelayedJobs() { + $conn = $this->getConnection(); + try { + $that = $this; + return new MappedIterator( // delayed jobs + $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), + function( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + }, + array( 'accept' => function ( $job ) { return is_object( $job ); } ) + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + public function getCoalesceLocationInternal() { + return "RedisServer:" . $this->server; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $sizes = array(); // (type => size) + $types = array_values( $types ); // reindex + try { + $conn = $this->getConnection(); + $conn->multi( Redis::PIPELINE ); + foreach ( $types as $type ) { + $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) ); + } + $res = $conn->exec(); + if ( is_array( $res ) ) { + foreach ( $res as $i => $size ) { + $sizes[$types[$i]] = $size; + } + } + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + return $sizes; + } + + /** + * This function should not be called outside JobQueueRedis + * + * @param $uid string + * @param $conn RedisConnRef + * @return Job|bool Returns false if the job does not exist + * @throws MWException + */ + public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { + try { + $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ); + if ( $data === false ) { + return false; // not found + } + $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) ); + if ( !is_array( $item ) ) { // this shouldn't happen + throw new MWException( "Could not find job with ID '$uid'." ); + } + $title = Title::makeTitle( $item['namespace'], $item['title'] ); + $job = Job::factory( $item['type'], $title, $item['params'] ); + $job->metadata['uuid'] = $item['uuid']; + return $job; + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * Release any ready delayed jobs into the queue + * + * @return integer Number of jobs released + * @throws MWException + */ + public function releaseReadyDelayedJobs() { + $count = 0; + + $conn = $this->getConnection(); + try { + static $script = +<<<LUA + -- Get the list of ready delayed jobs, sorted by readiness + local ids = redis.call('zRangeByScore',KEYS[1],0,ARGV[1]) + -- Migrate the jobs from the "delayed" set to the "unclaimed" list + for k,id in ipairs(ids) do + redis.call('lPush',KEYS[2],id) + redis.call('zRem',KEYS[1],id) + end + return #ids +LUA; + $count += (int)$conn->luaEval( $script, + array( + $this->getQueueKey( 'z-delayed' ), // KEYS[1] + $this->getQueueKey( 'l-unclaimed' ), // KEYS[2] + time() // ARGV[1]; max "delay until" UNIX timestamp + ), + 2 # first two arguments are keys + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return $count; + } + + /** + * Recycle or destroy any jobs that have been claimed for too long + * + * @return integer Number of jobs recycled/deleted + * @throws MWException + */ + public function recycleAndDeleteStaleJobs() { + if ( $this->claimTTL <= 0 ) { // sanity + throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." ); + } + $count = 0; + // For each job item that can be retried, we need to add it back to the + // main queue and remove it from the list of currenty claimed job items. + // For those that cannot, they are marked as dead and kept around for + // investigation and manual job restoration but are eventually deleted. + $conn = $this->getConnection(); + try { + $now = time(); + static $script = +<<<LUA + local released,abandoned,pruned = 0,0,0 + -- Get all non-dead jobs that have an expired claim on them. + -- The score for each item is the last claim timestamp (UNIX). + local staleClaims = redis.call('zRangeByScore',KEYS[1],0,ARGV[1]) + for k,id in ipairs(staleClaims) do + local timestamp = redis.call('zScore',KEYS[1],id) + local attempts = redis.call('hGet',KEYS[2],id) + if attempts < ARGV[3] then + -- Claim expired and retries left: re-enqueue the job + redis.call('lPush',KEYS[3],id) + redis.call('hIncrBy',KEYS[2],id,1) + released = released + 1 + else + -- Claim expired and no retries left: mark the job as dead + redis.call('zAdd',KEYS[5],timestamp,id) + abandoned = abandoned + 1 + end + redis.call('zRem',KEYS[1],id) + end + -- Get all of the dead jobs that have been marked as dead for too long. + -- The score for each item is the last claim timestamp (UNIX). + local deadClaims = redis.call('zRangeByScore',KEYS[5],0,ARGV[2]) + for k,id in ipairs(deadClaims) do + -- Stale and out of retries: remove any traces of the job + redis.call('zRem',KEYS[5],id) + redis.call('hDel',KEYS[2],id) + redis.call('hDel',KEYS[4],id) + pruned = pruned + 1 + end + return {released,abandoned,pruned} +LUA; + $res = $conn->luaEval( $script, + array( + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'l-unclaimed' ), # KEYS[3] + $this->getQueueKey( 'h-data' ), # KEYS[4] + $this->getQueueKey( 'z-abandoned' ), # KEYS[5] + $now - $this->claimTTL, # ARGV[1] + $now - self::MAX_AGE_PRUNE, # ARGV[2] + $this->maxTries # ARGV[3] + ), + 5 # number of first argument(s) that are keys + ); + if ( $res ) { + list( $released, $abandoned, $pruned ) = $res; + $count += $released + $pruned; + JobQueue::incrStats( 'job-recycle', $this->type, $released ); + JobQueue::incrStats( 'job-abandon', $this->type, $abandoned ); + } + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return $count; + } + + /** + * @return Array + */ + protected function doGetPeriodicTasks() { + $tasks = array(); + if ( $this->claimTTL > 0 ) { + $tasks['recycleAndDeleteStaleJobs'] = array( + 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), + 'period' => ceil( $this->claimTTL / 2 ) + ); + } + if ( $this->checkDelay ) { + $tasks['releaseReadyDelayedJobs'] = array( + 'callback' => array( $this, 'releaseReadyDelayedJobs' ), + 'period' => 300 // 5 minutes + ); + } + return $tasks; + } + + /** + * @param $job Job + * @return array + */ + protected function getNewJobFields( Job $job ) { + return array( + // Fields that describe the nature of the job + 'type' => $job->getType(), + 'namespace' => $job->getTitle()->getNamespace(), + 'title' => $job->getTitle()->getDBkey(), + 'params' => $job->getParams(), + // Some jobs cannot run until a "release timestamp" + 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, + // Additional job metadata + 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ), + 'sha1' => $job->ignoreDuplicates() + ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) + : '', + 'timestamp' => time() // UNIX timestamp + ); + } + + /** + * @param $fields array + * @return Job|bool + */ + protected function getJobFromFields( array $fields ) { + $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] ); + if ( $title ) { + $job = Job::factory( $fields['type'], $title, $fields['params'] ); + $job->metadata['uuid'] = $fields['uuid']; + return $job; + } + return false; + } + + /** + * @param array $fields + * @return string Serialized and possibly compressed version of $fields + */ + protected function serialize( array $fields ) { + $blob = serialize( $fields ); + if ( $this->compression === 'gzip' + && strlen( $blob ) >= 1024 && function_exists( 'gzdeflate' ) ) + { + $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ); + $blobz = serialize( $object ); + return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob; + } else { + return $blob; + } + } + + /** + * @param string $blob + * @return array|bool Unserialized version of $blob or false + */ + protected function unserialize( $blob ) { + $fields = unserialize( $blob ); + if ( is_object( $fields ) ) { + if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) { + $fields = unserialize( gzinflate( $fields->blob ) ); + } else { + $fields = false; + } + } + return is_array( $fields ) ? $fields : false; + } + + /** + * Get a connection to the server that handles all sub-queues for this queue + * + * @return Array (server name, Redis instance) + * @throws MWException + */ + protected function getConnection() { + $conn = $this->redisPool->getConnection( $this->server ); + if ( !$conn ) { + throw new JobQueueConnectionError( "Unable to connect to redis server." ); + } + return $conn; + } + + /** + * @param $server string + * @param $conn RedisConnRef + * @param $e RedisException + * @throws MWException + */ + protected function throwRedisException( $server, RedisConnRef $conn, $e ) { + $this->redisPool->handleException( $server, $conn, $e ); + throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); + } + + /** + * @param $prop string + * @param $type string|null + * @return string + */ + private function getQueueKey( $prop, $type = null ) { + $type = is_string( $type ) ? $type : $this->type; + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + if ( strlen( $this->key ) ) { // namespaced queue (for testing) + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop ); + } else { + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop ); + } + } + + /** + * @param $key string + * @return void + */ + public function setTestingPrefix( $key ) { + $this->key = $key; + } +} diff --git a/includes/job/README b/includes/job/README new file mode 100644 index 00000000..c11d5a78 --- /dev/null +++ b/includes/job/README @@ -0,0 +1,81 @@ +/*! +\ingroup JobQueue +\page jobqueue_design Job queue design + +Notes on the Job queuing system architecture. + +\section intro Introduction + +The data model consist of the following main components: +* The Job object represents a particular deferred task that happens in the + background. All jobs subclass the Job object and put the main logic in the + function called run(). +* The JobQueue object represents a particular queue of jobs of a certain type. + For example there may be a queue for email jobs and a queue for squid purge + jobs. + +\section jobqueue Job queues + +Each job type has its own queue and is associated to a storage medium. One +queue might save its jobs in redis while another one uses would use a database. + +Storage medium are defined in a queue class. Before using it, you must +define in $wgJobTypeConf a mapping of the job type to a queue class. + +The factory class JobQueueGroup provides helper functions: +- getting the queue for a given job +- route new job insertions to the proper queue + +The following queue classes are available: +* JobQueueDB (stores jobs in the `job` table in a database) +* JobQueueRedis (stores jobs in a redis server) + +All queue classes support some basic operations (though some may be no-ops): +* enqueueing a batch of jobs +* dequeueing a single job +* acknowledging a job is completed +* checking if the queue is empty + +Some queue classes (like JobQueueDB) may dequeue jobs in random order while other +queues might dequeue jobs in exact FIFO order. Callers should thus not assume jobs +are executed in FIFO order. + +Also note that not all queue classes will have the same reliability guarantees. +In-memory queues may lose data when restarted depending on snapshot and journal +settings (including journal fsync() frequency). Some queue types may totally remove +jobs when dequeued while leaving the ack() function as a no-op; if a job is +dequeued by a job runner, which crashes before completion, the job will be +lost. Some jobs, like purging squid caches after a template change, may not +require durable queues, whereas other jobs might be more important. + +\section aggregator Job queue aggregator + +The aggregators are used by nextJobDB.php, which is a script that will return a +random ready queue (on any wiki in the farm) that can be used with runJobs.php. +This can be used in conjunction with any scripts that handle wiki farm job queues. +Note that $wgLocalDatabases defines what wikis are in the wiki farm. + +Since each job type has its own queue, and wiki-farms may have many wikis, +there might be a large number of queues to keep track of. To avoid wasting +large amounts of time polling empty queues, aggregators exists to keep track +of which queues are ready. + +The following queue aggregator classes are available: +* JobQueueAggregatorMemc (uses $wgMemc to track ready queues) +* JobQueueAggregatorRedis (uses a redis server to track ready queues) + +Some aggregators cache data for a few minutes while others may be always up to date. +This can be an important factor for jobs that need a low pickup time (or latency). + +\section jobs Jobs + +Callers should also try to make jobs maintain correctness when executed twice. +This is useful for queues that actually implement ack(), since they may recycle +dequeued but un-acknowledged jobs back into the queue to be attempted again. If +a runner dequeues a job, runs it, but then crashes before calling ack(), the +job may be returned to the queue and run a second time. Jobs like cache purging can +happen several times without any correctness problems. However, a pathological case +would be if a bug causes the problem to systematically keep repeating. For example, +a job may always throw a DB error at the end of run(). This problem is trickier to +solve and more obnoxious for things like email jobs, for example. For such jobs, +it might be useful to use a queue that does not retry jobs. diff --git a/includes/job/RefreshLinksJob.php b/includes/job/RefreshLinksJob.php deleted file mode 100644 index b23951c6..00000000 --- a/includes/job/RefreshLinksJob.php +++ /dev/null @@ -1,202 +0,0 @@ -<?php -/** - * Job to update links for a given title. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - * http://www.gnu.org/copyleft/gpl.html - * - * @file - * @ingroup JobQueue - */ - -/** - * Background job to update links for a given title. - * - * @ingroup JobQueue - */ -class RefreshLinksJob extends Job { - - function __construct( $title, $params = '', $id = 0 ) { - parent::__construct( 'refreshLinks', $title, $params, $id ); - } - - /** - * Run a refreshLinks job - * @return boolean success - */ - function run() { - wfProfileIn( __METHOD__ ); - - $linkCache = LinkCache::singleton(); - $linkCache->clear(); - - if ( is_null( $this->title ) ) { - $this->error = "refreshLinks: Invalid title"; - wfProfileOut( __METHOD__ ); - return false; - } - - # Wait for the DB of the current/next slave DB handle to catch up to the master. - # This way, we get the correct page_latest for templates or files that just changed - # milliseconds ago, having triggered this job to begin with. - if ( isset( $this->params['masterPos'] ) ) { - wfGetLB()->waitFor( $this->params['masterPos'] ); - } - - $revision = Revision::newFromTitle( $this->title, false, Revision::READ_NORMAL ); - if ( !$revision ) { - $this->error = 'refreshLinks: Article not found "' . - $this->title->getPrefixedDBkey() . '"'; - wfProfileOut( __METHOD__ ); - return false; // XXX: what if it was just deleted? - } - - self::runForTitleInternal( $this->title, $revision, __METHOD__ ); - - wfProfileOut( __METHOD__ ); - return true; - } - - public static function runForTitleInternal( Title $title, Revision $revision, $fname ) { - global $wgParser, $wgContLang; - - wfProfileIn( $fname . '-parse' ); - $options = ParserOptions::newFromUserAndLang( new User, $wgContLang ); - $parserOutput = $wgParser->parse( - $revision->getText(), $title, $options, true, true, $revision->getId() ); - wfProfileOut( $fname . '-parse' ); - - wfProfileIn( $fname . '-update' ); - $updates = $parserOutput->getSecondaryDataUpdates( $title, false ); - DataUpdate::runUpdates( $updates ); - wfProfileOut( $fname . '-update' ); - } -} - -/** - * Background job to update links for a given title. - * Newer version for high use templates. - * - * @ingroup JobQueue - */ -class RefreshLinksJob2 extends Job { - const MAX_TITLES_RUN = 10; - - function __construct( $title, $params, $id = 0 ) { - parent::__construct( 'refreshLinks2', $title, $params, $id ); - } - - /** - * Run a refreshLinks2 job - * @return boolean success - */ - function run() { - wfProfileIn( __METHOD__ ); - - $linkCache = LinkCache::singleton(); - $linkCache->clear(); - - if ( is_null( $this->title ) ) { - $this->error = "refreshLinks2: Invalid title"; - wfProfileOut( __METHOD__ ); - return false; - } elseif ( !isset( $this->params['start'] ) || !isset( $this->params['end'] ) ) { - $this->error = "refreshLinks2: Invalid params"; - wfProfileOut( __METHOD__ ); - return false; - } - - // Back compat for pre-r94435 jobs - $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks'; - - // Avoid slave lag when fetching templates - if ( isset( $this->params['masterPos'] ) ) { - $masterPos = $this->params['masterPos']; - } elseif ( wfGetLB()->getServerCount() > 1 ) { - $masterPos = wfGetLB()->getMasterPos(); - } else { - $masterPos = false; - } - - $titles = $this->title->getBacklinkCache()->getLinks( - $table, $this->params['start'], $this->params['end'] ); - - if ( $titles->count() > self::MAX_TITLES_RUN ) { - # We don't want to parse too many pages per job as it can starve other jobs. - # If there are too many pages to parse, break this up into smaller jobs. By passing - # in the master position here we can cut down on the time spent waiting for slaves to - # catch up by the runners handling these jobs since time will have passed between now - # and when they pop these jobs off the queue. - $start = 0; // batch start - $end = 0; // batch end - $bsize = 0; // batch size - $first = true; // first of batch - $jobs = array(); - foreach ( $titles as $title ) { - $start = $first ? $title->getArticleId() : $start; - $end = $title->getArticleId(); - $first = false; - if ( ++$bsize >= self::MAX_TITLES_RUN ) { - $jobs[] = new RefreshLinksJob2( $this->title, array( - 'table' => $table, - 'start' => $start, - 'end' => $end, - 'masterPos' => $masterPos - ) ); - $first = true; - $start = $end = $bsize = 0; - } - } - if ( $bsize > 0 ) { // group remaining pages into a job - $jobs[] = new RefreshLinksJob2( $this->title, array( - 'table' => $table, - 'start' => $start, - 'end' => $end, - 'masterPos' => $masterPos - ) ); - } - Job::batchInsert( $jobs ); - } elseif ( php_sapi_name() != 'cli' ) { - # Not suitable for page load triggered job running! - # Gracefully switch to refreshLinks jobs if this happens. - $jobs = array(); - foreach ( $titles as $title ) { - $jobs[] = new RefreshLinksJob( $title, array( 'masterPos' => $masterPos ) ); - } - Job::batchInsert( $jobs ); - } else { - # Wait for the DB of the current/next slave DB handle to catch up to the master. - # This way, we get the correct page_latest for templates or files that just changed - # milliseconds ago, having triggered this job to begin with. - if ( $masterPos ) { - wfGetLB()->waitFor( $masterPos ); - } - # Re-parse each page that transcludes this page and update their tracking links... - foreach ( $titles as $title ) { - $revision = Revision::newFromTitle( $title, false, Revision::READ_NORMAL ); - if ( !$revision ) { - $this->error = 'refreshLinks: Article not found "' . - $title->getPrefixedDBkey() . '"'; - continue; // skip this page - } - RefreshLinksJob::runForTitleInternal( $title, $revision, __METHOD__ ); - wfWaitForSlaves(); - } - } - - wfProfileOut( __METHOD__ ); - return true; - } -} diff --git a/includes/job/aggregator/JobQueueAggregator.php b/includes/job/aggregator/JobQueueAggregator.php new file mode 100644 index 00000000..a8186abd --- /dev/null +++ b/includes/job/aggregator/JobQueueAggregator.php @@ -0,0 +1,156 @@ +<?php +/** + * Job queue aggregator code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle tracking information about all queues + * + * @ingroup JobQueue + * @since 1.21 + */ +abstract class JobQueueAggregator { + /** @var JobQueueAggregator */ + protected static $instance = null; + + /** + * @param array $params + */ + protected function __construct( array $params ) {} + + /** + * @return JobQueueAggregator + */ + final public static function singleton() { + global $wgJobQueueAggregator; + + if ( !isset( self::$instance ) ) { + $class = $wgJobQueueAggregator['class']; + $obj = new $class( $wgJobQueueAggregator ); + if ( !( $obj instanceof JobQueueAggregator ) ) { + throw new MWException( "Class '$class' is not a JobQueueAggregator class." ); + } + self::$instance = $obj; + } + + return self::$instance; + } + + /** + * Destroy the singleton instance + * + * @return void + */ + final public static function destroySingleton() { + self::$instance = null; + } + + /** + * Mark a queue as being empty + * + * @param string $wiki + * @param string $type + * @return bool Success + */ + final public function notifyQueueEmpty( $wiki, $type ) { + wfProfileIn( __METHOD__ ); + $ok = $this->doNotifyQueueEmpty( $wiki, $type ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueueAggregator::notifyQueueEmpty() + */ + abstract protected function doNotifyQueueEmpty( $wiki, $type ); + + /** + * Mark a queue as being non-empty + * + * @param string $wiki + * @param string $type + * @return bool Success + */ + final public function notifyQueueNonEmpty( $wiki, $type ) { + wfProfileIn( __METHOD__ ); + $ok = $this->doNotifyQueueNonEmpty( $wiki, $type ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueueAggregator::notifyQueueNonEmpty() + */ + abstract protected function doNotifyQueueNonEmpty( $wiki, $type ); + + /** + * Get the list of all of the queues with jobs + * + * @return Array (job type => (list of wiki IDs)) + */ + final public function getAllReadyWikiQueues() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAllReadyWikiQueues(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueueAggregator::getAllReadyWikiQueues() + */ + abstract protected function doGetAllReadyWikiQueues(); + + /** + * Purge all of the aggregator information + * + * @return bool Success + */ + final public function purge() { + wfProfileIn( __METHOD__ ); + $res = $this->doPurge(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueueAggregator::purge() + */ + abstract protected function doPurge(); + + /** + * Get all databases that have a pending job. + * This poll all the queues and is this expensive. + * + * @return Array (job type => (list of wiki IDs)) + */ + protected function findPendingWikiQueues() { + global $wgLocalDatabases; + + $pendingDBs = array(); // (job type => (db list)) + foreach ( $wgLocalDatabases as $db ) { + foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) { + $pendingDBs[$type][] = $db; + } + } + + return $pendingDBs; + } +} diff --git a/includes/job/aggregator/JobQueueAggregatorMemc.php b/includes/job/aggregator/JobQueueAggregatorMemc.php new file mode 100644 index 00000000..9434da04 --- /dev/null +++ b/includes/job/aggregator/JobQueueAggregatorMemc.php @@ -0,0 +1,124 @@ +<?php +/** + * Job queue aggregator code that uses BagOStuff. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle tracking information about all queues using BagOStuff + * + * @ingroup JobQueue + * @since 1.21 + */ +class JobQueueAggregatorMemc extends JobQueueAggregator { + /** @var BagOStuff */ + protected $cache; + + protected $cacheTTL; // integer; seconds + + /** + * @params include: + * - objectCache : Name of an object cache registered in $wgObjectCaches. + * This defaults to the one specified by $wgMainCacheType. + * - cacheTTL : Seconds to cache the aggregate data before regenerating. + * @param array $params + */ + protected function __construct( array $params ) { + parent::__construct( $params ); + $this->cache = isset( $params['objectCache'] ) + ? wfGetCache( $params['objectCache'] ) + : wfGetMainCache(); + $this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min + } + + /** + * @see JobQueueAggregator::doNotifyQueueEmpty() + */ + protected function doNotifyQueueEmpty( $wiki, $type ) { + $key = $this->getReadyQueueCacheKey(); + // Delist the queue from the "ready queue" list + if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock + $curInfo = $this->cache->get( $key ); + if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) { + if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) { + $curInfo['pendingDBs'][$type] = array_diff( + $curInfo['pendingDBs'][$type], array( $wiki ) ); + $this->cache->set( $key, $curInfo ); + } + } + $this->cache->delete( "$key:lock" ); // unlock + } + return true; + } + + /** + * @see JobQueueAggregator::doNotifyQueueNonEmpty() + */ + protected function doNotifyQueueNonEmpty( $wiki, $type ) { + return true; // updated periodically + } + + /** + * @see JobQueueAggregator::doAllGetReadyWikiQueues() + */ + protected function doGetAllReadyWikiQueues() { + $key = $this->getReadyQueueCacheKey(); + // If the cache entry wasn't present, is stale, or in .1% of cases otherwise, + // regenerate the cache. Use any available stale cache if another process is + // currently regenerating the pending DB information. + $pendingDbInfo = $this->cache->get( $key ); + if ( !is_array( $pendingDbInfo ) + || ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL + || mt_rand( 0, 999 ) == 0 + ) { + if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock + $pendingDbInfo = array( + 'pendingDBs' => $this->findPendingWikiQueues(), + 'timestamp' => time() + ); + for ( $attempts = 1; $attempts <= 25; ++$attempts ) { + if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock + $this->cache->set( $key, $pendingDbInfo ); + $this->cache->delete( "$key:lock" ); // unlock + break; + } + } + $this->cache->delete( "$key:rebuild" ); // unlock + } + } + return is_array( $pendingDbInfo ) + ? $pendingDbInfo['pendingDBs'] + : array(); // cache is both empty and locked + } + + /** + * @see JobQueueAggregator::doPurge() + */ + protected function doPurge() { + return $this->cache->delete( $this->getReadyQueueCacheKey() ); + } + + /** + * @return string + */ + private function getReadyQueueCacheKey() { + return "jobqueue:aggregator:ready-queues:v1"; // global + } +} diff --git a/includes/job/aggregator/JobQueueAggregatorRedis.php b/includes/job/aggregator/JobQueueAggregatorRedis.php new file mode 100644 index 00000000..c6a799df --- /dev/null +++ b/includes/job/aggregator/JobQueueAggregatorRedis.php @@ -0,0 +1,193 @@ +<?php +/** + * Job queue aggregator code that uses PhpRedis. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @author Aaron Schulz + */ + +/** + * Class to handle tracking information about all queues using PhpRedis + * + * @ingroup JobQueue + * @ingroup Redis + * @since 1.21 + */ +class JobQueueAggregatorRedis extends JobQueueAggregator { + /** @var RedisConnectionPool */ + protected $redisPool; + + /** + * @params include: + * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). + * - redisServer : A hostname/port combination or the absolute path of a UNIX socket. + * If a hostname is specified but no port, the standard port number + * 6379 will be used. Required. + * @param array $params + */ + protected function __construct( array $params ) { + parent::__construct( $params ); + $this->server = $params['redisServer']; + $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); + } + + /** + * @see JobQueueAggregator::doNotifyQueueEmpty() + */ + protected function doNotifyQueueEmpty( $wiki, $type ) { + $conn = $this->getConnection(); + if ( !$conn ) { + return false; + } + try { + $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) ); + return true; + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + return false; + } + } + + /** + * @see JobQueueAggregator::doNotifyQueueNonEmpty() + */ + protected function doNotifyQueueNonEmpty( $wiki, $type ) { + $conn = $this->getConnection(); + if ( !$conn ) { + return false; + } + try { + $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() ); + return true; + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + return false; + } + } + + /** + * @see JobQueueAggregator::doAllGetReadyWikiQueues() + */ + protected function doGetAllReadyWikiQueues() { + $conn = $this->getConnection(); + if ( !$conn ) { + return array(); + } + try { + $conn->multi( Redis::PIPELINE ); + $conn->exists( $this->getReadyQueueKey() ); + $conn->hGetAll( $this->getReadyQueueKey() ); + list( $exists, $map ) = $conn->exec(); + + if ( $exists ) { // cache hit + $pendingDBs = array(); // (type => list of wikis) + foreach ( $map as $key => $time ) { + list( $type, $wiki ) = $this->dencQueueName( $key ); + $pendingDBs[$type][] = $wiki; + } + } else { // cache miss + // Avoid duplicated effort + $conn->multi( Redis::MULTI ); + $conn->setnx( $this->getReadyQueueKey() . ":lock", 1 ); + $conn->expire( $this->getReadyQueueKey() . ":lock", 3600 ); + if ( $conn->exec() !== array( true, true ) ) { // lock + return array(); // already in progress + } + + $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis) + + $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock + + $now = time(); + $map = array(); + foreach ( $pendingDBs as $type => $wikis ) { + foreach ( $wikis as $wiki ) { + $map[$this->encQueueName( $type, $wiki )] = $now; + } + } + $conn->hMSet( $this->getReadyQueueKey(), $map ); + } + + return $pendingDBs; + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + return array(); + } + } + + /** + * @see JobQueueAggregator::doPurge() + */ + protected function doPurge() { + $conn = $this->getConnection(); + if ( !$conn ) { + return false; + } + try { + $conn->delete( $this->getReadyQueueKey() ); + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + return false; + } + return true; + } + + /** + * Get a connection to the server that handles all sub-queues for this queue + * + * @return Array (server name, Redis instance) + * @throws MWException + */ + protected function getConnection() { + return $this->redisPool->getConnection( $this->server ); + } + + /** + * @param RedisConnRef $conn + * @param RedisException $e + * @return void + */ + protected function handleException( RedisConnRef $conn, $e ) { + $this->redisPool->handleException( $this->server, $conn, $e ); + } + + /** + * @return string + */ + private function getReadyQueueKey() { + return "jobqueue:aggregator:h-ready-queues:v1"; // global + } + + /** + * @param string $type + * @param string $wiki + * @return string + */ + private function encQueueName( $type, $wiki ) { + return rawurlencode( $type ) . '/' . rawurlencode( $wiki ); + } + + /** + * @param string $name + * @return string + */ + private function dencQueueName( $name ) { + list( $type, $wiki ) = explode( '/', $name, 2 ); + return array( rawurldecode( $type ), rawurldecode( $wiki ) ); + } +} diff --git a/includes/job/jobs/AssembleUploadChunksJob.php b/includes/job/jobs/AssembleUploadChunksJob.php new file mode 100644 index 00000000..6237e568 --- /dev/null +++ b/includes/job/jobs/AssembleUploadChunksJob.php @@ -0,0 +1,127 @@ +<?php +/** + * Assemble the segments of a chunked upload. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Upload + */ + +/** + * Assemble the segments of a chunked upload. + * + * @ingroup Upload + */ +class AssembleUploadChunksJob extends Job { + public function __construct( $title, $params, $id = 0 ) { + parent::__construct( 'AssembleUploadChunks', $title, $params, $id ); + $this->removeDuplicates = true; + } + + public function run() { + $scope = RequestContext::importScopedSession( $this->params['session'] ); + $context = RequestContext::getMain(); + try { + $user = $context->getUser(); + if ( !$user->isLoggedIn() ) { + $this->setLastError( "Could not load the author user from session." ); + return false; + } + + if ( count( $_SESSION ) === 0 ) { + // Empty session probably indicates that we didn't associate + // with the session correctly. Note that being able to load + // the user does not necessarily mean the session was loaded. + // Most likely cause by suhosin.session.encrypt = On. + $this->setLastError( "Error associating with user session. Try setting suhosin.session.encrypt = Off" ); + return false; + } + + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() ) + ); + + $upload = new UploadFromChunks( $user ); + $upload->continueChunks( + $this->params['filename'], + $this->params['filekey'], + $context->getRequest() + ); + + // Combine all of the chunks into a local file and upload that to a new stash file + $status = $upload->concatenateChunks(); + if ( !$status->isGood() ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status ) + ); + $this->setLastError( $status->getWikiText() ); + return false; + } + + // We have a new filekey for the fully concatenated file + $newFileKey = $upload->getLocalFile()->getFileKey(); + + // Remove the old stash file row and first chunk file + $upload->stash->removeFileNoAuth( $this->params['filekey'] ); + + // Build the image info array while we have the local reference handy + $apiMain = new ApiMain(); // dummy object (XXX) + $imageInfo = $upload->getImageInfo( $apiMain->getResult() ); + + // Cleanup any temporary local file + $upload->cleanupTempFile(); + + // Cache the info so the user doesn't have to wait forever to get the final info + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Success', + 'stage' => 'assembling', + 'filekey' => $newFileKey, + 'imageinfo' => $imageInfo, + 'status' => Status::newGood() + ) + ); + } catch ( MWException $e ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Failure', + 'stage' => 'assembling', + 'status' => Status::newFatal( 'api-error-stashfailed' ) + ) + ); + $this->setLastError( get_class( $e ) . ": " . $e->getText() ); + return false; + } + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + $info['params'] = array( 'filekey' => $info['params']['filekey'] ); + } + return $info; + } + + public function allowRetries() { + return false; + } +} diff --git a/includes/job/DoubleRedirectJob.php b/includes/job/jobs/DoubleRedirectJob.php index 08af9975..33e749b8 100644 --- a/includes/job/DoubleRedirectJob.php +++ b/includes/job/jobs/DoubleRedirectJob.php @@ -27,7 +27,7 @@ * @ingroup JobQueue */ class DoubleRedirectJob extends Job { - var $reason, $redirTitle, $destTitleText; + var $reason, $redirTitle; /** * @var User @@ -36,9 +36,9 @@ class DoubleRedirectJob extends Job { /** * Insert jobs into the job queue to fix redirects to the given title - * @param $reason String: the reason for the fix, see message "double-redirect-fixed-<reason>" + * @param string $reason the reason for the fix, see message "double-redirect-fixed-<reason>" * @param $redirTitle Title: the title which has changed, redirects pointing to this title are fixed - * @param $destTitle bool Not used + * @param bool $destTitle Not used */ public static function fixRedirects( $reason, $redirTitle, $destTitle = false ) { # Need to use the master to get the redirect table updated in the same transaction @@ -66,18 +66,17 @@ class DoubleRedirectJob extends Job { 'redirTitle' => $redirTitle->getPrefixedDBkey() ) ); # Avoid excessive memory usage if ( count( $jobs ) > 10000 ) { - Job::batchInsert( $jobs ); + JobQueueGroup::singleton()->push( $jobs ); $jobs = array(); } } - Job::batchInsert( $jobs ); + JobQueueGroup::singleton()->push( $jobs ); } function __construct( $title, $params = false, $id = 0 ) { parent::__construct( 'fixDoubleRedirect', $title, $params, $id ); $this->reason = $params['reason']; $this->redirTitle = Title::newFromText( $params['redirTitle'] ); - $this->destTitleText = !empty( $params['destTitle'] ) ? $params['destTitle'] : ''; } /** @@ -91,60 +90,64 @@ class DoubleRedirectJob extends Job { $targetRev = Revision::newFromTitle( $this->title, false, Revision::READ_LATEST ); if ( !$targetRev ) { - wfDebug( __METHOD__.": target redirect already deleted, ignoring\n" ); + wfDebug( __METHOD__ . ": target redirect already deleted, ignoring\n" ); return true; } - $text = $targetRev->getText(); - $currentDest = Title::newFromRedirect( $text ); + $content = $targetRev->getContent(); + $currentDest = $content ? $content->getRedirectTarget() : null; if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) { - wfDebug( __METHOD__.": Redirect has changed since the job was queued\n" ); + wfDebug( __METHOD__ . ": Redirect has changed since the job was queued\n" ); return true; } - # Check for a suppression tag (used e.g. in periodically archived discussions) + // Check for a suppression tag (used e.g. in periodically archived discussions) $mw = MagicWord::get( 'staticredirect' ); - if ( $mw->match( $text ) ) { - wfDebug( __METHOD__.": skipping: suppressed with __STATICREDIRECT__\n" ); + if ( $content->matchMagicWord( $mw ) ) { + wfDebug( __METHOD__ . ": skipping: suppressed with __STATICREDIRECT__\n" ); return true; } - # Find the current final destination + // Find the current final destination $newTitle = self::getFinalDestination( $this->redirTitle ); if ( !$newTitle ) { - wfDebug( __METHOD__.": skipping: single redirect, circular redirect or invalid redirect destination\n" ); + wfDebug( __METHOD__ . ": skipping: single redirect, circular redirect or invalid redirect destination\n" ); return true; } if ( $newTitle->equals( $this->redirTitle ) ) { - # The redirect is already right, no need to change it - # This can happen if the page was moved back (say after vandalism) - wfDebug( __METHOD__.": skipping, already good\n" ); + // The redirect is already right, no need to change it + // This can happen if the page was moved back (say after vandalism) + wfDebug( __METHOD__ . " : skipping, already good\n" ); } - # Preserve fragment (bug 14904) + // Preserve fragment (bug 14904) $newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(), - $currentDest->getFragment() ); + $currentDest->getFragment(), $newTitle->getInterwiki() ); - # Fix the text - # Remember that redirect pages can have categories, templates, etc., - # so the regex has to be fairly general - $newText = preg_replace( '/ \[ \[ [^\]]* \] \] /x', - '[[' . $newTitle->getFullText() . ']]', - $text, 1 ); + // Fix the text + $newContent = $content->updateRedirect( $newTitle ); - if ( $newText === $text ) { - $this->setLastError( 'Text unchanged???' ); + if ( $newContent->equals( $content ) ) { + $this->setLastError( 'Content unchanged???' ); return false; } - # Save it + $user = $this->getUser(); + if ( !$user ) { + $this->setLastError( 'Invalid user' ); + return false; + } + + // Save it global $wgUser; $oldUser = $wgUser; - $wgUser = $this->getUser(); + $wgUser = $user; $article = WikiPage::factory( $this->title ); + + // Messages: double-redirect-fixed-move, double-redirect-fixed-maintenance $reason = wfMessage( 'double-redirect-fixed-' . $this->reason, $this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText() )->inContentLanguage()->text(); - $article->doEdit( $newText, $reason, EDIT_UPDATE | EDIT_SUPPRESS_RC, false, $this->getUser() ); + $article->doEditContent( $newContent, $reason, EDIT_UPDATE | EDIT_SUPPRESS_RC, false, $user ); $wgUser = $oldUser; return true; @@ -160,7 +163,8 @@ class DoubleRedirectJob extends Job { public static function getFinalDestination( $title ) { $dbw = wfGetDB( DB_MASTER ); - $seenTitles = array(); # Circular redirect check + // Circular redirect check + $seenTitles = array(); $dest = false; while ( true ) { @@ -171,9 +175,17 @@ class DoubleRedirectJob extends Job { } $seenTitles[$titleText] = true; + if ( $title->getInterwiki() ) { + // If the target is interwiki, we have to break early (bug 40352). + // Otherwise it will look up a row in the local page table + // with the namespace/page of the interwiki target which can cause + // unexpected results (e.g. X -> foo:Bar -> Bar -> .. ) + break; + } + $row = $dbw->selectRow( array( 'redirect', 'page' ), - array( 'rd_namespace', 'rd_title' ), + array( 'rd_namespace', 'rd_title', 'rd_interwiki' ), array( 'rd_from=page_id', 'page_namespace' => $title->getNamespace(), @@ -183,7 +195,7 @@ class DoubleRedirectJob extends Job { # No redirect from here, chain terminates break; } else { - $dest = $title = Title::makeTitle( $row->rd_namespace, $row->rd_title ); + $dest = $title = Title::makeTitle( $row->rd_namespace, $row->rd_title, '', $row->rd_interwiki ); } } return $dest; @@ -191,17 +203,19 @@ class DoubleRedirectJob extends Job { /** * Get a user object for doing edits, from a request-lifetime cache - * @return User + * False will be returned if the user name specified in the + * 'double-redirect-fixer' message is invalid. + * + * @return User|bool */ function getUser() { if ( !self::$user ) { - self::$user = User::newFromName( wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text(), false ); - # FIXME: newFromName could return false on a badly configured wiki. - if ( !self::$user->isLoggedIn() ) { + self::$user = User::newFromName( wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text() ); + # User::newFromName() can return false on a badly configured wiki. + if ( self::$user && !self::$user->isLoggedIn() ) { self::$user->addToDatabase(); } } return self::$user; } } - diff --git a/includes/job/jobs/DuplicateJob.php b/includes/job/jobs/DuplicateJob.php new file mode 100644 index 00000000..be1bfe5c --- /dev/null +++ b/includes/job/jobs/DuplicateJob.php @@ -0,0 +1,59 @@ +<?php +/** + * No-op job that does nothing. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Cache + */ + +/** + * No-op job that does nothing. Used to represent duplicates. + * + * @ingroup JobQueue + */ +final class DuplicateJob extends Job { + /** + * Callers should use DuplicateJob::newFromJob() instead + * + * @param $title Title + * @param array $params job parameters + * @param $id Integer: job id + */ + function __construct( $title, $params, $id = 0 ) { + parent::__construct( 'duplicate', $title, $params, $id ); + } + + /** + * Get a duplicate no-op version of a job + * + * @param Job $job + * @return Job + */ + public static function newFromJob( Job $job ) { + $djob = new self( $job->getTitle(), $job->getParams(), $job->id ); + $djob->command = $job->getType(); + $djob->params = is_array( $djob->params ) ? $djob->params : array(); + $djob->params = array( 'isDuplicate' => true ) + $djob->params; + $djob->metadata = $job->metadata; + return $djob; + } + + public function run() { + return true; + } +} diff --git a/includes/job/EmaillingJob.php b/includes/job/jobs/EmaillingJob.php index d3599882..9fbf3124 100644 --- a/includes/job/EmaillingJob.php +++ b/includes/job/jobs/EmaillingJob.php @@ -33,14 +33,15 @@ class EmaillingJob extends Job { } function run() { - UserMailer::send( + $status = UserMailer::send( $this->params['to'], $this->params['from'], $this->params['subj'], $this->params['body'], $this->params['replyto'] ); - return true; + + return $status->isOK(); } } diff --git a/includes/job/EnotifNotifyJob.php b/includes/job/jobs/EnotifNotifyJob.php index b4c925e9..bbe988d0 100644 --- a/includes/job/EnotifNotifyJob.php +++ b/includes/job/jobs/EnotifNotifyJob.php @@ -35,7 +35,7 @@ class EnotifNotifyJob extends Job { function run() { $enotif = new EmailNotification(); // Get the user from ID (rename safe). Anons are 0, so defer to name. - if( isset( $this->params['editorID'] ) && $this->params['editorID'] ) { + if ( isset( $this->params['editorID'] ) && $this->params['editorID'] ) { $editor = User::newFromId( $this->params['editorID'] ); // B/C, only the name might be given. } else { @@ -49,7 +49,8 @@ class EnotifNotifyJob extends Job { $this->params['summary'], $this->params['minorEdit'], $this->params['oldid'], - $this->params['watchers'] + $this->params['watchers'], + $this->params['pageStatus'] ); return true; } diff --git a/includes/job/jobs/HTMLCacheUpdateJob.php b/includes/job/jobs/HTMLCacheUpdateJob.php new file mode 100644 index 00000000..44c240bb --- /dev/null +++ b/includes/job/jobs/HTMLCacheUpdateJob.php @@ -0,0 +1,263 @@ +<?php +/** + * HTML cache invalidation of all pages linking to a given title. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Cache + */ + +/** + * Job wrapper for HTMLCacheUpdate. Gets run whenever a related + * job gets called from the queue. + * + * This class is designed to work efficiently with small numbers of links, and + * to work reasonably well with up to ~10^5 links. Above ~10^6 links, the memory + * and time requirements of loading all backlinked IDs in doUpdate() might become + * prohibitive. The requirements measured at Wikimedia are approximately: + * + * memory: 48 bytes per row + * time: 16us per row for the query plus processing + * + * The reason this query is done is to support partitioning of the job + * by backlinked ID. The memory issue could be allieviated by doing this query in + * batches, but of course LIMIT with an offset is inefficient on the DB side. + * + * The class is nevertheless a vast improvement on the previous method of using + * File::getLinksTo() and Title::touchArray(), which uses about 2KB of memory per + * link. + * + * @ingroup JobQueue + */ +class HTMLCacheUpdateJob extends Job { + /** @var BacklinkCache */ + protected $blCache; + + protected $rowsPerJob, $rowsPerQuery; + + /** + * Construct a job + * @param $title Title: the title linked to + * @param array $params job parameters (table, start and end page_ids) + * @param $id Integer: job id + */ + function __construct( $title, $params, $id = 0 ) { + global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery; + + parent::__construct( 'htmlCacheUpdate', $title, $params, $id ); + + $this->rowsPerJob = $wgUpdateRowsPerJob; + $this->rowsPerQuery = $wgUpdateRowsPerQuery; + $this->blCache = $title->getBacklinkCache(); + } + + public function run() { + if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) { + # This is hit when a job is actually performed + return $this->doPartialUpdate(); + } else { + # This is hit when the jobs have to be inserted + return $this->doFullUpdate(); + } + } + + /** + * Update all of the backlinks + */ + protected function doFullUpdate() { + global $wgMaxBacklinksInvalidate; + + # Get an estimate of the number of rows from the BacklinkCache + $max = max( $this->rowsPerJob * 2, $wgMaxBacklinksInvalidate ) + 1; + $numRows = $this->blCache->getNumLinks( $this->params['table'], $max ); + if ( $wgMaxBacklinksInvalidate !== false && $numRows > $wgMaxBacklinksInvalidate ) { + wfDebug( "Skipped HTML cache invalidation of {$this->title->getPrefixedText()}." ); + return true; + } + + if ( $numRows > $this->rowsPerJob * 2 ) { + # Do fast cached partition + $this->insertPartitionJobs(); + } else { + # Get the links from the DB + $titleArray = $this->blCache->getLinks( $this->params['table'] ); + # Check if the row count estimate was correct + if ( $titleArray->count() > $this->rowsPerJob * 2 ) { + # Not correct, do accurate partition + wfDebug( __METHOD__ . ": row count estimate was incorrect, repartitioning\n" ); + $this->insertJobsFromTitles( $titleArray ); + } else { + $this->invalidateTitles( $titleArray ); // just do the query + } + } + + return true; + } + + /** + * Update some of the backlinks, defined by a page ID range + */ + protected function doPartialUpdate() { + $titleArray = $this->blCache->getLinks( + $this->params['table'], $this->params['start'], $this->params['end'] ); + if ( $titleArray->count() <= $this->rowsPerJob * 2 ) { + # This partition is small enough, do the update + $this->invalidateTitles( $titleArray ); + } else { + # Partitioning was excessively inaccurate. Divide the job further. + # This can occur when a large number of links are added in a short + # period of time, say by updating a heavily-used template. + $this->insertJobsFromTitles( $titleArray ); + } + return true; + } + + /** + * Partition the current range given by $this->params['start'] and $this->params['end'], + * using a pre-calculated title array which gives the links in that range. + * Queue the resulting jobs. + * + * @param $titleArray array + * @param $rootJobParams array + * @return void + */ + protected function insertJobsFromTitles( $titleArray, $rootJobParams = array() ) { + // Carry over any "root job" information + $rootJobParams = $this->getRootJobParams(); + # We make subpartitions in the sense that the start of the first job + # will be the start of the parent partition, and the end of the last + # job will be the end of the parent partition. + $jobs = array(); + $start = $this->params['start']; # start of the current job + $numTitles = 0; + foreach ( $titleArray as $title ) { + $id = $title->getArticleID(); + # $numTitles is now the number of titles in the current job not + # including the current ID + if ( $numTitles >= $this->rowsPerJob ) { + # Add a job up to but not including the current ID + $jobs[] = new HTMLCacheUpdateJob( $this->title, + array( + 'table' => $this->params['table'], + 'start' => $start, + 'end' => $id - 1 + ) + $rootJobParams // carry over information for de-duplication + ); + $start = $id; + $numTitles = 0; + } + $numTitles++; + } + # Last job + $jobs[] = new HTMLCacheUpdateJob( $this->title, + array( + 'table' => $this->params['table'], + 'start' => $start, + 'end' => $this->params['end'] + ) + $rootJobParams // carry over information for de-duplication + ); + wfDebug( __METHOD__ . ": repartitioning into " . count( $jobs ) . " jobs\n" ); + + if ( count( $jobs ) < 2 ) { + # I don't think this is possible at present, but handling this case + # makes the code a bit more robust against future code updates and + # avoids a potential infinite loop of repartitioning + wfDebug( __METHOD__ . ": repartitioning failed!\n" ); + $this->invalidateTitles( $titleArray ); + } else { + JobQueueGroup::singleton()->push( $jobs ); + } + } + + /** + * @param $rootJobParams array + * @return void + */ + protected function insertPartitionJobs( $rootJobParams = array() ) { + // Carry over any "root job" information + $rootJobParams = $this->getRootJobParams(); + + $batches = $this->blCache->partition( $this->params['table'], $this->rowsPerJob ); + if ( !count( $batches ) ) { + return; // no jobs to insert + } + + $jobs = array(); + foreach ( $batches as $batch ) { + list( $start, $end ) = $batch; + $jobs[] = new HTMLCacheUpdateJob( $this->title, + array( + 'table' => $this->params['table'], + 'start' => $start, + 'end' => $end, + ) + $rootJobParams // carry over information for de-duplication + ); + } + + JobQueueGroup::singleton()->push( $jobs ); + } + + /** + * Invalidate an array (or iterator) of Title objects, right now + * @param $titleArray array + */ + protected function invalidateTitles( $titleArray ) { + global $wgUseFileCache, $wgUseSquid; + + $dbw = wfGetDB( DB_MASTER ); + $timestamp = $dbw->timestamp(); + + # Get all IDs in this query into an array + $ids = array(); + foreach ( $titleArray as $title ) { + $ids[] = $title->getArticleID(); + } + + if ( !$ids ) { + return; + } + + # Don't invalidated pages that were already invalidated + $touchedCond = isset( $this->params['rootJobTimestamp'] ) + ? array( "page_touched < " . + $dbw->addQuotes( $dbw->timestamp( $this->params['rootJobTimestamp'] ) ) ) + : array(); + + # Update page_touched + $batches = array_chunk( $ids, $this->rowsPerQuery ); + foreach ( $batches as $batch ) { + $dbw->update( 'page', + array( 'page_touched' => $timestamp ), + array( 'page_id' => $batch ) + $touchedCond, + __METHOD__ + ); + } + + # Update squid + if ( $wgUseSquid ) { + $u = SquidUpdate::newFromTitles( $titleArray ); + $u->doUpdate(); + } + + # Update file cache + if ( $wgUseFileCache ) { + foreach ( $titleArray as $title ) { + HTMLFileCache::clearFileCache( $title ); + } + } + } +} diff --git a/includes/job/jobs/NullJob.php b/includes/job/jobs/NullJob.php new file mode 100644 index 00000000..b6164a5d --- /dev/null +++ b/includes/job/jobs/NullJob.php @@ -0,0 +1,76 @@ +<?php +/** + * Degenerate job that does nothing. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Cache + */ + +/** + * Degenerate job that does nothing, but can optionally replace itself + * in the queue and/or sleep for a brief time period. These can be used + * to represent "no-op" jobs or test lock contention and performance. + * + * @par Example: + * Inserting a null job in the configured job queue: + * @code + * $ php maintenance/eval.php + * > $queue = JobQueueGroup::singleton(); + * > $job = new NullJob( Title::newMainPage(), array( 'lives' => 10 ) ); + * > $queue->push( $job ); + * @endcode + * You can then confirm the job has been enqueued by using the showJobs.php + * maintenance utility: + * @code + * $ php maintenance/showJobs.php --group + * null: 1 queue; 0 claimed (0 active, 0 abandoned) + * $ + * @endcode + * + * @ingroup JobQueue + */ +class NullJob extends Job { + /** + * @param $title Title (can be anything) + * @param array $params job parameters (lives, usleep) + * @param $id Integer: job id + */ + function __construct( $title, $params, $id = 0 ) { + parent::__construct( 'null', $title, $params, $id ); + if ( !isset( $this->params['lives'] ) ) { + $this->params['lives'] = 1; + } + if ( !isset( $this->params['usleep'] ) ) { + $this->params['usleep'] = 0; + } + $this->removeDuplicates = !empty( $this->params['removeDuplicates'] ); + } + + public function run() { + if ( $this->params['usleep'] > 0 ) { + usleep( $this->params['usleep'] ); + } + if ( $this->params['lives'] > 1 ) { + $params = $this->params; + $params['lives']--; + $job = new self( $this->title, $params ); + JobQueueGroup::singleton()->push( $job ); + } + return true; + } +} diff --git a/includes/job/jobs/PublishStashedFileJob.php b/includes/job/jobs/PublishStashedFileJob.php new file mode 100644 index 00000000..5a24f93c --- /dev/null +++ b/includes/job/jobs/PublishStashedFileJob.php @@ -0,0 +1,140 @@ +<?php +/** + * Upload a file from the upload stash into the local file repo. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Upload + */ + +/** + * Upload a file from the upload stash into the local file repo. + * + * @ingroup Upload + */ +class PublishStashedFileJob extends Job { + public function __construct( $title, $params, $id = 0 ) { + parent::__construct( 'PublishStashedFile', $title, $params, $id ); + $this->removeDuplicates = true; + } + + public function run() { + $scope = RequestContext::importScopedSession( $this->params['session'] ); + $context = RequestContext::getMain(); + try { + $user = $context->getUser(); + if ( !$user->isLoggedIn() ) { + $this->setLastError( "Could not load the author user from session." ); + return false; + } + + if ( count( $_SESSION ) === 0 ) { + // Empty session probably indicates that we didn't associate + // with the session correctly. Note that being able to load + // the user does not necessarily mean the session was loaded. + // Most likely cause by suhosin.session.encrypt = On. + $this->setLastError( "Error associating with user session. Try setting suhosin.session.encrypt = Off" ); + return false; + } + + + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() ) + ); + + $upload = new UploadFromStash( $user ); + // @todo initialize() causes a GET, ideally we could frontload the antivirus + // checks and anything else to the stash stage (which includes concatenation and + // the local file is thus already there). That way, instead of GET+PUT, there could + // just be a COPY operation from the stash to the public zone. + $upload->initialize( $this->params['filekey'], $this->params['filename'] ); + + // Check if the local file checks out (this is generally a no-op) + $verification = $upload->verifyUpload(); + if ( $verification['status'] !== UploadBase::OK ) { + $status = Status::newFatal( 'verification-error' ); + $status->value = array( 'verification' => $verification ); + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) + ); + $this->setLastError( "Could not verify upload." ); + return false; + } + + // Upload the stashed file to a permanent location + $status = $upload->performUpload( + $this->params['comment'], + $this->params['text'], + $this->params['watch'], + $user + ); + if ( !$status->isGood() ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) + ); + $this->setLastError( $status->getWikiText() ); + return false; + } + + // Build the image info array while we have the local reference handy + $apiMain = new ApiMain(); // dummy object (XXX) + $imageInfo = $upload->getImageInfo( $apiMain->getResult() ); + + // Cleanup any temporary local file + $upload->cleanupTempFile(); + + // Cache the info so the user doesn't have to wait forever to get the final info + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Success', + 'stage' => 'publish', + 'filename' => $upload->getLocalFile()->getName(), + 'imageinfo' => $imageInfo, + 'status' => Status::newGood() + ) + ); + } catch ( MWException $e ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Failure', + 'stage' => 'publish', + 'status' => Status::newFatal( 'api-error-publishfailed' ) + ) + ); + $this->setLastError( get_class( $e ) . ": " . $e->getText() ); + return false; + } + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + $info['params'] = array( 'filekey' => $info['params']['filekey'] ); + } + return $info; + } + + public function allowRetries() { + return false; + } +} diff --git a/includes/job/jobs/RefreshLinksJob.php b/includes/job/jobs/RefreshLinksJob.php new file mode 100644 index 00000000..4fc8bac6 --- /dev/null +++ b/includes/job/jobs/RefreshLinksJob.php @@ -0,0 +1,222 @@ +<?php +/** + * Job to update links for a given title. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Background job to update links for a given title. + * + * @ingroup JobQueue + */ +class RefreshLinksJob extends Job { + function __construct( $title, $params = '', $id = 0 ) { + parent::__construct( 'refreshLinks', $title, $params, $id ); + $this->removeDuplicates = true; // job is expensive + } + + /** + * Run a refreshLinks job + * @return boolean success + */ + function run() { + $linkCache = LinkCache::singleton(); + $linkCache->clear(); + + if ( is_null( $this->title ) ) { + $this->error = "refreshLinks: Invalid title"; + return false; + } + + # Wait for the DB of the current/next slave DB handle to catch up to the master. + # This way, we get the correct page_latest for templates or files that just changed + # milliseconds ago, having triggered this job to begin with. + if ( isset( $this->params['masterPos'] ) && $this->params['masterPos'] !== false ) { + wfGetLB()->waitFor( $this->params['masterPos'] ); + } + + $revision = Revision::newFromTitle( $this->title, false, Revision::READ_NORMAL ); + if ( !$revision ) { + $this->error = 'refreshLinks: Article not found "' . + $this->title->getPrefixedDBkey() . '"'; + return false; // XXX: what if it was just deleted? + } + + self::runForTitleInternal( $this->title, $revision, __METHOD__ ); + + return true; + } + + /** + * @return Array + */ + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + // Don't let highly unique "masterPos" values ruin duplicate detection + if ( is_array( $info['params'] ) ) { + unset( $info['params']['masterPos'] ); + } + return $info; + } + + /** + * @param $title Title + * @param $revision Revision + * @param $fname string + * @return void + */ + public static function runForTitleInternal( Title $title, Revision $revision, $fname ) { + wfProfileIn( $fname ); + $content = $revision->getContent( Revision::RAW ); + + if ( !$content ) { + // if there is no content, pretend the content is empty + $content = $revision->getContentHandler()->makeEmptyContent(); + } + + // Revision ID must be passed to the parser output to get revision variables correct + $parserOutput = $content->getParserOutput( $title, $revision->getId(), null, false ); + + $updates = $content->getSecondaryDataUpdates( $title, null, false, $parserOutput ); + DataUpdate::runUpdates( $updates ); + + InfoAction::invalidateCache( $title ); + + wfProfileOut( $fname ); + } +} + +/** + * Background job to update links for a given title. + * Newer version for high use templates. + * + * @ingroup JobQueue + */ +class RefreshLinksJob2 extends Job { + function __construct( $title, $params, $id = 0 ) { + parent::__construct( 'refreshLinks2', $title, $params, $id ); + // Base jobs for large templates can easily be de-duplicated + $this->removeDuplicates = !isset( $params['start'] ) && !isset( $params['end'] ); + } + + /** + * Run a refreshLinks2 job + * @return boolean success + */ + function run() { + global $wgUpdateRowsPerJob; + + $linkCache = LinkCache::singleton(); + $linkCache->clear(); + + if ( is_null( $this->title ) ) { + $this->error = "refreshLinks2: Invalid title"; + return false; + } + + // Back compat for pre-r94435 jobs + $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks'; + + // Avoid slave lag when fetching templates. + // When the outermost job is run, we know that the caller that enqueued it must have + // committed the relevant changes to the DB by now. At that point, record the master + // position and pass it along as the job recursively breaks into smaller range jobs. + // Hopefully, when leaf jobs are popped, the slaves will have reached that position. + if ( isset( $this->params['masterPos'] ) ) { + $masterPos = $this->params['masterPos']; + } elseif ( wfGetLB()->getServerCount() > 1 ) { + $masterPos = wfGetLB()->getMasterPos(); + } else { + $masterPos = false; + } + + $tbc = $this->title->getBacklinkCache(); + + $jobs = array(); // jobs to insert + if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) { + # This is a partition job to trigger the insertion of leaf jobs... + $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); + } else { + # This is a base job to trigger the insertion of partitioned jobs... + if ( $tbc->getNumLinks( $table, $wgUpdateRowsPerJob + 1 ) <= $wgUpdateRowsPerJob ) { + # Just directly insert the single per-title jobs + $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); + } else { + # Insert the partition jobs to make per-title jobs + foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) { + list( $start, $end ) = $batch; + $jobs[] = new RefreshLinksJob2( $this->title, + array( + 'table' => $table, + 'start' => $start, + 'end' => $end, + 'masterPos' => $masterPos, + ) + $this->getRootJobParams() // carry over information for de-duplication + ); + } + } + } + + if ( count( $jobs ) ) { + JobQueueGroup::singleton()->push( $jobs ); + } + + return true; + } + + /** + * @param $table string + * @param $masterPos mixed + * @return Array + */ + protected function getSingleTitleJobs( $table, $masterPos ) { + # The "start"/"end" fields are not set for the base jobs + $start = isset( $this->params['start'] ) ? $this->params['start'] : false; + $end = isset( $this->params['end'] ) ? $this->params['end'] : false; + $titles = $this->title->getBacklinkCache()->getLinks( $table, $start, $end ); + # Convert into single page refresh links jobs. + # This handles well when in sapi mode and is useful in any case for job + # de-duplication. If many pages use template A, and that template itself + # uses template B, then an edit to both will create many duplicate jobs. + # Roughly speaking, for each page, one of the "RefreshLinksJob" jobs will + # get run first, and when it does, it will remove the duplicates. Of course, + # one page could have its job popped when the other page's job is still + # buried within the logic of a refreshLinks2 job. + $jobs = array(); + foreach ( $titles as $title ) { + $jobs[] = new RefreshLinksJob( $title, + array( 'masterPos' => $masterPos ) + $this->getRootJobParams() + ); // carry over information for de-duplication + } + return $jobs; + } + + /** + * @return Array + */ + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + // Don't let highly unique "masterPos" values ruin duplicate detection + if ( is_array( $info['params'] ) ) { + unset( $info['params']['masterPos'] ); + } + return $info; + } +} diff --git a/includes/job/UploadFromUrlJob.php b/includes/job/jobs/UploadFromUrlJob.php index e06f68e4..c993cfb4 100644 --- a/includes/job/UploadFromUrlJob.php +++ b/includes/job/jobs/UploadFromUrlJob.php @@ -48,6 +48,7 @@ class UploadFromUrlJob extends Job { } public function run() { + global $wgCopyUploadAsyncTimeout; # Initialize this object and the upload object $this->upload = new UploadFromUrl(); $this->upload->initialize( @@ -58,7 +59,11 @@ class UploadFromUrlJob extends Job { $this->user = User::newFromName( $this->params['userName'] ); # Fetch the file - $status = $this->upload->fetchFile(); + $opts = array(); + if ( $wgCopyUploadAsyncTimeout ) { + $opts['timeout'] = $wgCopyUploadAsyncTimeout; + } + $status = $this->upload->fetchFile( $opts ); if ( !$status->isOk() ) { $this->leaveMessage( $status ); return true; @@ -148,8 +153,8 @@ class UploadFromUrlJob extends Job { * Store a result in the session data. Note that the caller is responsible * for appropriate session_start and session_write_close calls. * - * @param $result String: the result (Success|Warning|Failure) - * @param $dataKey String: the key of the extra data + * @param string $result the result (Success|Warning|Failure) + * @param string $dataKey the key of the extra data * @param $dataValue Mixed: the extra data itself */ protected function storeResultInSession( $result, $dataKey, $dataValue ) { |