summaryrefslogtreecommitdiff
path: root/includes/job
diff options
context:
space:
mode:
authorPierre Schmitz <pierre@archlinux.de>2014-12-27 15:41:37 +0100
committerPierre Schmitz <pierre@archlinux.de>2014-12-31 11:43:28 +0100
commitc1f9b1f7b1b77776192048005dcc66dcf3df2bfb (patch)
tree2b38796e738dd74cb42ecd9bfd151803108386bc /includes/job
parentb88ab0086858470dd1f644e64cb4e4f62bb2be9b (diff)
Update to MediaWiki 1.24.1
Diffstat (limited to 'includes/job')
-rw-r--r--includes/job/Job.php321
-rw-r--r--includes/job/JobQueue.php706
-rw-r--r--includes/job/JobQueueDB.php816
-rw-r--r--includes/job/JobQueueFederated.php473
-rw-r--r--includes/job/JobQueueGroup.php427
-rw-r--r--includes/job/JobQueueRedis.php856
-rw-r--r--includes/job/README81
-rw-r--r--includes/job/aggregator/JobQueueAggregator.php156
-rw-r--r--includes/job/aggregator/JobQueueAggregatorMemc.php124
-rw-r--r--includes/job/aggregator/JobQueueAggregatorRedis.php193
-rw-r--r--includes/job/jobs/AssembleUploadChunksJob.php127
-rw-r--r--includes/job/jobs/DoubleRedirectJob.php221
-rw-r--r--includes/job/jobs/DuplicateJob.php59
-rw-r--r--includes/job/jobs/EmaillingJob.php47
-rw-r--r--includes/job/jobs/EnotifNotifyJob.php58
-rw-r--r--includes/job/jobs/HTMLCacheUpdateJob.php263
-rw-r--r--includes/job/jobs/NullJob.php76
-rw-r--r--includes/job/jobs/PublishStashedFileJob.php140
-rw-r--r--includes/job/jobs/RefreshLinksJob.php222
-rw-r--r--includes/job/jobs/UploadFromUrlJob.php184
20 files changed, 0 insertions, 5550 deletions
diff --git a/includes/job/Job.php b/includes/job/Job.php
deleted file mode 100644
index ab7df5d2..00000000
--- a/includes/job/Job.php
+++ /dev/null
@@ -1,321 +0,0 @@
-<?php
-/**
- * Job queue base code.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @defgroup JobQueue JobQueue
- */
-
-/**
- * Class to both describe a background job and handle jobs.
- * The queue aspects of this class are now deprecated.
- *
- * @ingroup JobQueue
- */
-abstract class Job {
- /**
- * @var Title
- */
- var $title;
-
- var $command,
- $params,
- $id,
- $removeDuplicates,
- $error;
-
- /** @var Array Additional queue metadata */
- public $metadata = array();
-
- /*-------------------------------------------------------------------------
- * Abstract functions
- *------------------------------------------------------------------------*/
-
- /**
- * Run the job
- * @return boolean success
- */
- abstract public function run();
-
- /*-------------------------------------------------------------------------
- * Static functions
- *------------------------------------------------------------------------*/
-
- /**
- * Create the appropriate object to handle a specific job
- *
- * @param string $command Job command
- * @param $title Title: Associated title
- * @param array|bool $params Job parameters
- * @param int $id Job identifier
- * @throws MWException
- * @return Job
- */
- public static function factory( $command, Title $title, $params = false, $id = 0 ) {
- global $wgJobClasses;
- if ( isset( $wgJobClasses[$command] ) ) {
- $class = $wgJobClasses[$command];
- return new $class( $title, $params, $id );
- }
- throw new MWException( "Invalid job command `{$command}`" );
- }
-
- /**
- * Batch-insert a group of jobs into the queue.
- * This will be wrapped in a transaction with a forced commit.
- *
- * This may add duplicate at insert time, but they will be
- * removed later on, when the first one is popped.
- *
- * @param array $jobs of Job objects
- * @return bool
- * @deprecated since 1.21
- */
- public static function batchInsert( $jobs ) {
- return JobQueueGroup::singleton()->push( $jobs );
- }
-
- /**
- * Insert a group of jobs into the queue.
- *
- * Same as batchInsert() but does not commit and can thus
- * be rolled-back as part of a larger transaction. However,
- * large batches of jobs can cause slave lag.
- *
- * @param array $jobs of Job objects
- * @return bool
- * @deprecated since 1.21
- */
- public static function safeBatchInsert( $jobs ) {
- return JobQueueGroup::singleton()->push( $jobs, JobQueue::QOS_ATOMIC );
- }
-
- /**
- * Pop a job of a certain type. This tries less hard than pop() to
- * actually find a job; it may be adversely affected by concurrent job
- * runners.
- *
- * @param $type string
- * @return Job|bool Returns false if there are no jobs
- * @deprecated since 1.21
- */
- public static function pop_type( $type ) {
- return JobQueueGroup::singleton()->get( $type )->pop();
- }
-
- /**
- * Pop a job off the front of the queue.
- * This is subject to $wgJobTypesExcludedFromDefaultQueue.
- *
- * @return Job or false if there's no jobs
- * @deprecated since 1.21
- */
- public static function pop() {
- return JobQueueGroup::singleton()->pop();
- }
-
- /*-------------------------------------------------------------------------
- * Non-static functions
- *------------------------------------------------------------------------*/
-
- /**
- * @param $command
- * @param $title
- * @param $params array|bool
- * @param $id int
- */
- public function __construct( $command, $title, $params = false, $id = 0 ) {
- $this->command = $command;
- $this->title = $title;
- $this->params = $params;
- $this->id = $id;
-
- $this->removeDuplicates = false; // expensive jobs may set this to true
- }
-
- /**
- * @return integer May be 0 for jobs stored outside the DB
- * @deprecated since 1.22
- */
- public function getId() {
- return $this->id;
- }
-
- /**
- * @return string
- */
- public function getType() {
- return $this->command;
- }
-
- /**
- * @return Title
- */
- public function getTitle() {
- return $this->title;
- }
-
- /**
- * @return array
- */
- public function getParams() {
- return $this->params;
- }
-
- /**
- * @return integer|null UNIX timestamp to delay running this job until, otherwise null
- * @since 1.22
- */
- public function getReleaseTimestamp() {
- return isset( $this->params['jobReleaseTimestamp'] )
- ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] )
- : null;
- }
-
- /**
- * @return bool Whether only one of each identical set of jobs should be run
- */
- public function ignoreDuplicates() {
- return $this->removeDuplicates;
- }
-
- /**
- * @return bool Whether this job can be retried on failure by job runners
- * @since 1.21
- */
- public function allowRetries() {
- return true;
- }
-
- /**
- * Subclasses may need to override this to make duplication detection work.
- * The resulting map conveys everything that makes the job unique. This is
- * only checked if ignoreDuplicates() returns true, meaning that duplicate
- * jobs are supposed to be ignored.
- *
- * @return Array Map of key/values
- * @since 1.21
- */
- public function getDeduplicationInfo() {
- $info = array(
- 'type' => $this->getType(),
- 'namespace' => $this->getTitle()->getNamespace(),
- 'title' => $this->getTitle()->getDBkey(),
- 'params' => $this->getParams()
- );
- if ( is_array( $info['params'] ) ) {
- // Identical jobs with different "root" jobs should count as duplicates
- unset( $info['params']['rootJobSignature'] );
- unset( $info['params']['rootJobTimestamp'] );
- // Likewise for jobs with different delay times
- unset( $info['params']['jobReleaseTimestamp'] );
- }
- return $info;
- }
-
- /**
- * @see JobQueue::deduplicateRootJob()
- * @param string $key A key that identifies the task
- * @return Array
- * @since 1.21
- */
- public static function newRootJobParams( $key ) {
- return array(
- 'rootJobSignature' => sha1( $key ),
- 'rootJobTimestamp' => wfTimestampNow()
- );
- }
-
- /**
- * @see JobQueue::deduplicateRootJob()
- * @return Array
- * @since 1.21
- */
- public function getRootJobParams() {
- return array(
- 'rootJobSignature' => isset( $this->params['rootJobSignature'] )
- ? $this->params['rootJobSignature']
- : null,
- 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] )
- ? $this->params['rootJobTimestamp']
- : null
- );
- }
-
- /**
- * @see JobQueue::deduplicateRootJob()
- * @return bool
- * @since 1.22
- */
- public function hasRootJobParams() {
- return isset( $this->params['rootJobSignature'] )
- && isset( $this->params['rootJobTimestamp'] );
- }
-
- /**
- * Insert a single job into the queue.
- * @return bool true on success
- * @deprecated since 1.21
- */
- public function insert() {
- return JobQueueGroup::singleton()->push( $this );
- }
-
- /**
- * @return string
- */
- public function toString() {
- $paramString = '';
- if ( $this->params ) {
- foreach ( $this->params as $key => $value ) {
- if ( $paramString != '' ) {
- $paramString .= ' ';
- }
- if ( is_array( $value ) ) {
- $value = "array(" . count( $value ) . ")";
- } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) {
- $value = "object(" . get_class( $value ) . ")";
- }
- $value = (string)$value;
- if ( mb_strlen( $value ) > 1024 ) {
- $value = "string(" . mb_strlen( $value ) . ")";
- }
-
- $paramString .= "$key=$value";
- }
- }
-
- if ( is_object( $this->title ) ) {
- $s = "{$this->command} " . $this->title->getPrefixedDBkey();
- if ( $paramString !== '' ) {
- $s .= ' ' . $paramString;
- }
- return $s;
- } else {
- return "{$this->command} $paramString";
- }
- }
-
- protected function setLastError( $error ) {
- $this->error = $error;
- }
-
- public function getLastError() {
- return $this->error;
- }
-}
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
deleted file mode 100644
index 6556ee85..00000000
--- a/includes/job/JobQueue.php
+++ /dev/null
@@ -1,706 +0,0 @@
-<?php
-/**
- * Job queue base code.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @defgroup JobQueue JobQueue
- * @author Aaron Schulz
- */
-
-/**
- * Class to handle enqueueing and running of background jobs
- *
- * @ingroup JobQueue
- * @since 1.21
- */
-abstract class JobQueue {
- protected $wiki; // string; wiki ID
- protected $type; // string; job type
- protected $order; // string; job priority for pop()
- protected $claimTTL; // integer; seconds
- protected $maxTries; // integer; maximum number of times to try a job
- protected $checkDelay; // boolean; allow delayed jobs
-
- /** @var BagOStuff */
- protected $dupCache;
-
- const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
-
- const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
-
- /**
- * @param $params array
- */
- protected function __construct( array $params ) {
- $this->wiki = $params['wiki'];
- $this->type = $params['type'];
- $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
- $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
- if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
- $this->order = $params['order'];
- } else {
- $this->order = $this->optimalOrder();
- }
- if ( !in_array( $this->order, $this->supportedOrders() ) ) {
- throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
- }
- $this->checkDelay = !empty( $params['checkDelay'] );
- if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
- throw new MWException( __CLASS__ . " does not support delayed jobs." );
- }
- $this->dupCache = wfGetCache( CACHE_ANYTHING );
- }
-
- /**
- * Get a job queue object of the specified type.
- * $params includes:
- * - class : What job class to use (determines job type)
- * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
- * - type : The name of the job types this queue handles
- * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
- * If "fifo" is used, the queue will effectively be FIFO. Note that job
- * completion will not appear to be exactly FIFO if there are multiple
- * job runners since jobs can take different times to finish once popped.
- * If "timestamp" is used, the queue will at least be loosely ordered
- * by timestamp, allowing for some jobs to be popped off out of order.
- * If "random" is used, pop() will pick jobs in random order.
- * Note that it may only be weakly random (e.g. a lottery of the oldest X).
- * If "any" is choosen, the queue will use whatever order is the fastest.
- * This might be useful for improving concurrency for job acquisition.
- * - claimTTL : If supported, the queue will recycle jobs that have been popped
- * but not acknowledged as completed after this many seconds. Recycling
- * of jobs simple means re-inserting them into the queue. Jobs can be
- * attempted up to three times before being discarded.
- * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
- * This lets delayed jobs wait in a staging area until a given timestamp is
- * reached, at which point they will enter the queue. If this is not enabled
- * or not supported, an exception will be thrown on delayed job insertion.
- *
- * Queue classes should throw an exception if they do not support the options given.
- *
- * @param $params array
- * @return JobQueue
- * @throws MWException
- */
- final public static function factory( array $params ) {
- $class = $params['class'];
- if ( !class_exists( $class ) ) {
- throw new MWException( "Invalid job queue class '$class'." );
- }
- $obj = new $class( $params );
- if ( !( $obj instanceof self ) ) {
- throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
- }
- return $obj;
- }
-
- /**
- * @return string Wiki ID
- */
- final public function getWiki() {
- return $this->wiki;
- }
-
- /**
- * @return string Job type that this queue handles
- */
- final public function getType() {
- return $this->type;
- }
-
- /**
- * @return string One of (random, timestamp, fifo, undefined)
- */
- final public function getOrder() {
- return $this->order;
- }
-
- /**
- * @return bool Whether delayed jobs are enabled
- * @since 1.22
- */
- final public function delayedJobsEnabled() {
- return $this->checkDelay;
- }
-
- /**
- * Get the allowed queue orders for configuration validation
- *
- * @return Array Subset of (random, timestamp, fifo, undefined)
- */
- abstract protected function supportedOrders();
-
- /**
- * Get the default queue order to use if configuration does not specify one
- *
- * @return string One of (random, timestamp, fifo, undefined)
- */
- abstract protected function optimalOrder();
-
- /**
- * Find out if delayed jobs are supported for configuration validation
- *
- * @return boolean Whether delayed jobs are supported
- */
- protected function supportsDelayedJobs() {
- return false; // not implemented
- }
-
- /**
- * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
- * Queue classes should use caching if they are any slower without memcached.
- *
- * If caching is used, this might return false when there are actually no jobs.
- * If pop() is called and returns false then it should correct the cache. Also,
- * calling flushCaches() first prevents this. However, this affect is typically
- * not distinguishable from the race condition between isEmpty() and pop().
- *
- * @return bool
- * @throws JobQueueError
- */
- final public function isEmpty() {
- wfProfileIn( __METHOD__ );
- $res = $this->doIsEmpty();
- wfProfileOut( __METHOD__ );
- return $res;
- }
-
- /**
- * @see JobQueue::isEmpty()
- * @return bool
- */
- abstract protected function doIsEmpty();
-
- /**
- * Get the number of available (unacquired, non-delayed) jobs in the queue.
- * Queue classes should use caching if they are any slower without memcached.
- *
- * If caching is used, this number might be out of date for a minute.
- *
- * @return integer
- * @throws JobQueueError
- */
- final public function getSize() {
- wfProfileIn( __METHOD__ );
- $res = $this->doGetSize();
- wfProfileOut( __METHOD__ );
- return $res;
- }
-
- /**
- * @see JobQueue::getSize()
- * @return integer
- */
- abstract protected function doGetSize();
-
- /**
- * Get the number of acquired jobs (these are temporarily out of the queue).
- * Queue classes should use caching if they are any slower without memcached.
- *
- * If caching is used, this number might be out of date for a minute.
- *
- * @return integer
- * @throws JobQueueError
- */
- final public function getAcquiredCount() {
- wfProfileIn( __METHOD__ );
- $res = $this->doGetAcquiredCount();
- wfProfileOut( __METHOD__ );
- return $res;
- }
-
- /**
- * @see JobQueue::getAcquiredCount()
- * @return integer
- */
- abstract protected function doGetAcquiredCount();
-
- /**
- * Get the number of delayed jobs (these are temporarily out of the queue).
- * Queue classes should use caching if they are any slower without memcached.
- *
- * If caching is used, this number might be out of date for a minute.
- *
- * @return integer
- * @throws JobQueueError
- * @since 1.22
- */
- final public function getDelayedCount() {
- wfProfileIn( __METHOD__ );
- $res = $this->doGetDelayedCount();
- wfProfileOut( __METHOD__ );
- return $res;
- }
-
- /**
- * @see JobQueue::getDelayedCount()
- * @return integer
- */
- protected function doGetDelayedCount() {
- return 0; // not implemented
- }
-
- /**
- * Get the number of acquired jobs that can no longer be attempted.
- * Queue classes should use caching if they are any slower without memcached.
- *
- * If caching is used, this number might be out of date for a minute.
- *
- * @return integer
- * @throws JobQueueError
- */
- final public function getAbandonedCount() {
- wfProfileIn( __METHOD__ );
- $res = $this->doGetAbandonedCount();
- wfProfileOut( __METHOD__ );
- return $res;
- }
-
- /**
- * @see JobQueue::getAbandonedCount()
- * @return integer
- */
- protected function doGetAbandonedCount() {
- return 0; // not implemented
- }
-
- /**
- * Push a single jobs into the queue.
- * This does not require $wgJobClasses to be set for the given job type.
- * Outside callers should use JobQueueGroup::push() instead of this function.
- *
- * @param $jobs Job|Array
- * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
- * @return bool Returns false on failure
- * @throws JobQueueError
- */
- final public function push( $jobs, $flags = 0 ) {
- return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
- }
-
- /**
- * Push a batch of jobs into the queue.
- * This does not require $wgJobClasses to be set for the given job type.
- * Outside callers should use JobQueueGroup::push() instead of this function.
- *
- * @param array $jobs List of Jobs
- * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
- * @return bool Returns false on failure
- * @throws JobQueueError
- */
- final public function batchPush( array $jobs, $flags = 0 ) {
- if ( !count( $jobs ) ) {
- return true; // nothing to do
- }
-
- foreach ( $jobs as $job ) {
- if ( $job->getType() !== $this->type ) {
- throw new MWException(
- "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
- } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
- throw new MWException(
- "Got delayed '{$job->getType()}' job; delays are not supported." );
- }
- }
-
- wfProfileIn( __METHOD__ );
- $ok = $this->doBatchPush( $jobs, $flags );
- wfProfileOut( __METHOD__ );
- return $ok;
- }
-
- /**
- * @see JobQueue::batchPush()
- * @return bool
- */
- abstract protected function doBatchPush( array $jobs, $flags );
-
- /**
- * Pop a job off of the queue.
- * This requires $wgJobClasses to be set for the given job type.
- * Outside callers should use JobQueueGroup::pop() instead of this function.
- *
- * @return Job|bool Returns false if there are no jobs
- * @throws JobQueueError
- */
- final public function pop() {
- global $wgJobClasses;
-
- if ( $this->wiki !== wfWikiID() ) {
- throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
- } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
- // Do not pop jobs if there is no class for the queue type
- throw new MWException( "Unrecognized job type '{$this->type}'." );
- }
-
- wfProfileIn( __METHOD__ );
- $job = $this->doPop();
- wfProfileOut( __METHOD__ );
-
- // Flag this job as an old duplicate based on its "root" job...
- try {
- if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
- JobQueue::incrStats( 'job-pop-duplicate', $this->type );
- $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
- }
- } catch ( MWException $e ) {} // don't lose jobs over this
-
- return $job;
- }
-
- /**
- * @see JobQueue::pop()
- * @return Job
- */
- abstract protected function doPop();
-
- /**
- * Acknowledge that a job was completed.
- *
- * This does nothing for certain queue classes or if "claimTTL" is not set.
- * Outside callers should use JobQueueGroup::ack() instead of this function.
- *
- * @param $job Job
- * @return bool
- * @throws JobQueueError
- */
- final public function ack( Job $job ) {
- if ( $job->getType() !== $this->type ) {
- throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
- }
- wfProfileIn( __METHOD__ );
- $ok = $this->doAck( $job );
- wfProfileOut( __METHOD__ );
- return $ok;
- }
-
- /**
- * @see JobQueue::ack()
- * @return bool
- */
- abstract protected function doAck( Job $job );
-
- /**
- * Register the "root job" of a given job into the queue for de-duplication.
- * This should only be called right *after* all the new jobs have been inserted.
- * This is used to turn older, duplicate, job entries into no-ops. The root job
- * information will remain in the registry until it simply falls out of cache.
- *
- * This requires that $job has two special fields in the "params" array:
- * - rootJobSignature : hash (e.g. SHA1) that identifies the task
- * - rootJobTimestamp : TS_MW timestamp of this instance of the task
- *
- * A "root job" is a conceptual job that consist of potentially many smaller jobs
- * that are actually inserted into the queue. For example, "refreshLinks" jobs are
- * spawned when a template is edited. One can think of the task as "update links
- * of pages that use template X" and an instance of that task as a "root job".
- * However, what actually goes into the queue are potentially many refreshLinks2 jobs.
- * Since these jobs include things like page ID ranges and DB master positions, and morph
- * into smaller refreshLinks2 jobs recursively, simple duplicate detection (like job_sha1)
- * for individual jobs being identical is not useful.
- *
- * In the case of "refreshLinks", if these jobs are still in the queue when the template
- * is edited again, we want all of these old refreshLinks jobs for that template to become
- * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing.
- * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
- * previous "root job" for the same task of "update links of pages that use template X".
- *
- * This does nothing for certain queue classes.
- *
- * @param $job Job
- * @return bool
- * @throws JobQueueError
- */
- final public function deduplicateRootJob( Job $job ) {
- if ( $job->getType() !== $this->type ) {
- throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
- }
- wfProfileIn( __METHOD__ );
- $ok = $this->doDeduplicateRootJob( $job );
- wfProfileOut( __METHOD__ );
- return $ok;
- }
-
- /**
- * @see JobQueue::deduplicateRootJob()
- * @param $job Job
- * @return bool
- */
- protected function doDeduplicateRootJob( Job $job ) {
- if ( !$job->hasRootJobParams() ) {
- throw new MWException( "Cannot register root job; missing parameters." );
- }
- $params = $job->getRootJobParams();
-
- $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
- // Callers should call batchInsert() and then this function so that if the insert
- // fails, the de-duplication registration will be aborted. Since the insert is
- // deferred till "transaction idle", do the same here, so that the ordering is
- // maintained. Having only the de-duplication registration succeed would cause
- // jobs to become no-ops without any actual jobs that made them redundant.
- $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
- if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
- return true; // a newer version of this root job was enqueued
- }
-
- // Update the timestamp of the last root job started at the location...
- return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
- }
-
- /**
- * Check if the "root" job of a given job has been superseded by a newer one
- *
- * @param $job Job
- * @return bool
- * @throws JobQueueError
- */
- final protected function isRootJobOldDuplicate( Job $job ) {
- if ( $job->getType() !== $this->type ) {
- throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
- }
- wfProfileIn( __METHOD__ );
- $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
- wfProfileOut( __METHOD__ );
- return $isDuplicate;
- }
-
- /**
- * @see JobQueue::isRootJobOldDuplicate()
- * @param Job $job
- * @return bool
- */
- protected function doIsRootJobOldDuplicate( Job $job ) {
- if ( !$job->hasRootJobParams() ) {
- return false; // job has no de-deplication info
- }
- $params = $job->getRootJobParams();
-
- $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
- // Get the last time this root job was enqueued
- $timestamp = $this->dupCache->get( $key );
-
- // Check if a new root job was started at the location after this one's...
- return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
- }
-
- /**
- * @param string $signature Hash identifier of the root job
- * @return string
- */
- protected function getRootJobCacheKey( $signature ) {
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
- }
-
- /**
- * Deleted all unclaimed and delayed jobs from the queue
- *
- * @return bool Success
- * @throws JobQueueError
- * @since 1.22
- */
- final public function delete() {
- wfProfileIn( __METHOD__ );
- $res = $this->doDelete();
- wfProfileOut( __METHOD__ );
- return $res;
- }
-
- /**
- * @see JobQueue::delete()
- * @return bool Success
- */
- protected function doDelete() {
- throw new MWException( "This method is not implemented." );
- }
-
- /**
- * Wait for any slaves or backup servers to catch up.
- *
- * This does nothing for certain queue classes.
- *
- * @return void
- * @throws JobQueueError
- */
- final public function waitForBackups() {
- wfProfileIn( __METHOD__ );
- $this->doWaitForBackups();
- wfProfileOut( __METHOD__ );
- }
-
- /**
- * @see JobQueue::waitForBackups()
- * @return void
- */
- protected function doWaitForBackups() {}
-
- /**
- * Return a map of task names to task definition maps.
- * A "task" is a fast periodic queue maintenance action.
- * Mutually exclusive tasks must implement their own locking in the callback.
- *
- * Each task value is an associative array with:
- * - name : the name of the task
- * - callback : a PHP callable that performs the task
- * - period : the period in seconds corresponding to the task frequency
- *
- * @return Array
- */
- final public function getPeriodicTasks() {
- $tasks = $this->doGetPeriodicTasks();
- foreach ( $tasks as $name => &$def ) {
- $def['name'] = $name;
- }
- return $tasks;
- }
-
- /**
- * @see JobQueue::getPeriodicTasks()
- * @return Array
- */
- protected function doGetPeriodicTasks() {
- return array();
- }
-
- /**
- * Clear any process and persistent caches
- *
- * @return void
- */
- final public function flushCaches() {
- wfProfileIn( __METHOD__ );
- $this->doFlushCaches();
- wfProfileOut( __METHOD__ );
- }
-
- /**
- * @see JobQueue::flushCaches()
- * @return void
- */
- protected function doFlushCaches() {}
-
- /**
- * Get an iterator to traverse over all available jobs in this queue.
- * This does not include jobs that are currently acquired or delayed.
- * Note: results may be stale if the queue is concurrently modified.
- *
- * @return Iterator
- * @throws JobQueueError
- */
- abstract public function getAllQueuedJobs();
-
- /**
- * Get an iterator to traverse over all delayed jobs in this queue.
- * Note: results may be stale if the queue is concurrently modified.
- *
- * @return Iterator
- * @throws JobQueueError
- * @since 1.22
- */
- public function getAllDelayedJobs() {
- return new ArrayIterator( array() ); // not implemented
- }
-
- /**
- * Do not use this function outside of JobQueue/JobQueueGroup
- *
- * @return string
- * @since 1.22
- */
- public function getCoalesceLocationInternal() {
- return null;
- }
-
- /**
- * Check whether each of the given queues are empty.
- * This is used for batching checks for queues stored at the same place.
- *
- * @param array $types List of queues types
- * @return array|null (list of non-empty queue types) or null if unsupported
- * @throws MWException
- * @since 1.22
- */
- final public function getSiblingQueuesWithJobs( array $types ) {
- $section = new ProfileSection( __METHOD__ );
- return $this->doGetSiblingQueuesWithJobs( $types );
- }
-
- /**
- * @see JobQueue::getSiblingQueuesWithJobs()
- * @param array $types List of queues types
- * @return array|null (list of queue types) or null if unsupported
- */
- protected function doGetSiblingQueuesWithJobs( array $types ) {
- return null; // not supported
- }
-
- /**
- * Check the size of each of the given queues.
- * For queues not served by the same store as this one, 0 is returned.
- * This is used for batching checks for queues stored at the same place.
- *
- * @param array $types List of queues types
- * @return array|null (job type => whether queue is empty) or null if unsupported
- * @throws MWException
- * @since 1.22
- */
- final public function getSiblingQueueSizes( array $types ) {
- $section = new ProfileSection( __METHOD__ );
- return $this->doGetSiblingQueueSizes( $types );
- }
-
- /**
- * @see JobQueue::getSiblingQueuesSize()
- * @param array $types List of queues types
- * @return array|null (list of queue types) or null if unsupported
- */
- protected function doGetSiblingQueueSizes( array $types ) {
- return null; // not supported
- }
-
- /**
- * Call wfIncrStats() for the queue overall and for the queue type
- *
- * @param string $key Event type
- * @param string $type Job type
- * @param integer $delta
- * @since 1.22
- */
- public static function incrStats( $key, $type, $delta = 1 ) {
- wfIncrStats( $key, $delta );
- wfIncrStats( "{$key}-{$type}", $delta );
- }
-
- /**
- * Namespace the queue with a key to isolate it for testing
- *
- * @param $key string
- * @return void
- * @throws MWException
- */
- public function setTestingPrefix( $key ) {
- throw new MWException( "Queue namespacing not supported for this queue type." );
- }
-}
-
-/**
- * @ingroup JobQueue
- * @since 1.22
- */
-class JobQueueError extends MWException {}
-class JobQueueConnectionError extends JobQueueError {}
diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php
deleted file mode 100644
index c39083df..00000000
--- a/includes/job/JobQueueDB.php
+++ /dev/null
@@ -1,816 +0,0 @@
-<?php
-/**
- * Database-backed job queue code.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @author Aaron Schulz
- */
-
-/**
- * Class to handle job queues stored in the DB
- *
- * @ingroup JobQueue
- * @since 1.21
- */
-class JobQueueDB extends JobQueue {
- const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
- const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
- const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
- const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
- const MAX_OFFSET = 255; // integer; maximum number of rows to skip
-
- /** @var BagOStuff */
- protected $cache;
-
- protected $cluster = false; // string; name of an external DB cluster
-
- /**
- * Additional parameters include:
- * - cluster : The name of an external cluster registered via LBFactory.
- * If not specified, the primary DB cluster for the wiki will be used.
- * This can be overridden with a custom cluster so that DB handles will
- * be retrieved via LBFactory::getExternalLB() and getConnection().
- * @param $params array
- */
- protected function __construct( array $params ) {
- global $wgMemc;
-
- parent::__construct( $params );
-
- $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
- // Make sure that we don't use the SQL cache, which would be harmful
- $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc;
- }
-
- protected function supportedOrders() {
- return array( 'random', 'timestamp', 'fifo' );
- }
-
- protected function optimalOrder() {
- return 'random';
- }
-
- /**
- * @see JobQueue::doIsEmpty()
- * @return bool
- */
- protected function doIsEmpty() {
- $key = $this->getCacheKey( 'empty' );
-
- $isEmpty = $this->cache->get( $key );
- if ( $isEmpty === 'true' ) {
- return true;
- } elseif ( $isEmpty === 'false' ) {
- return false;
- }
-
- $dbr = $this->getSlaveDB();
- try {
- $found = $dbr->selectField( // unclaimed job
- 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
- );
- } catch ( DBError $e ) {
- $this->throwDBException( $e );
- }
- $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
-
- return !$found;
- }
-
- /**
- * @see JobQueue::doGetSize()
- * @return integer
- */
- protected function doGetSize() {
- $key = $this->getCacheKey( 'size' );
-
- $size = $this->cache->get( $key );
- if ( is_int( $size ) ) {
- return $size;
- }
-
- try {
- $dbr = $this->getSlaveDB();
- $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
- array( 'job_cmd' => $this->type, 'job_token' => '' ),
- __METHOD__
- );
- } catch ( DBError $e ) {
- $this->throwDBException( $e );
- }
- $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
-
- return $size;
- }
-
- /**
- * @see JobQueue::doGetAcquiredCount()
- * @return integer
- */
- protected function doGetAcquiredCount() {
- if ( $this->claimTTL <= 0 ) {
- return 0; // no acknowledgements
- }
-
- $key = $this->getCacheKey( 'acquiredcount' );
-
- $count = $this->cache->get( $key );
- if ( is_int( $count ) ) {
- return $count;
- }
-
- $dbr = $this->getSlaveDB();
- try {
- $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
- array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
- __METHOD__
- );
- } catch ( DBError $e ) {
- $this->throwDBException( $e );
- }
- $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
-
- return $count;
- }
-
- /**
- * @see JobQueue::doGetAbandonedCount()
- * @return integer
- * @throws MWException
- */
- protected function doGetAbandonedCount() {
- global $wgMemc;
-
- if ( $this->claimTTL <= 0 ) {
- return 0; // no acknowledgements
- }
-
- $key = $this->getCacheKey( 'abandonedcount' );
-
- $count = $wgMemc->get( $key );
- if ( is_int( $count ) ) {
- return $count;
- }
-
- $dbr = $this->getSlaveDB();
- try {
- $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
- array(
- 'job_cmd' => $this->type,
- "job_token != {$dbr->addQuotes( '' )}",
- "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
- ),
- __METHOD__
- );
- } catch ( DBError $e ) {
- $this->throwDBException( $e );
- }
- $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
-
- return $count;
- }
-
- /**
- * @see JobQueue::doBatchPush()
- * @param array $jobs
- * @param $flags
- * @throws DBError|Exception
- * @return bool
- */
- protected function doBatchPush( array $jobs, $flags ) {
- $dbw = $this->getMasterDB();
-
- $that = $this;
- $method = __METHOD__;
- $dbw->onTransactionIdle(
- function() use ( $dbw, $that, $jobs, $flags, $method ) {
- $that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
- }
- );
-
- return true;
- }
-
- /**
- * This function should *not* be called outside of JobQueueDB
- *
- * @param DatabaseBase $dbw
- * @param array $jobs
- * @param int $flags
- * @param string $method
- * @return boolean
- * @throws type
- */
- public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
- if ( !count( $jobs ) ) {
- return true;
- }
-
- $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
- $rowList = array(); // list of jobs for jobs that are are not de-duplicated
- foreach ( $jobs as $job ) {
- $row = $this->insertFields( $job );
- if ( $job->ignoreDuplicates() ) {
- $rowSet[$row['job_sha1']] = $row;
- } else {
- $rowList[] = $row;
- }
- }
-
- if ( $flags & self::QOS_ATOMIC ) {
- $dbw->begin( $method ); // wrap all the job additions in one transaction
- }
- try {
- // Strip out any duplicate jobs that are already in the queue...
- if ( count( $rowSet ) ) {
- $res = $dbw->select( 'job', 'job_sha1',
- array(
- // No job_type condition since it's part of the job_sha1 hash
- 'job_sha1' => array_keys( $rowSet ),
- 'job_token' => '' // unclaimed
- ),
- $method
- );
- foreach ( $res as $row ) {
- wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
- unset( $rowSet[$row->job_sha1] ); // already enqueued
- }
- }
- // Build the full list of job rows to insert
- $rows = array_merge( $rowList, array_values( $rowSet ) );
- // Insert the job rows in chunks to avoid slave lag...
- foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
- $dbw->insert( 'job', $rowBatch, $method );
- }
- JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
- JobQueue::incrStats( 'job-insert-duplicate', $this->type,
- count( $rowSet ) + count( $rowList ) - count( $rows ) );
- } catch ( DBError $e ) {
- if ( $flags & self::QOS_ATOMIC ) {
- $dbw->rollback( $method );
- }
- throw $e;
- }
- if ( $flags & self::QOS_ATOMIC ) {
- $dbw->commit( $method );
- }
-
- $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
-
- return true;
- }
-
- /**
- * @see JobQueue::doPop()
- * @return Job|bool
- */
- protected function doPop() {
- if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
- return false; // queue is empty
- }
-
- $dbw = $this->getMasterDB();
- try {
- $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
- $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
- $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
- $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
- $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
- } );
-
- $uuid = wfRandomString( 32 ); // pop attempt
- $job = false; // job popped off
- do { // retry when our row is invalid or deleted as a duplicate
- // Try to reserve a row in the DB...
- if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
- $row = $this->claimOldest( $uuid );
- } else { // random first
- $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
- $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
- $row = $this->claimRandom( $uuid, $rand, $gte );
- }
- // Check if we found a row to reserve...
- if ( !$row ) {
- $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
- break; // nothing to do
- }
- JobQueue::incrStats( 'job-pop', $this->type );
- // Get the job object from the row...
- $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
- if ( !$title ) {
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- wfDebug( "Row has invalid title '{$row->job_title}'." );
- continue; // try again
- }
- $job = Job::factory( $row->job_cmd, $title,
- self::extractBlob( $row->job_params ), $row->job_id );
- $job->metadata['id'] = $row->job_id;
- $job->id = $row->job_id; // XXX: work around broken subclasses
- break; // done
- } while ( true );
- } catch ( DBError $e ) {
- $this->throwDBException( $e );
- }
-
- return $job;
- }
-
- /**
- * Reserve a row with a single UPDATE without holding row locks over RTTs...
- *
- * @param string $uuid 32 char hex string
- * @param $rand integer Random unsigned integer (31 bits)
- * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
- * @return Row|false
- */
- protected function claimRandom( $uuid, $rand, $gte ) {
- $dbw = $this->getMasterDB();
- // Check cache to see if the queue has <= OFFSET items
- $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
-
- $row = false; // the row acquired
- $invertedDirection = false; // whether one job_random direction was already scanned
- // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
- // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
- // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
- // be used here with MySQL.
- do {
- if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
- // For small queues, using OFFSET will overshoot and return no rows more often.
- // Instead, this uses job_random to pick a row (possibly checking both directions).
- $ineq = $gte ? '>=' : '<=';
- $dir = $gte ? 'ASC' : 'DESC';
- $row = $dbw->selectRow( 'job', '*', // find a random job
- array(
- 'job_cmd' => $this->type,
- 'job_token' => '', // unclaimed
- "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
- __METHOD__,
- array( 'ORDER BY' => "job_random {$dir}" )
- );
- if ( !$row && !$invertedDirection ) {
- $gte = !$gte;
- $invertedDirection = true;
- continue; // try the other direction
- }
- } else { // table *may* have >= MAX_OFFSET rows
- // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
- // in MySQL if there are many rows for some reason. This uses a small OFFSET
- // instead of job_random for reducing excess claim retries.
- $row = $dbw->selectRow( 'job', '*', // find a random job
- array(
- 'job_cmd' => $this->type,
- 'job_token' => '', // unclaimed
- ),
- __METHOD__,
- array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) )
- );
- if ( !$row ) {
- $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
- $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
- continue; // use job_random
- }
- }
- if ( $row ) { // claim the job
- $dbw->update( 'job', // update by PK
- array(
- 'job_token' => $uuid,
- 'job_token_timestamp' => $dbw->timestamp(),
- 'job_attempts = job_attempts+1' ),
- array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
- __METHOD__
- );
- // This might get raced out by another runner when claiming the previously
- // selected row. The use of job_random should minimize this problem, however.
- if ( !$dbw->affectedRows() ) {
- $row = false; // raced out
- }
- } else {
- break; // nothing to do
- }
- } while ( !$row );
-
- return $row;
- }
-
- /**
- * Reserve a row with a single UPDATE without holding row locks over RTTs...
- *
- * @param string $uuid 32 char hex string
- * @return Row|false
- */
- protected function claimOldest( $uuid ) {
- $dbw = $this->getMasterDB();
-
- $row = false; // the row acquired
- do {
- if ( $dbw->getType() === 'mysql' ) {
- // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
- // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
- // Oracle and Postgre have no such limitation. However, MySQL offers an
- // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
- $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
- "SET " .
- "job_token = {$dbw->addQuotes( $uuid ) }, " .
- "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
- "job_attempts = job_attempts+1 " .
- "WHERE ( " .
- "job_cmd = {$dbw->addQuotes( $this->type )} " .
- "AND job_token = {$dbw->addQuotes( '' )} " .
- ") ORDER BY job_id ASC LIMIT 1",
- __METHOD__
- );
- } else {
- // Use a subquery to find the job, within an UPDATE to claim it.
- // This uses as much of the DB wrapper functions as possible.
- $dbw->update( 'job',
- array(
- 'job_token' => $uuid,
- 'job_token_timestamp' => $dbw->timestamp(),
- 'job_attempts = job_attempts+1' ),
- array( 'job_id = (' .
- $dbw->selectSQLText( 'job', 'job_id',
- array( 'job_cmd' => $this->type, 'job_token' => '' ),
- __METHOD__,
- array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
- ')'
- ),
- __METHOD__
- );
- }
- // Fetch any row that we just reserved...
- if ( $dbw->affectedRows() ) {
- $row = $dbw->selectRow( 'job', '*',
- array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__
- );
- if ( !$row ) { // raced out by duplicate job removal
- wfDebug( "Row deleted as duplicate by another process." );
- }
- } else {
- break; // nothing to do
- }
- } while ( !$row );
-
- return $row;
- }
-
- /**
- * @see JobQueue::doAck()
- * @param Job $job
- * @throws MWException
- * @return Job|bool
- */
- protected function doAck( Job $job ) {
- if ( !isset( $job->metadata['id'] ) ) {
- throw new MWException( "Job of type '{$job->getType()}' has no ID." );
- }
-
- $dbw = $this->getMasterDB();
- try {
- $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
- $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
- $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
- $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
- $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
- } );
-
- // Delete a row with a single DELETE without holding row locks over RTTs...
- $dbw->delete( 'job',
- array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
- } catch ( DBError $e ) {
- $this->throwDBException( $e );
- }
-
- return true;
- }
-
- /**
- * @see JobQueue::doDeduplicateRootJob()
- * @param Job $job
- * @throws MWException
- * @return bool
- */
- protected function doDeduplicateRootJob( Job $job ) {
- $params = $job->getParams();
- if ( !isset( $params['rootJobSignature'] ) ) {
- throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
- } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
- throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
- }
- $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
- // Callers should call batchInsert() and then this function so that if the insert
- // fails, the de-duplication registration will be aborted. Since the insert is
- // deferred till "transaction idle", do the same here, so that the ordering is
- // maintained. Having only the de-duplication registration succeed would cause
- // jobs to become no-ops without any actual jobs that made them redundant.
- $dbw = $this->getMasterDB();
- $cache = $this->dupCache;
- $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) {
- $timestamp = $cache->get( $key ); // current last timestamp of this job
- if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
- return true; // a newer version of this root job was enqueued
- }
-
- // Update the timestamp of the last root job started at the location...
- return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
- } );
-
- return true;
- }
-
- /**
- * @see JobQueue::doDelete()
- * @return bool
- */
- protected function doDelete() {
- $dbw = $this->getMasterDB();
- try {
- $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
- } catch ( DBError $e ) {
- $this->throwDBException( $e );
- }
- return true;
- }
-
- /**
- * @see JobQueue::doWaitForBackups()
- * @return void
- */
- protected function doWaitForBackups() {
- wfWaitForSlaves();
- }
-
- /**
- * @return Array
- */
- protected function doGetPeriodicTasks() {
- return array(
- 'recycleAndDeleteStaleJobs' => array(
- 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
- )
- );
- }
-
- /**
- * @return void
- */
- protected function doFlushCaches() {
- foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
- $this->cache->delete( $this->getCacheKey( $type ) );
- }
- }
-
- /**
- * @see JobQueue::getAllQueuedJobs()
- * @return Iterator
- */
- public function getAllQueuedJobs() {
- $dbr = $this->getSlaveDB();
- try {
- return new MappedIterator(
- $dbr->select( 'job', '*',
- array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
- function( $row ) use ( $dbr ) {
- $job = Job::factory(
- $row->job_cmd,
- Title::makeTitle( $row->job_namespace, $row->job_title ),
- strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
- $row->job_id
- );
- $job->metadata['id'] = $row->job_id;
- $job->id = $row->job_id; // XXX: work around broken subclasses
- return $job;
- }
- );
- } catch ( DBError $e ) {
- $this->throwDBException( $e );
- }
- }
-
- public function getCoalesceLocationInternal() {
- return $this->cluster
- ? "DBCluster:{$this->cluster}:{$this->wiki}"
- : "LBFactory:{$this->wiki}";
- }
-
- protected function doGetSiblingQueuesWithJobs( array $types ) {
- $dbr = $this->getSlaveDB();
- $res = $dbr->select( 'job', 'DISTINCT job_cmd',
- array( 'job_cmd' => $types ), __METHOD__ );
-
- $types = array();
- foreach ( $res as $row ) {
- $types[] = $row->job_cmd;
- }
- return $types;
- }
-
- protected function doGetSiblingQueueSizes( array $types ) {
- $dbr = $this->getSlaveDB();
- $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
- array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );
-
- $sizes = array();
- foreach ( $res as $row ) {
- $sizes[$row->job_cmd] = (int)$row->count;
- }
- return $sizes;
- }
-
- /**
- * Recycle or destroy any jobs that have been claimed for too long
- *
- * @return integer Number of jobs recycled/deleted
- */
- public function recycleAndDeleteStaleJobs() {
- $now = time();
- $count = 0; // affected rows
- $dbw = $this->getMasterDB();
-
- try {
- if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
- return $count; // already in progress
- }
-
- // Remove claims on jobs acquired for too long if enabled...
- if ( $this->claimTTL > 0 ) {
- $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
- // Get the IDs of jobs that have be claimed but not finished after too long.
- // These jobs can be recycled into the queue by expiring the claim. Selecting
- // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
- $res = $dbw->select( 'job', 'job_id',
- array(
- 'job_cmd' => $this->type,
- "job_token != {$dbw->addQuotes( '' )}", // was acquired
- "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
- "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
- __METHOD__
- );
- $ids = array_map(
- function( $o ) {
- return $o->job_id;
- }, iterator_to_array( $res )
- );
- if ( count( $ids ) ) {
- // Reset job_token for these jobs so that other runners will pick them up.
- // Set the timestamp to the current time, as it is useful to now that the job
- // was already tried before (the timestamp becomes the "released" time).
- $dbw->update( 'job',
- array(
- 'job_token' => '',
- 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
- array(
- 'job_id' => $ids ),
- __METHOD__
- );
- $count += $dbw->affectedRows();
- JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() );
- $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
- }
- }
-
- // Just destroy any stale jobs...
- $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
- $conds = array(
- 'job_cmd' => $this->type,
- "job_token != {$dbw->addQuotes( '' )}", // was acquired
- "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
- );
- if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
- $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
- }
- // Get the IDs of jobs that are considered stale and should be removed. Selecting
- // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
- $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
- $ids = array_map(
- function( $o ) {
- return $o->job_id;
- }, iterator_to_array( $res )
- );
- if ( count( $ids ) ) {
- $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
- $count += $dbw->affectedRows();
- JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() );
- }
-
- $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
- } catch ( DBError $e ) {
- $this->throwDBException( $e );
- }
-
- return $count;
- }
-
- /**
- * @param $job Job
- * @return array
- */
- protected function insertFields( Job $job ) {
- $dbw = $this->getMasterDB();
- return array(
- // Fields that describe the nature of the job
- 'job_cmd' => $job->getType(),
- 'job_namespace' => $job->getTitle()->getNamespace(),
- 'job_title' => $job->getTitle()->getDBkey(),
- 'job_params' => self::makeBlob( $job->getParams() ),
- // Additional job metadata
- 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
- 'job_timestamp' => $dbw->timestamp(),
- 'job_sha1' => wfBaseConvert(
- sha1( serialize( $job->getDeduplicationInfo() ) ),
- 16, 36, 31
- ),
- 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
- );
- }
-
- /**
- * @return DBConnRef
- */
- protected function getSlaveDB() {
- try {
- return $this->getDB( DB_SLAVE );
- } catch ( DBConnectionError $e ) {
- throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
- }
- }
-
- /**
- * @return DBConnRef
- */
- protected function getMasterDB() {
- try {
- return $this->getDB( DB_MASTER );
- } catch ( DBConnectionError $e ) {
- throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
- }
- }
-
- /**
- * @param $index integer (DB_SLAVE/DB_MASTER)
- * @return DBConnRef
- */
- protected function getDB( $index ) {
- $lb = ( $this->cluster !== false )
- ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
- : wfGetLB( $this->wiki );
- return $lb->getConnectionRef( $index, array(), $this->wiki );
- }
-
- /**
- * @return string
- */
- private function getCacheKey( $property ) {
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
- }
-
- /**
- * @param $params
- * @return string
- */
- protected static function makeBlob( $params ) {
- if ( $params !== false ) {
- return serialize( $params );
- } else {
- return '';
- }
- }
-
- /**
- * @param $blob
- * @return bool|mixed
- */
- protected static function extractBlob( $blob ) {
- if ( (string)$blob !== '' ) {
- return unserialize( $blob );
- } else {
- return false;
- }
- }
-
- /**
- * @param DBError $e
- * @throws JobQueueError
- */
- protected function throwDBException( DBError $e ) {
- throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
- }
-}
diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php
deleted file mode 100644
index d3ce164a..00000000
--- a/includes/job/JobQueueFederated.php
+++ /dev/null
@@ -1,473 +0,0 @@
-<?php
-/**
- * Job queue code for federated queues.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @author Aaron Schulz
- */
-
-/**
- * Class to handle enqueueing and running of background jobs for federated queues
- *
- * This class allows for queues to be partitioned into smaller queues.
- * A partition is defined by the configuration for a JobQueue instance.
- * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a
- * JobQueueFederated instance, which itself would consist of three JobQueueRedis
- * instances, each using their own redis server. This would allow for the jobs
- * to be split (evenly or based on weights) accross multiple servers if a single
- * server becomes impractical or expensive. Different JobQueue classes can be mixed.
- *
- * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue
- * is inherited by the partition queues. Additional configuration defines what
- * section each wiki is in, what partition queues each section uses (and their weight),
- * and the JobQueue configuration for each partition. Some sections might only need a
- * single queue partition, like the sections for groups of small wikis.
- *
- * If used for performance, then $wgMainCacheType should be set to memcached/redis.
- * Note that "fifo" cannot be used for the ordering, since the data is distributed.
- * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also,
- * queue classes used by this should ignore down servers (with TTL) to avoid slowness.
- *
- * @ingroup JobQueue
- * @since 1.22
- */
-class JobQueueFederated extends JobQueue {
- /** @var Array (partition name => weight) reverse sorted by weight */
- protected $partitionMap = array();
- /** @var Array (partition name => JobQueue) reverse sorted by weight */
- protected $partitionQueues = array();
- /** @var HashRing */
- protected $partitionPushRing;
- /** @var BagOStuff */
- protected $cache;
-
- const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
- const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
-
- /**
- * @params include:
- * - sectionsByWiki : A map of wiki IDs to section names.
- * Wikis will default to using the section "default".
- * - partitionsBySection : Map of section names to maps of (partition name => weight).
- * A section called 'default' must be defined if not all wikis
- * have explicitly defined sections.
- * - configByPartition : Map of queue partition names to configuration arrays.
- * These configuration arrays are passed to JobQueue::factory().
- * The options set here are overriden by those passed to this
- * the federated queue itself (e.g. 'order' and 'claimTTL').
- * - partitionsNoPush : List of partition names that can handle pop() but not push().
- * This can be used to migrate away from a certain partition.
- * @param array $params
- */
- protected function __construct( array $params ) {
- parent::__construct( $params );
- $section = isset( $params['sectionsByWiki'][$this->wiki] )
- ? $params['sectionsByWiki'][$this->wiki]
- : 'default';
- if ( !isset( $params['partitionsBySection'][$section] ) ) {
- throw new MWException( "No configuration for section '$section'." );
- }
- // Get the full partition map
- $this->partitionMap = $params['partitionsBySection'][$section];
- arsort( $this->partitionMap, SORT_NUMERIC );
- // Get the partitions jobs can actually be pushed to
- $partitionPushMap = $this->partitionMap;
- if ( isset( $params['partitionsNoPush'] ) ) {
- foreach ( $params['partitionsNoPush'] as $partition ) {
- unset( $partitionPushMap[$partition] );
- }
- }
- // Get the config to pass to merge into each partition queue config
- $baseConfig = $params;
- foreach ( array( 'class', 'sectionsByWiki',
- 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o )
- {
- unset( $baseConfig[$o] );
- }
- // Get the partition queue objects
- foreach ( $this->partitionMap as $partition => $w ) {
- if ( !isset( $params['configByPartition'][$partition] ) ) {
- throw new MWException( "No configuration for partition '$partition'." );
- }
- $this->partitionQueues[$partition] = JobQueue::factory(
- $baseConfig + $params['configByPartition'][$partition] );
- }
- // Get the ring of partitions to push jobs into
- $this->partitionPushRing = new HashRing( $partitionPushMap );
- // Aggregate cache some per-queue values if there are multiple partition queues
- $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
- }
-
- protected function supportedOrders() {
- // No FIFO due to partitioning, though "rough timestamp order" is supported
- return array( 'undefined', 'random', 'timestamp' );
- }
-
- protected function optimalOrder() {
- return 'undefined'; // defer to the partitions
- }
-
- protected function supportsDelayedJobs() {
- return true; // defer checks to the partitions
- }
-
- protected function doIsEmpty() {
- $key = $this->getCacheKey( 'empty' );
-
- $isEmpty = $this->cache->get( $key );
- if ( $isEmpty === 'true' ) {
- return true;
- } elseif ( $isEmpty === 'false' ) {
- return false;
- }
-
- foreach ( $this->partitionQueues as $queue ) {
- try {
- if ( !$queue->doIsEmpty() ) {
- $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
- return false;
- }
- } catch ( JobQueueError $e ) {
- MWExceptionHandler::logException( $e );
- }
- }
-
- $this->cache->add( $key, 'true', self::CACHE_TTL_LONG );
- return true;
- }
-
- protected function doGetSize() {
- return $this->getCrossPartitionSum( 'size', 'doGetSize' );
- }
-
- protected function doGetAcquiredCount() {
- return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
- }
-
- protected function doGetDelayedCount() {
- return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
- }
-
- protected function doGetAbandonedCount() {
- return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
- }
-
- /**
- * @param string $type
- * @param string $method
- * @return integer
- */
- protected function getCrossPartitionSum( $type, $method ) {
- $key = $this->getCacheKey( $type );
-
- $count = $this->cache->get( $key );
- if ( is_int( $count ) ) {
- return $count;
- }
-
- $count = 0;
- foreach ( $this->partitionQueues as $queue ) {
- try {
- $count += $queue->$method();
- } catch ( JobQueueError $e ) {
- MWExceptionHandler::logException( $e );
- }
- }
-
- $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
- return $count;
- }
-
- protected function doBatchPush( array $jobs, $flags ) {
- if ( !count( $jobs ) ) {
- return true; // nothing to do
- }
- // Local ring variable that may be changed to point to a new ring on failure
- $partitionRing = $this->partitionPushRing;
- // Try to insert the jobs and update $partitionsTry on any failures
- $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags );
- if ( count( $jobsLeft ) ) { // some jobs failed to insert?
- // Try to insert the remaning jobs once more, ignoring the bad partitions
- return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) );
- }
- return true;
- }
-
- /**
- * @param array $jobs
- * @param HashRing $partitionRing
- * @param integer $flags
- * @return array List of Job object that could not be inserted
- */
- protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
- $jobsLeft = array();
-
- // Because jobs are spread across partitions, per-job de-duplication needs
- // to use a consistent hash to avoid allowing duplicate jobs per partition.
- // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
- $uJobsByPartition = array(); // (partition name => job list)
- foreach ( $jobs as $key => $job ) {
- if ( $job->ignoreDuplicates() ) {
- $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
- $uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job;
- unset( $jobs[$key] );
- }
- }
- // Get the batches of jobs that are not de-duplicated
- if ( $flags & self::QOS_ATOMIC ) {
- $nuJobBatches = array( $jobs ); // all or nothing
- } else {
- // Split the jobs into batches and spread them out over servers if there
- // are many jobs. This helps keep the partitions even. Otherwise, send all
- // the jobs to a single partition queue to avoids the extra connections.
- $nuJobBatches = array_chunk( $jobs, 300 );
- }
-
- // Insert the de-duplicated jobs into the queues...
- foreach ( $uJobsByPartition as $partition => $jobBatch ) {
- $queue = $this->partitionQueues[$partition];
- try {
- $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
- } catch ( JobQueueError $e ) {
- $ok = false;
- MWExceptionHandler::logException( $e );
- }
- if ( $ok ) {
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
- } else {
- $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
- if ( !$partitionRing ) {
- throw new JobQueueError( "Could not insert job(s), all partitions are down." );
- }
- $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
- }
- }
-
- // Insert the jobs that are not de-duplicated into the queues...
- foreach ( $nuJobBatches as $jobBatch ) {
- $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() );
- $queue = $this->partitionQueues[$partition];
- try {
- $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
- } catch ( JobQueueError $e ) {
- $ok = false;
- MWExceptionHandler::logException( $e );
- }
- if ( $ok ) {
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
- } else {
- $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
- if ( !$partitionRing ) {
- throw new JobQueueError( "Could not insert job(s), all partitions are down." );
- }
- $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
- }
- }
-
- return $jobsLeft;
- }
-
- protected function doPop() {
- $key = $this->getCacheKey( 'empty' );
-
- $isEmpty = $this->cache->get( $key );
- if ( $isEmpty === 'true' ) {
- return false;
- }
-
- $partitionsTry = $this->partitionMap; // (partition => weight)
-
- while ( count( $partitionsTry ) ) {
- $partition = ArrayUtils::pickRandom( $partitionsTry );
- if ( $partition === false ) {
- break; // all partitions at 0 weight
- }
- $queue = $this->partitionQueues[$partition];
- try {
- $job = $queue->pop();
- } catch ( JobQueueError $e ) {
- $job = false;
- MWExceptionHandler::logException( $e );
- }
- if ( $job ) {
- $job->metadata['QueuePartition'] = $partition;
- return $job;
- } else {
- unset( $partitionsTry[$partition] ); // blacklist partition
- }
- }
-
- $this->cache->set( $key, 'true', JobQueueDB::CACHE_TTL_LONG );
- return false;
- }
-
- protected function doAck( Job $job ) {
- if ( !isset( $job->metadata['QueuePartition'] ) ) {
- throw new MWException( "The given job has no defined partition name." );
- }
- return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
- }
-
- protected function doIsRootJobOldDuplicate( Job $job ) {
- $params = $job->getRootJobParams();
- $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
- try {
- return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
- } catch ( JobQueueError $e ) {
- if ( isset( $partitions[1] ) ) { // check fallback partition
- return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
- }
- }
- return false;
- }
-
- protected function doDeduplicateRootJob( Job $job ) {
- $params = $job->getRootJobParams();
- $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
- try {
- return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
- } catch ( JobQueueError $e ) {
- if ( isset( $partitions[1] ) ) { // check fallback partition
- return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
- }
- }
- return false;
- }
-
- protected function doDelete() {
- foreach ( $this->partitionQueues as $queue ) {
- try {
- $queue->doDelete();
- } catch ( JobQueueError $e ) {
- MWExceptionHandler::logException( $e );
- }
- }
- }
-
- protected function doWaitForBackups() {
- foreach ( $this->partitionQueues as $queue ) {
- try {
- $queue->waitForBackups();
- } catch ( JobQueueError $e ) {
- MWExceptionHandler::logException( $e );
- }
- }
- }
-
- protected function doGetPeriodicTasks() {
- $tasks = array();
- foreach ( $this->partitionQueues as $partition => $queue ) {
- foreach ( $queue->getPeriodicTasks() as $task => $def ) {
- $tasks["{$partition}:{$task}"] = $def;
- }
- }
- return $tasks;
- }
-
- protected function doFlushCaches() {
- static $types = array(
- 'empty',
- 'size',
- 'acquiredcount',
- 'delayedcount',
- 'abandonedcount'
- );
- foreach ( $types as $type ) {
- $this->cache->delete( $this->getCacheKey( $type ) );
- }
- foreach ( $this->partitionQueues as $queue ) {
- $queue->doFlushCaches();
- }
- }
-
- public function getAllQueuedJobs() {
- $iterator = new AppendIterator();
- foreach ( $this->partitionQueues as $queue ) {
- $iterator->append( $queue->getAllQueuedJobs() );
- }
- return $iterator;
- }
-
- public function getAllDelayedJobs() {
- $iterator = new AppendIterator();
- foreach ( $this->partitionQueues as $queue ) {
- $iterator->append( $queue->getAllDelayedJobs() );
- }
- return $iterator;
- }
-
- public function getCoalesceLocationInternal() {
- return "JobQueueFederated:wiki:{$this->wiki}" .
- sha1( serialize( array_keys( $this->partitionMap ) ) );
- }
-
- protected function doGetSiblingQueuesWithJobs( array $types ) {
- $result = array();
- foreach ( $this->partitionQueues as $queue ) {
- try {
- $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
- if ( is_array( $nonEmpty ) ) {
- $result = array_unique( array_merge( $result, $nonEmpty ) );
- } else {
- return null; // not supported on all partitions; bail
- }
- if ( count( $result ) == count( $types ) ) {
- break; // short-circuit
- }
- } catch ( JobQueueError $e ) {
- MWExceptionHandler::logException( $e );
- }
- }
- return array_values( $result );
- }
-
- protected function doGetSiblingQueueSizes( array $types ) {
- $result = array();
- foreach ( $this->partitionQueues as $queue ) {
- try {
- $sizes = $queue->doGetSiblingQueueSizes( $types );
- if ( is_array( $sizes ) ) {
- foreach ( $sizes as $type => $size ) {
- $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
- }
- } else {
- return null; // not supported on all partitions; bail
- }
- } catch ( JobQueueError $e ) {
- MWExceptionHandler::logException( $e );
- }
- }
- return $result;
- }
-
- public function setTestingPrefix( $key ) {
- foreach ( $this->partitionQueues as $queue ) {
- $queue->setTestingPrefix( $key );
- }
- }
-
- /**
- * @return string
- */
- private function getCacheKey( $property ) {
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
- }
-}
diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php
deleted file mode 100644
index fa7fee5f..00000000
--- a/includes/job/JobQueueGroup.php
+++ /dev/null
@@ -1,427 +0,0 @@
-<?php
-/**
- * Job queue base code.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @author Aaron Schulz
- */
-
-/**
- * Class to handle enqueueing of background jobs
- *
- * @ingroup JobQueue
- * @since 1.21
- */
-class JobQueueGroup {
- /** @var Array */
- protected static $instances = array();
-
- /** @var ProcessCacheLRU */
- protected $cache;
-
- protected $wiki; // string; wiki ID
-
- /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
- protected $coalescedQueues;
-
- const TYPE_DEFAULT = 1; // integer; jobs popped by default
- const TYPE_ANY = 2; // integer; any job
-
- const USE_CACHE = 1; // integer; use process or persistent cache
- const USE_PRIORITY = 2; // integer; respect deprioritization
-
- const PROC_CACHE_TTL = 15; // integer; seconds
-
- const CACHE_VERSION = 1; // integer; cache version
-
- /**
- * @param string $wiki Wiki ID
- */
- protected function __construct( $wiki ) {
- $this->wiki = $wiki;
- $this->cache = new ProcessCacheLRU( 10 );
- }
-
- /**
- * @param string $wiki Wiki ID
- * @return JobQueueGroup
- */
- public static function singleton( $wiki = false ) {
- $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
- if ( !isset( self::$instances[$wiki] ) ) {
- self::$instances[$wiki] = new self( $wiki );
- }
- return self::$instances[$wiki];
- }
-
- /**
- * Destroy the singleton instances
- *
- * @return void
- */
- public static function destroySingletons() {
- self::$instances = array();
- }
-
- /**
- * Get the job queue object for a given queue type
- *
- * @param $type string
- * @return JobQueue
- */
- public function get( $type ) {
- global $wgJobTypeConf;
-
- $conf = array( 'wiki' => $this->wiki, 'type' => $type );
- if ( isset( $wgJobTypeConf[$type] ) ) {
- $conf = $conf + $wgJobTypeConf[$type];
- } else {
- $conf = $conf + $wgJobTypeConf['default'];
- }
-
- return JobQueue::factory( $conf );
- }
-
- /**
- * Insert jobs into the respective queues of with the belong.
- *
- * This inserts the jobs into the queue specified by $wgJobTypeConf
- * and updates the aggregate job queue information cache as needed.
- *
- * @param $jobs Job|array A single Job or a list of Jobs
- * @throws MWException
- * @return bool
- */
- public function push( $jobs ) {
- $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
-
- $jobsByType = array(); // (job type => list of jobs)
- foreach ( $jobs as $job ) {
- if ( $job instanceof Job ) {
- $jobsByType[$job->getType()][] = $job;
- } else {
- throw new MWException( "Attempted to push a non-Job object into a queue." );
- }
- }
-
- $ok = true;
- foreach ( $jobsByType as $type => $jobs ) {
- if ( $this->get( $type )->push( $jobs ) ) {
- JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
- } else {
- $ok = false;
- }
- }
-
- if ( $this->cache->has( 'queues-ready', 'list' ) ) {
- $list = $this->cache->get( 'queues-ready', 'list' );
- if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
- $this->cache->clear( 'queues-ready' );
- }
- }
-
- return $ok;
- }
-
- /**
- * Pop a job off one of the job queues
- *
- * This pops a job off a queue as specified by $wgJobTypeConf and
- * updates the aggregate job queue information cache as needed.
- *
- * @param $qtype integer|string JobQueueGroup::TYPE_DEFAULT or type string
- * @param $flags integer Bitfield of JobQueueGroup::USE_* constants
- * @return Job|bool Returns false on failure
- */
- public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
- if ( is_string( $qtype ) ) { // specific job type
- if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) {
- return false; // back off
- }
- $job = $this->get( $qtype )->pop();
- if ( !$job ) {
- JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
- }
- return $job;
- } else { // any job in the "default" jobs types
- if ( $flags & self::USE_CACHE ) {
- if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
- $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
- }
- $types = $this->cache->get( 'queues-ready', 'list' );
- } else {
- $types = $this->getQueuesWithJobs();
- }
-
- if ( $qtype == self::TYPE_DEFAULT ) {
- $types = array_intersect( $types, $this->getDefaultQueueTypes() );
- }
- shuffle( $types ); // avoid starvation
-
- foreach ( $types as $type ) { // for each queue...
- if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) {
- continue; // back off
- }
- $job = $this->get( $type )->pop();
- if ( $job ) { // found
- return $job;
- } else { // not found
- JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
- $this->cache->clear( 'queues-ready' );
- }
- }
-
- return false; // no jobs found
- }
- }
-
- /**
- * Acknowledge that a job was completed
- *
- * @param $job Job
- * @return bool
- */
- public function ack( Job $job ) {
- return $this->get( $job->getType() )->ack( $job );
- }
-
- /**
- * Register the "root job" of a given job into the queue for de-duplication.
- * This should only be called right *after* all the new jobs have been inserted.
- *
- * @param $job Job
- * @return bool
- */
- public function deduplicateRootJob( Job $job ) {
- return $this->get( $job->getType() )->deduplicateRootJob( $job );
- }
-
- /**
- * Wait for any slaves or backup queue servers to catch up.
- *
- * This does nothing for certain queue classes.
- *
- * @return void
- * @throws MWException
- */
- public function waitForBackups() {
- global $wgJobTypeConf;
-
- wfProfileIn( __METHOD__ );
- // Try to avoid doing this more than once per queue storage medium
- foreach ( $wgJobTypeConf as $type => $conf ) {
- $this->get( $type )->waitForBackups();
- }
- wfProfileOut( __METHOD__ );
- }
-
- /**
- * Get the list of queue types
- *
- * @return array List of strings
- */
- public function getQueueTypes() {
- return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
- }
-
- /**
- * Get the list of default queue types
- *
- * @return array List of strings
- */
- public function getDefaultQueueTypes() {
- global $wgJobTypesExcludedFromDefaultQueue;
-
- return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
- }
-
- /**
- * Get the list of job types that have non-empty queues
- *
- * @return Array List of job types that have non-empty queues
- */
- public function getQueuesWithJobs() {
- $types = array();
- foreach ( $this->getCoalescedQueues() as $info ) {
- $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
- if ( is_array( $nonEmpty ) ) { // batching features supported
- $types = array_merge( $types, $nonEmpty );
- } else { // we have to go through the queues in the bucket one-by-one
- foreach ( $info['types'] as $type ) {
- if ( !$this->get( $type )->isEmpty() ) {
- $types[] = $type;
- }
- }
- }
- }
- return $types;
- }
-
- /**
- * Get the size of the queus for a list of job types
- *
- * @return Array Map of (job type => size)
- */
- public function getQueueSizes() {
- $sizeMap = array();
- foreach ( $this->getCoalescedQueues() as $info ) {
- $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
- if ( is_array( $sizes ) ) { // batching features supported
- $sizeMap = $sizeMap + $sizes;
- } else { // we have to go through the queues in the bucket one-by-one
- foreach ( $info['types'] as $type ) {
- $sizeMap[$type] = $this->get( $type )->getSize();
- }
- }
- }
- return $sizeMap;
- }
-
- /**
- * @return array
- */
- protected function getCoalescedQueues() {
- global $wgJobTypeConf;
-
- if ( $this->coalescedQueues === null ) {
- $this->coalescedQueues = array();
- foreach ( $wgJobTypeConf as $type => $conf ) {
- $queue = JobQueue::factory(
- array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
- $loc = $queue->getCoalesceLocationInternal();
- if ( !isset( $this->coalescedQueues[$loc] ) ) {
- $this->coalescedQueues[$loc]['queue'] = $queue;
- $this->coalescedQueues[$loc]['types'] = array();
- }
- if ( $type === 'default' ) {
- $this->coalescedQueues[$loc]['types'] = array_merge(
- $this->coalescedQueues[$loc]['types'],
- array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
- );
- } else {
- $this->coalescedQueues[$loc]['types'][] = $type;
- }
- }
- }
-
- return $this->coalescedQueues;
- }
-
- /**
- * Check if jobs should not be popped of a queue right now.
- * This is only used for performance, such as to avoid spamming
- * the queue with many sub-jobs before they actually get run.
- *
- * @param $type string
- * @return bool
- */
- public function isQueueDeprioritized( $type ) {
- if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) {
- return $this->cache->get( 'isDeprioritized', $type );
- }
- if ( $type === 'refreshLinks2' ) {
- // Don't keep converting refreshLinks2 => refreshLinks jobs if the
- // later jobs have not been done yet. This helps throttle queue spam.
- $deprioritized = !$this->get( 'refreshLinks' )->isEmpty();
- $this->cache->set( 'isDeprioritized', $type, $deprioritized );
- return $deprioritized;
- }
- return false;
- }
-
- /**
- * Execute any due periodic queue maintenance tasks for all queues.
- *
- * A task is "due" if the time ellapsed since the last run is greater than
- * the defined run period. Concurrent calls to this function will cause tasks
- * to be attempted twice, so they may need their own methods of mutual exclusion.
- *
- * @return integer Number of tasks run
- */
- public function executeReadyPeriodicTasks() {
- global $wgMemc;
-
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
- $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
-
- $count = 0;
- $tasksRun = array(); // (queue => task => UNIX timestamp)
- foreach ( $this->getQueueTypes() as $type ) {
- $queue = $this->get( $type );
- foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
- if ( $definition['period'] <= 0 ) {
- continue; // disabled
- } elseif ( !isset( $lastRuns[$type][$task] )
- || $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
- {
- try {
- if ( call_user_func( $definition['callback'] ) !== null ) {
- $tasksRun[$type][$task] = time();
- ++$count;
- }
- } catch ( JobQueueError $e ) {
- MWExceptionHandler::logException( $e );
- }
- }
- }
- }
-
- $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) {
- if ( is_array( $lastRuns ) ) {
- foreach ( $tasksRun as $type => $tasks ) {
- foreach ( $tasks as $task => $timestamp ) {
- if ( !isset( $lastRuns[$type][$task] )
- || $timestamp > $lastRuns[$type][$task] )
- {
- $lastRuns[$type][$task] = $timestamp;
- }
- }
- }
- } else {
- $lastRuns = $tasksRun;
- }
- return $lastRuns;
- } );
-
- return $count;
- }
-
- /**
- * @param $name string
- * @return mixed
- */
- private function getCachedConfigVar( $name ) {
- global $wgConf, $wgMemc;
-
- if ( $this->wiki === wfWikiID() ) {
- return $GLOBALS[$name]; // common case
- } else {
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
- $value = $wgMemc->get( $key ); // ('v' => ...) or false
- if ( is_array( $value ) ) {
- return $value['v'];
- } else {
- $value = $wgConf->getConfig( $this->wiki, $name );
- $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
- return $value;
- }
- }
- }
-}
diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php
deleted file mode 100644
index 378e1755..00000000
--- a/includes/job/JobQueueRedis.php
+++ /dev/null
@@ -1,856 +0,0 @@
-<?php
-/**
- * Redis-backed job queue code.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @author Aaron Schulz
- */
-
-/**
- * Class to handle job queues stored in Redis
- *
- * This is faster, less resource intensive, queue that JobQueueDB.
- * All data for a queue using this class is placed into one redis server.
- *
- * There are eight main redis keys used to track jobs:
- * - l-unclaimed : A list of job IDs used for ready unclaimed jobs
- * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries
- * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
- * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs
- * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
- * - h-sha1ById : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
- * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries
- * - h-data : A hash of (job ID => serialized blobs) for job storage
- * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned.
- * If an ID appears in any of those lists, it should have a h-data entry for its ID.
- * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then
- * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById
- * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
- * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
- *
- * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication.
- * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
- * All the keys are prefixed with the relevant wiki ID information.
- *
- * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations.
- * Additionally, it should be noted that redis has different persistence modes, such
- * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be
- * made on the servers based on what queues are using it and what tolerance they have.
- *
- * @ingroup JobQueue
- * @ingroup Redis
- * @since 1.22
- */
-class JobQueueRedis extends JobQueue {
- /** @var RedisConnectionPool */
- protected $redisPool;
-
- protected $server; // string; server address
- protected $compression; // string; compression method to use
-
- const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
-
- protected $key; // string; key to prefix the queue keys with (used for testing)
-
- /**
- * @params include:
- * - redisConfig : An array of parameters to RedisConnectionPool::__construct().
- * Note that the serializer option is ignored "none" is always used.
- * - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
- * If a hostname is specified but no port, the standard port number
- * 6379 will be used. Required.
- * - compression : The type of compression to use; one of (none,gzip).
- * @param array $params
- */
- public function __construct( array $params ) {
- parent::__construct( $params );
- $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
- $this->server = $params['redisServer'];
- $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
- $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
- }
-
- protected function supportedOrders() {
- return array( 'timestamp', 'fifo' );
- }
-
- protected function optimalOrder() {
- return 'fifo';
- }
-
- protected function supportsDelayedJobs() {
- return true;
- }
-
- /**
- * @see JobQueue::doIsEmpty()
- * @return bool
- * @throws MWException
- */
- protected function doIsEmpty() {
- return $this->doGetSize() == 0;
- }
-
- /**
- * @see JobQueue::doGetSize()
- * @return integer
- * @throws MWException
- */
- protected function doGetSize() {
- $conn = $this->getConnection();
- try {
- return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- }
-
- /**
- * @see JobQueue::doGetAcquiredCount()
- * @return integer
- * @throws MWException
- */
- protected function doGetAcquiredCount() {
- if ( $this->claimTTL <= 0 ) {
- return 0; // no acknowledgements
- }
- $conn = $this->getConnection();
- try {
- $conn->multi( Redis::PIPELINE );
- $conn->zSize( $this->getQueueKey( 'z-claimed' ) );
- $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
- return array_sum( $conn->exec() );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- }
-
- /**
- * @see JobQueue::doGetDelayedCount()
- * @return integer
- * @throws MWException
- */
- protected function doGetDelayedCount() {
- if ( !$this->checkDelay ) {
- return 0; // no delayed jobs
- }
- $conn = $this->getConnection();
- try {
- return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- }
-
- /**
- * @see JobQueue::doGetAbandonedCount()
- * @return integer
- * @throws MWException
- */
- protected function doGetAbandonedCount() {
- if ( $this->claimTTL <= 0 ) {
- return 0; // no acknowledgements
- }
- $conn = $this->getConnection();
- try {
- return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- }
-
- /**
- * @see JobQueue::doBatchPush()
- * @param array $jobs
- * @param $flags
- * @return bool
- * @throws MWException
- */
- protected function doBatchPush( array $jobs, $flags ) {
- // Convert the jobs into field maps (de-duplicated against each other)
- $items = array(); // (job ID => job fields map)
- foreach ( $jobs as $job ) {
- $item = $this->getNewJobFields( $job );
- if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
- $items[$item['sha1']] = $item;
- } else {
- $items[$item['uuid']] = $item;
- }
- }
-
- if ( !count( $items ) ) {
- return true; // nothing to do
- }
-
- $conn = $this->getConnection();
- try {
- // Actually push the non-duplicate jobs into the queue...
- if ( $flags & self::QOS_ATOMIC ) {
- $batches = array( $items ); // all or nothing
- } else {
- $batches = array_chunk( $items, 500 ); // avoid tying up the server
- }
- $failed = 0;
- $pushed = 0;
- foreach ( $batches as $itemBatch ) {
- $added = $this->pushBlobs( $conn, $itemBatch );
- if ( is_int( $added ) ) {
- $pushed += $added;
- } else {
- $failed += count( $itemBatch );
- }
- }
- if ( $failed > 0 ) {
- wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
- return false;
- }
- JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
- JobQueue::incrStats( 'job-insert-duplicate', $this->type,
- count( $items ) - $failed - $pushed );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
-
- return true;
- }
-
- /**
- * @param RedisConnRef $conn
- * @param array $items List of results from JobQueueRedis::getNewJobFields()
- * @return integer Number of jobs inserted (duplicates are ignored)
- * @throws RedisException
- */
- protected function pushBlobs( RedisConnRef $conn, array $items ) {
- $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
- foreach ( $items as $item ) {
- $args[] = (string)$item['uuid'];
- $args[] = (string)$item['sha1'];
- $args[] = (string)$item['rtimestamp'];
- $args[] = (string)$this->serialize( $item );
- }
- static $script =
-<<<LUA
- if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
- local pushed = 0
- for i = 1,#ARGV,4 do
- local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
- if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then
- if 1*rtimestamp > 0 then
- -- Insert into delayed queue (release time as score)
- redis.call('zAdd',KEYS[4],rtimestamp,id)
- else
- -- Insert into unclaimed queue
- redis.call('lPush',KEYS[1],id)
- end
- if sha1 ~= '' then
- redis.call('hSet',KEYS[2],id,sha1)
- redis.call('hSet',KEYS[3],sha1,id)
- end
- redis.call('hSet',KEYS[5],id,blob)
- pushed = pushed + 1
- end
- end
- return pushed
-LUA;
- return $conn->luaEval( $script,
- array_merge(
- array(
- $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
- $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
- $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
- $this->getQueueKey( 'z-delayed' ), # KEYS[4]
- $this->getQueueKey( 'h-data' ), # KEYS[5]
- ),
- $args
- ),
- 5 # number of first argument(s) that are keys
- );
- }
-
- /**
- * @see JobQueue::doPop()
- * @return Job|bool
- * @throws MWException
- */
- protected function doPop() {
- $job = false;
-
- // Push ready delayed jobs into the queue every 10 jobs to spread the load.
- // This is also done as a periodic task, but we don't want too much done at once.
- if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
- $this->releaseReadyDelayedJobs();
- }
-
- $conn = $this->getConnection();
- try {
- do {
- if ( $this->claimTTL > 0 ) {
- // Keep the claimed job list down for high-traffic queues
- if ( mt_rand( 0, 99 ) == 0 ) {
- $this->recycleAndDeleteStaleJobs();
- }
- $blob = $this->popAndAcquireBlob( $conn );
- } else {
- $blob = $this->popAndDeleteBlob( $conn );
- }
- if ( $blob === false ) {
- break; // no jobs; nothing to do
- }
-
- JobQueue::incrStats( 'job-pop', $this->type );
- $item = $this->unserialize( $blob );
- if ( $item === false ) {
- wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
- continue;
- }
-
- // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed
- $job = $this->getJobFromFields( $item ); // may be false
- } while ( !$job ); // job may be false if invalid
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
-
- return $job;
- }
-
- /**
- * @param RedisConnRef $conn
- * @return array serialized string or false
- * @throws RedisException
- */
- protected function popAndDeleteBlob( RedisConnRef $conn ) {
- static $script =
-<<<LUA
- -- Pop an item off the queue
- local id = redis.call('rpop',KEYS[1])
- if not id then return false end
- -- Get the job data and remove it
- local item = redis.call('hGet',KEYS[4],id)
- redis.call('hDel',KEYS[4],id)
- -- Allow new duplicates of this job
- local sha1 = redis.call('hGet',KEYS[2],id)
- if sha1 then redis.call('hDel',KEYS[3],sha1) end
- redis.call('hDel',KEYS[2],id)
- -- Return the job data
- return item
-LUA;
- return $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
- $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
- $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
- $this->getQueueKey( 'h-data' ), # KEYS[4]
- ),
- 4 # number of first argument(s) that are keys
- );
- }
-
- /**
- * @param RedisConnRef $conn
- * @return array serialized string or false
- * @throws RedisException
- */
- protected function popAndAcquireBlob( RedisConnRef $conn ) {
- static $script =
-<<<LUA
- -- Pop an item off the queue
- local id = redis.call('rPop',KEYS[1])
- if not id then return false end
- -- Allow new duplicates of this job
- local sha1 = redis.call('hGet',KEYS[2],id)
- if sha1 then redis.call('hDel',KEYS[3],sha1) end
- redis.call('hDel',KEYS[2],id)
- -- Mark the jobs as claimed and return it
- redis.call('zAdd',KEYS[4],ARGV[1],id)
- redis.call('hIncrBy',KEYS[5],id,1)
- return redis.call('hGet',KEYS[6],id)
-LUA;
- return $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
- $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
- $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
- $this->getQueueKey( 'z-claimed' ), # KEYS[4]
- $this->getQueueKey( 'h-attempts' ), # KEYS[5]
- $this->getQueueKey( 'h-data' ), # KEYS[6]
- time(), # ARGV[1] (injected to be replication-safe)
- ),
- 6 # number of first argument(s) that are keys
- );
- }
-
- /**
- * @see JobQueue::doAck()
- * @param Job $job
- * @return Job|bool
- * @throws MWException
- */
- protected function doAck( Job $job ) {
- if ( !isset( $job->metadata['uuid'] ) ) {
- throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
- }
- if ( $this->claimTTL > 0 ) {
- $conn = $this->getConnection();
- try {
- static $script =
-<<<LUA
- -- Unmark the job as claimed
- redis.call('zRem',KEYS[1],ARGV[1])
- redis.call('hDel',KEYS[2],ARGV[1])
- -- Delete the job data itself
- return redis.call('hDel',KEYS[3],ARGV[1])
-LUA;
- $res = $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-claimed' ), # KEYS[1]
- $this->getQueueKey( 'h-attempts' ), # KEYS[2]
- $this->getQueueKey( 'h-data' ), # KEYS[3]
- $job->metadata['uuid'] # ARGV[1]
- ),
- 3 # number of first argument(s) that are keys
- );
-
- if ( !$res ) {
- wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
- return false;
- }
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- }
- return true;
- }
-
- /**
- * @see JobQueue::doDeduplicateRootJob()
- * @param Job $job
- * @return bool
- * @throws MWException
- */
- protected function doDeduplicateRootJob( Job $job ) {
- if ( !$job->hasRootJobParams() ) {
- throw new MWException( "Cannot register root job; missing parameters." );
- }
- $params = $job->getRootJobParams();
-
- $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
-
- $conn = $this->getConnection();
- try {
- $timestamp = $conn->get( $key ); // current last timestamp of this job
- if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
- return true; // a newer version of this root job was enqueued
- }
- // Update the timestamp of the last root job started at the location...
- return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- }
-
- /**
- * @see JobQueue::doIsRootJobOldDuplicate()
- * @param Job $job
- * @return bool
- */
- protected function doIsRootJobOldDuplicate( Job $job ) {
- if ( !$job->hasRootJobParams() ) {
- return false; // job has no de-deplication info
- }
- $params = $job->getRootJobParams();
-
- $conn = $this->getConnection();
- try {
- // Get the last time this root job was enqueued
- $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
-
- // Check if a new root job was started at the location after this one's...
- return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
- }
-
- /**
- * @see JobQueue::doDelete()
- * @return bool
- */
- protected function doDelete() {
- static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
- 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );
-
- $conn = $this->getConnection();
- try {
- $keys = array();
- foreach ( $props as $prop ) {
- $keys[] = $this->getQueueKey( $prop );
- }
- return ( $conn->delete( $keys ) !== false );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- }
-
- /**
- * @see JobQueue::getAllQueuedJobs()
- * @return Iterator
- */
- public function getAllQueuedJobs() {
- $conn = $this->getConnection();
- try {
- $that = $this;
- return new MappedIterator(
- $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
- function( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) { return is_object( $job ); } )
- );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- }
-
- /**
- * @see JobQueue::getAllQueuedJobs()
- * @return Iterator
- */
- public function getAllDelayedJobs() {
- $conn = $this->getConnection();
- try {
- $that = $this;
- return new MappedIterator( // delayed jobs
- $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
- function( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) { return is_object( $job ); } )
- );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- }
-
- public function getCoalesceLocationInternal() {
- return "RedisServer:" . $this->server;
- }
-
- protected function doGetSiblingQueuesWithJobs( array $types ) {
- return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
- }
-
- protected function doGetSiblingQueueSizes( array $types ) {
- $sizes = array(); // (type => size)
- $types = array_values( $types ); // reindex
- try {
- $conn = $this->getConnection();
- $conn->multi( Redis::PIPELINE );
- foreach ( $types as $type ) {
- $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
- }
- $res = $conn->exec();
- if ( is_array( $res ) ) {
- foreach ( $res as $i => $size ) {
- $sizes[$types[$i]] = $size;
- }
- }
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- return $sizes;
- }
-
- /**
- * This function should not be called outside JobQueueRedis
- *
- * @param $uid string
- * @param $conn RedisConnRef
- * @return Job|bool Returns false if the job does not exist
- * @throws MWException
- */
- public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
- try {
- $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
- if ( $data === false ) {
- return false; // not found
- }
- $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
- if ( !is_array( $item ) ) { // this shouldn't happen
- throw new MWException( "Could not find job with ID '$uid'." );
- }
- $title = Title::makeTitle( $item['namespace'], $item['title'] );
- $job = Job::factory( $item['type'], $title, $item['params'] );
- $job->metadata['uuid'] = $item['uuid'];
- return $job;
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
- }
-
- /**
- * Release any ready delayed jobs into the queue
- *
- * @return integer Number of jobs released
- * @throws MWException
- */
- public function releaseReadyDelayedJobs() {
- $count = 0;
-
- $conn = $this->getConnection();
- try {
- static $script =
-<<<LUA
- -- Get the list of ready delayed jobs, sorted by readiness
- local ids = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
- -- Migrate the jobs from the "delayed" set to the "unclaimed" list
- for k,id in ipairs(ids) do
- redis.call('lPush',KEYS[2],id)
- redis.call('zRem',KEYS[1],id)
- end
- return #ids
-LUA;
- $count += (int)$conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-delayed' ), // KEYS[1]
- $this->getQueueKey( 'l-unclaimed' ), // KEYS[2]
- time() // ARGV[1]; max "delay until" UNIX timestamp
- ),
- 2 # first two arguments are keys
- );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
-
- return $count;
- }
-
- /**
- * Recycle or destroy any jobs that have been claimed for too long
- *
- * @return integer Number of jobs recycled/deleted
- * @throws MWException
- */
- public function recycleAndDeleteStaleJobs() {
- if ( $this->claimTTL <= 0 ) { // sanity
- throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
- }
- $count = 0;
- // For each job item that can be retried, we need to add it back to the
- // main queue and remove it from the list of currenty claimed job items.
- // For those that cannot, they are marked as dead and kept around for
- // investigation and manual job restoration but are eventually deleted.
- $conn = $this->getConnection();
- try {
- $now = time();
- static $script =
-<<<LUA
- local released,abandoned,pruned = 0,0,0
- -- Get all non-dead jobs that have an expired claim on them.
- -- The score for each item is the last claim timestamp (UNIX).
- local staleClaims = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
- for k,id in ipairs(staleClaims) do
- local timestamp = redis.call('zScore',KEYS[1],id)
- local attempts = redis.call('hGet',KEYS[2],id)
- if attempts < ARGV[3] then
- -- Claim expired and retries left: re-enqueue the job
- redis.call('lPush',KEYS[3],id)
- redis.call('hIncrBy',KEYS[2],id,1)
- released = released + 1
- else
- -- Claim expired and no retries left: mark the job as dead
- redis.call('zAdd',KEYS[5],timestamp,id)
- abandoned = abandoned + 1
- end
- redis.call('zRem',KEYS[1],id)
- end
- -- Get all of the dead jobs that have been marked as dead for too long.
- -- The score for each item is the last claim timestamp (UNIX).
- local deadClaims = redis.call('zRangeByScore',KEYS[5],0,ARGV[2])
- for k,id in ipairs(deadClaims) do
- -- Stale and out of retries: remove any traces of the job
- redis.call('zRem',KEYS[5],id)
- redis.call('hDel',KEYS[2],id)
- redis.call('hDel',KEYS[4],id)
- pruned = pruned + 1
- end
- return {released,abandoned,pruned}
-LUA;
- $res = $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-claimed' ), # KEYS[1]
- $this->getQueueKey( 'h-attempts' ), # KEYS[2]
- $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
- $this->getQueueKey( 'h-data' ), # KEYS[4]
- $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
- $now - $this->claimTTL, # ARGV[1]
- $now - self::MAX_AGE_PRUNE, # ARGV[2]
- $this->maxTries # ARGV[3]
- ),
- 5 # number of first argument(s) that are keys
- );
- if ( $res ) {
- list( $released, $abandoned, $pruned ) = $res;
- $count += $released + $pruned;
- JobQueue::incrStats( 'job-recycle', $this->type, $released );
- JobQueue::incrStats( 'job-abandon', $this->type, $abandoned );
- }
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
-
- return $count;
- }
-
- /**
- * @return Array
- */
- protected function doGetPeriodicTasks() {
- $tasks = array();
- if ( $this->claimTTL > 0 ) {
- $tasks['recycleAndDeleteStaleJobs'] = array(
- 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
- );
- }
- if ( $this->checkDelay ) {
- $tasks['releaseReadyDelayedJobs'] = array(
- 'callback' => array( $this, 'releaseReadyDelayedJobs' ),
- 'period' => 300 // 5 minutes
- );
- }
- return $tasks;
- }
-
- /**
- * @param $job Job
- * @return array
- */
- protected function getNewJobFields( Job $job ) {
- return array(
- // Fields that describe the nature of the job
- 'type' => $job->getType(),
- 'namespace' => $job->getTitle()->getNamespace(),
- 'title' => $job->getTitle()->getDBkey(),
- 'params' => $job->getParams(),
- // Some jobs cannot run until a "release timestamp"
- 'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
- // Additional job metadata
- 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
- 'sha1' => $job->ignoreDuplicates()
- ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
- : '',
- 'timestamp' => time() // UNIX timestamp
- );
- }
-
- /**
- * @param $fields array
- * @return Job|bool
- */
- protected function getJobFromFields( array $fields ) {
- $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
- if ( $title ) {
- $job = Job::factory( $fields['type'], $title, $fields['params'] );
- $job->metadata['uuid'] = $fields['uuid'];
- return $job;
- }
- return false;
- }
-
- /**
- * @param array $fields
- * @return string Serialized and possibly compressed version of $fields
- */
- protected function serialize( array $fields ) {
- $blob = serialize( $fields );
- if ( $this->compression === 'gzip'
- && strlen( $blob ) >= 1024 && function_exists( 'gzdeflate' ) )
- {
- $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' );
- $blobz = serialize( $object );
- return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
- } else {
- return $blob;
- }
- }
-
- /**
- * @param string $blob
- * @return array|bool Unserialized version of $blob or false
- */
- protected function unserialize( $blob ) {
- $fields = unserialize( $blob );
- if ( is_object( $fields ) ) {
- if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) {
- $fields = unserialize( gzinflate( $fields->blob ) );
- } else {
- $fields = false;
- }
- }
- return is_array( $fields ) ? $fields : false;
- }
-
- /**
- * Get a connection to the server that handles all sub-queues for this queue
- *
- * @return Array (server name, Redis instance)
- * @throws MWException
- */
- protected function getConnection() {
- $conn = $this->redisPool->getConnection( $this->server );
- if ( !$conn ) {
- throw new JobQueueConnectionError( "Unable to connect to redis server." );
- }
- return $conn;
- }
-
- /**
- * @param $server string
- * @param $conn RedisConnRef
- * @param $e RedisException
- * @throws MWException
- */
- protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
- $this->redisPool->handleException( $server, $conn, $e );
- throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
- }
-
- /**
- * @param $prop string
- * @param $type string|null
- * @return string
- */
- private function getQueueKey( $prop, $type = null ) {
- $type = is_string( $type ) ? $type : $this->type;
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- if ( strlen( $this->key ) ) { // namespaced queue (for testing)
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
- } else {
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
- }
- }
-
- /**
- * @param $key string
- * @return void
- */
- public function setTestingPrefix( $key ) {
- $this->key = $key;
- }
-}
diff --git a/includes/job/README b/includes/job/README
deleted file mode 100644
index c11d5a78..00000000
--- a/includes/job/README
+++ /dev/null
@@ -1,81 +0,0 @@
-/*!
-\ingroup JobQueue
-\page jobqueue_design Job queue design
-
-Notes on the Job queuing system architecture.
-
-\section intro Introduction
-
-The data model consist of the following main components:
-* The Job object represents a particular deferred task that happens in the
- background. All jobs subclass the Job object and put the main logic in the
- function called run().
-* The JobQueue object represents a particular queue of jobs of a certain type.
- For example there may be a queue for email jobs and a queue for squid purge
- jobs.
-
-\section jobqueue Job queues
-
-Each job type has its own queue and is associated to a storage medium. One
-queue might save its jobs in redis while another one uses would use a database.
-
-Storage medium are defined in a queue class. Before using it, you must
-define in $wgJobTypeConf a mapping of the job type to a queue class.
-
-The factory class JobQueueGroup provides helper functions:
-- getting the queue for a given job
-- route new job insertions to the proper queue
-
-The following queue classes are available:
-* JobQueueDB (stores jobs in the `job` table in a database)
-* JobQueueRedis (stores jobs in a redis server)
-
-All queue classes support some basic operations (though some may be no-ops):
-* enqueueing a batch of jobs
-* dequeueing a single job
-* acknowledging a job is completed
-* checking if the queue is empty
-
-Some queue classes (like JobQueueDB) may dequeue jobs in random order while other
-queues might dequeue jobs in exact FIFO order. Callers should thus not assume jobs
-are executed in FIFO order.
-
-Also note that not all queue classes will have the same reliability guarantees.
-In-memory queues may lose data when restarted depending on snapshot and journal
-settings (including journal fsync() frequency). Some queue types may totally remove
-jobs when dequeued while leaving the ack() function as a no-op; if a job is
-dequeued by a job runner, which crashes before completion, the job will be
-lost. Some jobs, like purging squid caches after a template change, may not
-require durable queues, whereas other jobs might be more important.
-
-\section aggregator Job queue aggregator
-
-The aggregators are used by nextJobDB.php, which is a script that will return a
-random ready queue (on any wiki in the farm) that can be used with runJobs.php.
-This can be used in conjunction with any scripts that handle wiki farm job queues.
-Note that $wgLocalDatabases defines what wikis are in the wiki farm.
-
-Since each job type has its own queue, and wiki-farms may have many wikis,
-there might be a large number of queues to keep track of. To avoid wasting
-large amounts of time polling empty queues, aggregators exists to keep track
-of which queues are ready.
-
-The following queue aggregator classes are available:
-* JobQueueAggregatorMemc (uses $wgMemc to track ready queues)
-* JobQueueAggregatorRedis (uses a redis server to track ready queues)
-
-Some aggregators cache data for a few minutes while others may be always up to date.
-This can be an important factor for jobs that need a low pickup time (or latency).
-
-\section jobs Jobs
-
-Callers should also try to make jobs maintain correctness when executed twice.
-This is useful for queues that actually implement ack(), since they may recycle
-dequeued but un-acknowledged jobs back into the queue to be attempted again. If
-a runner dequeues a job, runs it, but then crashes before calling ack(), the
-job may be returned to the queue and run a second time. Jobs like cache purging can
-happen several times without any correctness problems. However, a pathological case
-would be if a bug causes the problem to systematically keep repeating. For example,
-a job may always throw a DB error at the end of run(). This problem is trickier to
-solve and more obnoxious for things like email jobs, for example. For such jobs,
-it might be useful to use a queue that does not retry jobs.
diff --git a/includes/job/aggregator/JobQueueAggregator.php b/includes/job/aggregator/JobQueueAggregator.php
deleted file mode 100644
index a8186abd..00000000
--- a/includes/job/aggregator/JobQueueAggregator.php
+++ /dev/null
@@ -1,156 +0,0 @@
-<?php
-/**
- * Job queue aggregator code.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @author Aaron Schulz
- */
-
-/**
- * Class to handle tracking information about all queues
- *
- * @ingroup JobQueue
- * @since 1.21
- */
-abstract class JobQueueAggregator {
- /** @var JobQueueAggregator */
- protected static $instance = null;
-
- /**
- * @param array $params
- */
- protected function __construct( array $params ) {}
-
- /**
- * @return JobQueueAggregator
- */
- final public static function singleton() {
- global $wgJobQueueAggregator;
-
- if ( !isset( self::$instance ) ) {
- $class = $wgJobQueueAggregator['class'];
- $obj = new $class( $wgJobQueueAggregator );
- if ( !( $obj instanceof JobQueueAggregator ) ) {
- throw new MWException( "Class '$class' is not a JobQueueAggregator class." );
- }
- self::$instance = $obj;
- }
-
- return self::$instance;
- }
-
- /**
- * Destroy the singleton instance
- *
- * @return void
- */
- final public static function destroySingleton() {
- self::$instance = null;
- }
-
- /**
- * Mark a queue as being empty
- *
- * @param string $wiki
- * @param string $type
- * @return bool Success
- */
- final public function notifyQueueEmpty( $wiki, $type ) {
- wfProfileIn( __METHOD__ );
- $ok = $this->doNotifyQueueEmpty( $wiki, $type );
- wfProfileOut( __METHOD__ );
- return $ok;
- }
-
- /**
- * @see JobQueueAggregator::notifyQueueEmpty()
- */
- abstract protected function doNotifyQueueEmpty( $wiki, $type );
-
- /**
- * Mark a queue as being non-empty
- *
- * @param string $wiki
- * @param string $type
- * @return bool Success
- */
- final public function notifyQueueNonEmpty( $wiki, $type ) {
- wfProfileIn( __METHOD__ );
- $ok = $this->doNotifyQueueNonEmpty( $wiki, $type );
- wfProfileOut( __METHOD__ );
- return $ok;
- }
-
- /**
- * @see JobQueueAggregator::notifyQueueNonEmpty()
- */
- abstract protected function doNotifyQueueNonEmpty( $wiki, $type );
-
- /**
- * Get the list of all of the queues with jobs
- *
- * @return Array (job type => (list of wiki IDs))
- */
- final public function getAllReadyWikiQueues() {
- wfProfileIn( __METHOD__ );
- $res = $this->doGetAllReadyWikiQueues();
- wfProfileOut( __METHOD__ );
- return $res;
- }
-
- /**
- * @see JobQueueAggregator::getAllReadyWikiQueues()
- */
- abstract protected function doGetAllReadyWikiQueues();
-
- /**
- * Purge all of the aggregator information
- *
- * @return bool Success
- */
- final public function purge() {
- wfProfileIn( __METHOD__ );
- $res = $this->doPurge();
- wfProfileOut( __METHOD__ );
- return $res;
- }
-
- /**
- * @see JobQueueAggregator::purge()
- */
- abstract protected function doPurge();
-
- /**
- * Get all databases that have a pending job.
- * This poll all the queues and is this expensive.
- *
- * @return Array (job type => (list of wiki IDs))
- */
- protected function findPendingWikiQueues() {
- global $wgLocalDatabases;
-
- $pendingDBs = array(); // (job type => (db list))
- foreach ( $wgLocalDatabases as $db ) {
- foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) {
- $pendingDBs[$type][] = $db;
- }
- }
-
- return $pendingDBs;
- }
-}
diff --git a/includes/job/aggregator/JobQueueAggregatorMemc.php b/includes/job/aggregator/JobQueueAggregatorMemc.php
deleted file mode 100644
index 9434da04..00000000
--- a/includes/job/aggregator/JobQueueAggregatorMemc.php
+++ /dev/null
@@ -1,124 +0,0 @@
-<?php
-/**
- * Job queue aggregator code that uses BagOStuff.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @author Aaron Schulz
- */
-
-/**
- * Class to handle tracking information about all queues using BagOStuff
- *
- * @ingroup JobQueue
- * @since 1.21
- */
-class JobQueueAggregatorMemc extends JobQueueAggregator {
- /** @var BagOStuff */
- protected $cache;
-
- protected $cacheTTL; // integer; seconds
-
- /**
- * @params include:
- * - objectCache : Name of an object cache registered in $wgObjectCaches.
- * This defaults to the one specified by $wgMainCacheType.
- * - cacheTTL : Seconds to cache the aggregate data before regenerating.
- * @param array $params
- */
- protected function __construct( array $params ) {
- parent::__construct( $params );
- $this->cache = isset( $params['objectCache'] )
- ? wfGetCache( $params['objectCache'] )
- : wfGetMainCache();
- $this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min
- }
-
- /**
- * @see JobQueueAggregator::doNotifyQueueEmpty()
- */
- protected function doNotifyQueueEmpty( $wiki, $type ) {
- $key = $this->getReadyQueueCacheKey();
- // Delist the queue from the "ready queue" list
- if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
- $curInfo = $this->cache->get( $key );
- if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) {
- if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) {
- $curInfo['pendingDBs'][$type] = array_diff(
- $curInfo['pendingDBs'][$type], array( $wiki ) );
- $this->cache->set( $key, $curInfo );
- }
- }
- $this->cache->delete( "$key:lock" ); // unlock
- }
- return true;
- }
-
- /**
- * @see JobQueueAggregator::doNotifyQueueNonEmpty()
- */
- protected function doNotifyQueueNonEmpty( $wiki, $type ) {
- return true; // updated periodically
- }
-
- /**
- * @see JobQueueAggregator::doAllGetReadyWikiQueues()
- */
- protected function doGetAllReadyWikiQueues() {
- $key = $this->getReadyQueueCacheKey();
- // If the cache entry wasn't present, is stale, or in .1% of cases otherwise,
- // regenerate the cache. Use any available stale cache if another process is
- // currently regenerating the pending DB information.
- $pendingDbInfo = $this->cache->get( $key );
- if ( !is_array( $pendingDbInfo )
- || ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL
- || mt_rand( 0, 999 ) == 0
- ) {
- if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock
- $pendingDbInfo = array(
- 'pendingDBs' => $this->findPendingWikiQueues(),
- 'timestamp' => time()
- );
- for ( $attempts = 1; $attempts <= 25; ++$attempts ) {
- if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
- $this->cache->set( $key, $pendingDbInfo );
- $this->cache->delete( "$key:lock" ); // unlock
- break;
- }
- }
- $this->cache->delete( "$key:rebuild" ); // unlock
- }
- }
- return is_array( $pendingDbInfo )
- ? $pendingDbInfo['pendingDBs']
- : array(); // cache is both empty and locked
- }
-
- /**
- * @see JobQueueAggregator::doPurge()
- */
- protected function doPurge() {
- return $this->cache->delete( $this->getReadyQueueCacheKey() );
- }
-
- /**
- * @return string
- */
- private function getReadyQueueCacheKey() {
- return "jobqueue:aggregator:ready-queues:v1"; // global
- }
-}
diff --git a/includes/job/aggregator/JobQueueAggregatorRedis.php b/includes/job/aggregator/JobQueueAggregatorRedis.php
deleted file mode 100644
index c6a799df..00000000
--- a/includes/job/aggregator/JobQueueAggregatorRedis.php
+++ /dev/null
@@ -1,193 +0,0 @@
-<?php
-/**
- * Job queue aggregator code that uses PhpRedis.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @author Aaron Schulz
- */
-
-/**
- * Class to handle tracking information about all queues using PhpRedis
- *
- * @ingroup JobQueue
- * @ingroup Redis
- * @since 1.21
- */
-class JobQueueAggregatorRedis extends JobQueueAggregator {
- /** @var RedisConnectionPool */
- protected $redisPool;
-
- /**
- * @params include:
- * - redisConfig : An array of parameters to RedisConnectionPool::__construct().
- * - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
- * If a hostname is specified but no port, the standard port number
- * 6379 will be used. Required.
- * @param array $params
- */
- protected function __construct( array $params ) {
- parent::__construct( $params );
- $this->server = $params['redisServer'];
- $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
- }
-
- /**
- * @see JobQueueAggregator::doNotifyQueueEmpty()
- */
- protected function doNotifyQueueEmpty( $wiki, $type ) {
- $conn = $this->getConnection();
- if ( !$conn ) {
- return false;
- }
- try {
- $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );
- return true;
- } catch ( RedisException $e ) {
- $this->handleException( $conn, $e );
- return false;
- }
- }
-
- /**
- * @see JobQueueAggregator::doNotifyQueueNonEmpty()
- */
- protected function doNotifyQueueNonEmpty( $wiki, $type ) {
- $conn = $this->getConnection();
- if ( !$conn ) {
- return false;
- }
- try {
- $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
- return true;
- } catch ( RedisException $e ) {
- $this->handleException( $conn, $e );
- return false;
- }
- }
-
- /**
- * @see JobQueueAggregator::doAllGetReadyWikiQueues()
- */
- protected function doGetAllReadyWikiQueues() {
- $conn = $this->getConnection();
- if ( !$conn ) {
- return array();
- }
- try {
- $conn->multi( Redis::PIPELINE );
- $conn->exists( $this->getReadyQueueKey() );
- $conn->hGetAll( $this->getReadyQueueKey() );
- list( $exists, $map ) = $conn->exec();
-
- if ( $exists ) { // cache hit
- $pendingDBs = array(); // (type => list of wikis)
- foreach ( $map as $key => $time ) {
- list( $type, $wiki ) = $this->dencQueueName( $key );
- $pendingDBs[$type][] = $wiki;
- }
- } else { // cache miss
- // Avoid duplicated effort
- $conn->multi( Redis::MULTI );
- $conn->setnx( $this->getReadyQueueKey() . ":lock", 1 );
- $conn->expire( $this->getReadyQueueKey() . ":lock", 3600 );
- if ( $conn->exec() !== array( true, true ) ) { // lock
- return array(); // already in progress
- }
-
- $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
-
- $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock
-
- $now = time();
- $map = array();
- foreach ( $pendingDBs as $type => $wikis ) {
- foreach ( $wikis as $wiki ) {
- $map[$this->encQueueName( $type, $wiki )] = $now;
- }
- }
- $conn->hMSet( $this->getReadyQueueKey(), $map );
- }
-
- return $pendingDBs;
- } catch ( RedisException $e ) {
- $this->handleException( $conn, $e );
- return array();
- }
- }
-
- /**
- * @see JobQueueAggregator::doPurge()
- */
- protected function doPurge() {
- $conn = $this->getConnection();
- if ( !$conn ) {
- return false;
- }
- try {
- $conn->delete( $this->getReadyQueueKey() );
- } catch ( RedisException $e ) {
- $this->handleException( $conn, $e );
- return false;
- }
- return true;
- }
-
- /**
- * Get a connection to the server that handles all sub-queues for this queue
- *
- * @return Array (server name, Redis instance)
- * @throws MWException
- */
- protected function getConnection() {
- return $this->redisPool->getConnection( $this->server );
- }
-
- /**
- * @param RedisConnRef $conn
- * @param RedisException $e
- * @return void
- */
- protected function handleException( RedisConnRef $conn, $e ) {
- $this->redisPool->handleException( $this->server, $conn, $e );
- }
-
- /**
- * @return string
- */
- private function getReadyQueueKey() {
- return "jobqueue:aggregator:h-ready-queues:v1"; // global
- }
-
- /**
- * @param string $type
- * @param string $wiki
- * @return string
- */
- private function encQueueName( $type, $wiki ) {
- return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
- }
-
- /**
- * @param string $name
- * @return string
- */
- private function dencQueueName( $name ) {
- list( $type, $wiki ) = explode( '/', $name, 2 );
- return array( rawurldecode( $type ), rawurldecode( $wiki ) );
- }
-}
diff --git a/includes/job/jobs/AssembleUploadChunksJob.php b/includes/job/jobs/AssembleUploadChunksJob.php
deleted file mode 100644
index 6237e568..00000000
--- a/includes/job/jobs/AssembleUploadChunksJob.php
+++ /dev/null
@@ -1,127 +0,0 @@
-<?php
-/**
- * Assemble the segments of a chunked upload.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup Upload
- */
-
-/**
- * Assemble the segments of a chunked upload.
- *
- * @ingroup Upload
- */
-class AssembleUploadChunksJob extends Job {
- public function __construct( $title, $params, $id = 0 ) {
- parent::__construct( 'AssembleUploadChunks', $title, $params, $id );
- $this->removeDuplicates = true;
- }
-
- public function run() {
- $scope = RequestContext::importScopedSession( $this->params['session'] );
- $context = RequestContext::getMain();
- try {
- $user = $context->getUser();
- if ( !$user->isLoggedIn() ) {
- $this->setLastError( "Could not load the author user from session." );
- return false;
- }
-
- if ( count( $_SESSION ) === 0 ) {
- // Empty session probably indicates that we didn't associate
- // with the session correctly. Note that being able to load
- // the user does not necessarily mean the session was loaded.
- // Most likely cause by suhosin.session.encrypt = On.
- $this->setLastError( "Error associating with user session. Try setting suhosin.session.encrypt = Off" );
- return false;
- }
-
- UploadBase::setSessionStatus(
- $this->params['filekey'],
- array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() )
- );
-
- $upload = new UploadFromChunks( $user );
- $upload->continueChunks(
- $this->params['filename'],
- $this->params['filekey'],
- $context->getRequest()
- );
-
- // Combine all of the chunks into a local file and upload that to a new stash file
- $status = $upload->concatenateChunks();
- if ( !$status->isGood() ) {
- UploadBase::setSessionStatus(
- $this->params['filekey'],
- array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status )
- );
- $this->setLastError( $status->getWikiText() );
- return false;
- }
-
- // We have a new filekey for the fully concatenated file
- $newFileKey = $upload->getLocalFile()->getFileKey();
-
- // Remove the old stash file row and first chunk file
- $upload->stash->removeFileNoAuth( $this->params['filekey'] );
-
- // Build the image info array while we have the local reference handy
- $apiMain = new ApiMain(); // dummy object (XXX)
- $imageInfo = $upload->getImageInfo( $apiMain->getResult() );
-
- // Cleanup any temporary local file
- $upload->cleanupTempFile();
-
- // Cache the info so the user doesn't have to wait forever to get the final info
- UploadBase::setSessionStatus(
- $this->params['filekey'],
- array(
- 'result' => 'Success',
- 'stage' => 'assembling',
- 'filekey' => $newFileKey,
- 'imageinfo' => $imageInfo,
- 'status' => Status::newGood()
- )
- );
- } catch ( MWException $e ) {
- UploadBase::setSessionStatus(
- $this->params['filekey'],
- array(
- 'result' => 'Failure',
- 'stage' => 'assembling',
- 'status' => Status::newFatal( 'api-error-stashfailed' )
- )
- );
- $this->setLastError( get_class( $e ) . ": " . $e->getText() );
- return false;
- }
- return true;
- }
-
- public function getDeduplicationInfo() {
- $info = parent::getDeduplicationInfo();
- if ( is_array( $info['params'] ) ) {
- $info['params'] = array( 'filekey' => $info['params']['filekey'] );
- }
- return $info;
- }
-
- public function allowRetries() {
- return false;
- }
-}
diff --git a/includes/job/jobs/DoubleRedirectJob.php b/includes/job/jobs/DoubleRedirectJob.php
deleted file mode 100644
index 33e749b8..00000000
--- a/includes/job/jobs/DoubleRedirectJob.php
+++ /dev/null
@@ -1,221 +0,0 @@
-<?php
-/**
- * Job to fix double redirects after moving a page.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup JobQueue
- */
-
-/**
- * Job to fix double redirects after moving a page
- *
- * @ingroup JobQueue
- */
-class DoubleRedirectJob extends Job {
- var $reason, $redirTitle;
-
- /**
- * @var User
- */
- static $user;
-
- /**
- * Insert jobs into the job queue to fix redirects to the given title
- * @param string $reason the reason for the fix, see message "double-redirect-fixed-<reason>"
- * @param $redirTitle Title: the title which has changed, redirects pointing to this title are fixed
- * @param bool $destTitle Not used
- */
- public static function fixRedirects( $reason, $redirTitle, $destTitle = false ) {
- # Need to use the master to get the redirect table updated in the same transaction
- $dbw = wfGetDB( DB_MASTER );
- $res = $dbw->select(
- array( 'redirect', 'page' ),
- array( 'page_namespace', 'page_title' ),
- array(
- 'page_id = rd_from',
- 'rd_namespace' => $redirTitle->getNamespace(),
- 'rd_title' => $redirTitle->getDBkey()
- ), __METHOD__ );
- if ( !$res->numRows() ) {
- return;
- }
- $jobs = array();
- foreach ( $res as $row ) {
- $title = Title::makeTitle( $row->page_namespace, $row->page_title );
- if ( !$title ) {
- continue;
- }
-
- $jobs[] = new self( $title, array(
- 'reason' => $reason,
- 'redirTitle' => $redirTitle->getPrefixedDBkey() ) );
- # Avoid excessive memory usage
- if ( count( $jobs ) > 10000 ) {
- JobQueueGroup::singleton()->push( $jobs );
- $jobs = array();
- }
- }
- JobQueueGroup::singleton()->push( $jobs );
- }
-
- function __construct( $title, $params = false, $id = 0 ) {
- parent::__construct( 'fixDoubleRedirect', $title, $params, $id );
- $this->reason = $params['reason'];
- $this->redirTitle = Title::newFromText( $params['redirTitle'] );
- }
-
- /**
- * @return bool
- */
- function run() {
- if ( !$this->redirTitle ) {
- $this->setLastError( 'Invalid title' );
- return false;
- }
-
- $targetRev = Revision::newFromTitle( $this->title, false, Revision::READ_LATEST );
- if ( !$targetRev ) {
- wfDebug( __METHOD__ . ": target redirect already deleted, ignoring\n" );
- return true;
- }
- $content = $targetRev->getContent();
- $currentDest = $content ? $content->getRedirectTarget() : null;
- if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) {
- wfDebug( __METHOD__ . ": Redirect has changed since the job was queued\n" );
- return true;
- }
-
- // Check for a suppression tag (used e.g. in periodically archived discussions)
- $mw = MagicWord::get( 'staticredirect' );
- if ( $content->matchMagicWord( $mw ) ) {
- wfDebug( __METHOD__ . ": skipping: suppressed with __STATICREDIRECT__\n" );
- return true;
- }
-
- // Find the current final destination
- $newTitle = self::getFinalDestination( $this->redirTitle );
- if ( !$newTitle ) {
- wfDebug( __METHOD__ . ": skipping: single redirect, circular redirect or invalid redirect destination\n" );
- return true;
- }
- if ( $newTitle->equals( $this->redirTitle ) ) {
- // The redirect is already right, no need to change it
- // This can happen if the page was moved back (say after vandalism)
- wfDebug( __METHOD__ . " : skipping, already good\n" );
- }
-
- // Preserve fragment (bug 14904)
- $newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(),
- $currentDest->getFragment(), $newTitle->getInterwiki() );
-
- // Fix the text
- $newContent = $content->updateRedirect( $newTitle );
-
- if ( $newContent->equals( $content ) ) {
- $this->setLastError( 'Content unchanged???' );
- return false;
- }
-
- $user = $this->getUser();
- if ( !$user ) {
- $this->setLastError( 'Invalid user' );
- return false;
- }
-
- // Save it
- global $wgUser;
- $oldUser = $wgUser;
- $wgUser = $user;
- $article = WikiPage::factory( $this->title );
-
- // Messages: double-redirect-fixed-move, double-redirect-fixed-maintenance
- $reason = wfMessage( 'double-redirect-fixed-' . $this->reason,
- $this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText()
- )->inContentLanguage()->text();
- $article->doEditContent( $newContent, $reason, EDIT_UPDATE | EDIT_SUPPRESS_RC, false, $user );
- $wgUser = $oldUser;
-
- return true;
- }
-
- /**
- * Get the final destination of a redirect
- *
- * @param $title Title
- *
- * @return bool if the specified title is not a redirect, or if it is a circular redirect
- */
- public static function getFinalDestination( $title ) {
- $dbw = wfGetDB( DB_MASTER );
-
- // Circular redirect check
- $seenTitles = array();
- $dest = false;
-
- while ( true ) {
- $titleText = $title->getPrefixedDBkey();
- if ( isset( $seenTitles[$titleText] ) ) {
- wfDebug( __METHOD__, "Circular redirect detected, aborting\n" );
- return false;
- }
- $seenTitles[$titleText] = true;
-
- if ( $title->getInterwiki() ) {
- // If the target is interwiki, we have to break early (bug 40352).
- // Otherwise it will look up a row in the local page table
- // with the namespace/page of the interwiki target which can cause
- // unexpected results (e.g. X -> foo:Bar -> Bar -> .. )
- break;
- }
-
- $row = $dbw->selectRow(
- array( 'redirect', 'page' ),
- array( 'rd_namespace', 'rd_title', 'rd_interwiki' ),
- array(
- 'rd_from=page_id',
- 'page_namespace' => $title->getNamespace(),
- 'page_title' => $title->getDBkey()
- ), __METHOD__ );
- if ( !$row ) {
- # No redirect from here, chain terminates
- break;
- } else {
- $dest = $title = Title::makeTitle( $row->rd_namespace, $row->rd_title, '', $row->rd_interwiki );
- }
- }
- return $dest;
- }
-
- /**
- * Get a user object for doing edits, from a request-lifetime cache
- * False will be returned if the user name specified in the
- * 'double-redirect-fixer' message is invalid.
- *
- * @return User|bool
- */
- function getUser() {
- if ( !self::$user ) {
- self::$user = User::newFromName( wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text() );
- # User::newFromName() can return false on a badly configured wiki.
- if ( self::$user && !self::$user->isLoggedIn() ) {
- self::$user->addToDatabase();
- }
- }
- return self::$user;
- }
-}
diff --git a/includes/job/jobs/DuplicateJob.php b/includes/job/jobs/DuplicateJob.php
deleted file mode 100644
index be1bfe5c..00000000
--- a/includes/job/jobs/DuplicateJob.php
+++ /dev/null
@@ -1,59 +0,0 @@
-<?php
-/**
- * No-op job that does nothing.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup Cache
- */
-
-/**
- * No-op job that does nothing. Used to represent duplicates.
- *
- * @ingroup JobQueue
- */
-final class DuplicateJob extends Job {
- /**
- * Callers should use DuplicateJob::newFromJob() instead
- *
- * @param $title Title
- * @param array $params job parameters
- * @param $id Integer: job id
- */
- function __construct( $title, $params, $id = 0 ) {
- parent::__construct( 'duplicate', $title, $params, $id );
- }
-
- /**
- * Get a duplicate no-op version of a job
- *
- * @param Job $job
- * @return Job
- */
- public static function newFromJob( Job $job ) {
- $djob = new self( $job->getTitle(), $job->getParams(), $job->id );
- $djob->command = $job->getType();
- $djob->params = is_array( $djob->params ) ? $djob->params : array();
- $djob->params = array( 'isDuplicate' => true ) + $djob->params;
- $djob->metadata = $job->metadata;
- return $djob;
- }
-
- public function run() {
- return true;
- }
-}
diff --git a/includes/job/jobs/EmaillingJob.php b/includes/job/jobs/EmaillingJob.php
deleted file mode 100644
index 9fbf3124..00000000
--- a/includes/job/jobs/EmaillingJob.php
+++ /dev/null
@@ -1,47 +0,0 @@
-<?php
-/**
- * Old job for notification emails.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup JobQueue
- */
-
-/**
- * Old job used for sending single notification emails;
- * kept for backwards-compatibility
- *
- * @ingroup JobQueue
- */
-class EmaillingJob extends Job {
- function __construct( $title, $params, $id = 0 ) {
- parent::__construct( 'sendMail', Title::newMainPage(), $params, $id );
- }
-
- function run() {
- $status = UserMailer::send(
- $this->params['to'],
- $this->params['from'],
- $this->params['subj'],
- $this->params['body'],
- $this->params['replyto']
- );
-
- return $status->isOK();
- }
-
-}
diff --git a/includes/job/jobs/EnotifNotifyJob.php b/includes/job/jobs/EnotifNotifyJob.php
deleted file mode 100644
index bbe988d0..00000000
--- a/includes/job/jobs/EnotifNotifyJob.php
+++ /dev/null
@@ -1,58 +0,0 @@
-<?php
-/**
- * Job for notification emails.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup JobQueue
- */
-
-/**
- * Job for email notification mails
- *
- * @ingroup JobQueue
- */
-class EnotifNotifyJob extends Job {
-
- function __construct( $title, $params, $id = 0 ) {
- parent::__construct( 'enotifNotify', $title, $params, $id );
- }
-
- function run() {
- $enotif = new EmailNotification();
- // Get the user from ID (rename safe). Anons are 0, so defer to name.
- if ( isset( $this->params['editorID'] ) && $this->params['editorID'] ) {
- $editor = User::newFromId( $this->params['editorID'] );
- // B/C, only the name might be given.
- } else {
- # FIXME: newFromName could return false on a badly configured wiki.
- $editor = User::newFromName( $this->params['editor'], false );
- }
- $enotif->actuallyNotifyOnPageChange(
- $editor,
- $this->title,
- $this->params['timestamp'],
- $this->params['summary'],
- $this->params['minorEdit'],
- $this->params['oldid'],
- $this->params['watchers'],
- $this->params['pageStatus']
- );
- return true;
- }
-
-}
diff --git a/includes/job/jobs/HTMLCacheUpdateJob.php b/includes/job/jobs/HTMLCacheUpdateJob.php
deleted file mode 100644
index 44c240bb..00000000
--- a/includes/job/jobs/HTMLCacheUpdateJob.php
+++ /dev/null
@@ -1,263 +0,0 @@
-<?php
-/**
- * HTML cache invalidation of all pages linking to a given title.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup Cache
- */
-
-/**
- * Job wrapper for HTMLCacheUpdate. Gets run whenever a related
- * job gets called from the queue.
- *
- * This class is designed to work efficiently with small numbers of links, and
- * to work reasonably well with up to ~10^5 links. Above ~10^6 links, the memory
- * and time requirements of loading all backlinked IDs in doUpdate() might become
- * prohibitive. The requirements measured at Wikimedia are approximately:
- *
- * memory: 48 bytes per row
- * time: 16us per row for the query plus processing
- *
- * The reason this query is done is to support partitioning of the job
- * by backlinked ID. The memory issue could be allieviated by doing this query in
- * batches, but of course LIMIT with an offset is inefficient on the DB side.
- *
- * The class is nevertheless a vast improvement on the previous method of using
- * File::getLinksTo() and Title::touchArray(), which uses about 2KB of memory per
- * link.
- *
- * @ingroup JobQueue
- */
-class HTMLCacheUpdateJob extends Job {
- /** @var BacklinkCache */
- protected $blCache;
-
- protected $rowsPerJob, $rowsPerQuery;
-
- /**
- * Construct a job
- * @param $title Title: the title linked to
- * @param array $params job parameters (table, start and end page_ids)
- * @param $id Integer: job id
- */
- function __construct( $title, $params, $id = 0 ) {
- global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery;
-
- parent::__construct( 'htmlCacheUpdate', $title, $params, $id );
-
- $this->rowsPerJob = $wgUpdateRowsPerJob;
- $this->rowsPerQuery = $wgUpdateRowsPerQuery;
- $this->blCache = $title->getBacklinkCache();
- }
-
- public function run() {
- if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
- # This is hit when a job is actually performed
- return $this->doPartialUpdate();
- } else {
- # This is hit when the jobs have to be inserted
- return $this->doFullUpdate();
- }
- }
-
- /**
- * Update all of the backlinks
- */
- protected function doFullUpdate() {
- global $wgMaxBacklinksInvalidate;
-
- # Get an estimate of the number of rows from the BacklinkCache
- $max = max( $this->rowsPerJob * 2, $wgMaxBacklinksInvalidate ) + 1;
- $numRows = $this->blCache->getNumLinks( $this->params['table'], $max );
- if ( $wgMaxBacklinksInvalidate !== false && $numRows > $wgMaxBacklinksInvalidate ) {
- wfDebug( "Skipped HTML cache invalidation of {$this->title->getPrefixedText()}." );
- return true;
- }
-
- if ( $numRows > $this->rowsPerJob * 2 ) {
- # Do fast cached partition
- $this->insertPartitionJobs();
- } else {
- # Get the links from the DB
- $titleArray = $this->blCache->getLinks( $this->params['table'] );
- # Check if the row count estimate was correct
- if ( $titleArray->count() > $this->rowsPerJob * 2 ) {
- # Not correct, do accurate partition
- wfDebug( __METHOD__ . ": row count estimate was incorrect, repartitioning\n" );
- $this->insertJobsFromTitles( $titleArray );
- } else {
- $this->invalidateTitles( $titleArray ); // just do the query
- }
- }
-
- return true;
- }
-
- /**
- * Update some of the backlinks, defined by a page ID range
- */
- protected function doPartialUpdate() {
- $titleArray = $this->blCache->getLinks(
- $this->params['table'], $this->params['start'], $this->params['end'] );
- if ( $titleArray->count() <= $this->rowsPerJob * 2 ) {
- # This partition is small enough, do the update
- $this->invalidateTitles( $titleArray );
- } else {
- # Partitioning was excessively inaccurate. Divide the job further.
- # This can occur when a large number of links are added in a short
- # period of time, say by updating a heavily-used template.
- $this->insertJobsFromTitles( $titleArray );
- }
- return true;
- }
-
- /**
- * Partition the current range given by $this->params['start'] and $this->params['end'],
- * using a pre-calculated title array which gives the links in that range.
- * Queue the resulting jobs.
- *
- * @param $titleArray array
- * @param $rootJobParams array
- * @return void
- */
- protected function insertJobsFromTitles( $titleArray, $rootJobParams = array() ) {
- // Carry over any "root job" information
- $rootJobParams = $this->getRootJobParams();
- # We make subpartitions in the sense that the start of the first job
- # will be the start of the parent partition, and the end of the last
- # job will be the end of the parent partition.
- $jobs = array();
- $start = $this->params['start']; # start of the current job
- $numTitles = 0;
- foreach ( $titleArray as $title ) {
- $id = $title->getArticleID();
- # $numTitles is now the number of titles in the current job not
- # including the current ID
- if ( $numTitles >= $this->rowsPerJob ) {
- # Add a job up to but not including the current ID
- $jobs[] = new HTMLCacheUpdateJob( $this->title,
- array(
- 'table' => $this->params['table'],
- 'start' => $start,
- 'end' => $id - 1
- ) + $rootJobParams // carry over information for de-duplication
- );
- $start = $id;
- $numTitles = 0;
- }
- $numTitles++;
- }
- # Last job
- $jobs[] = new HTMLCacheUpdateJob( $this->title,
- array(
- 'table' => $this->params['table'],
- 'start' => $start,
- 'end' => $this->params['end']
- ) + $rootJobParams // carry over information for de-duplication
- );
- wfDebug( __METHOD__ . ": repartitioning into " . count( $jobs ) . " jobs\n" );
-
- if ( count( $jobs ) < 2 ) {
- # I don't think this is possible at present, but handling this case
- # makes the code a bit more robust against future code updates and
- # avoids a potential infinite loop of repartitioning
- wfDebug( __METHOD__ . ": repartitioning failed!\n" );
- $this->invalidateTitles( $titleArray );
- } else {
- JobQueueGroup::singleton()->push( $jobs );
- }
- }
-
- /**
- * @param $rootJobParams array
- * @return void
- */
- protected function insertPartitionJobs( $rootJobParams = array() ) {
- // Carry over any "root job" information
- $rootJobParams = $this->getRootJobParams();
-
- $batches = $this->blCache->partition( $this->params['table'], $this->rowsPerJob );
- if ( !count( $batches ) ) {
- return; // no jobs to insert
- }
-
- $jobs = array();
- foreach ( $batches as $batch ) {
- list( $start, $end ) = $batch;
- $jobs[] = new HTMLCacheUpdateJob( $this->title,
- array(
- 'table' => $this->params['table'],
- 'start' => $start,
- 'end' => $end,
- ) + $rootJobParams // carry over information for de-duplication
- );
- }
-
- JobQueueGroup::singleton()->push( $jobs );
- }
-
- /**
- * Invalidate an array (or iterator) of Title objects, right now
- * @param $titleArray array
- */
- protected function invalidateTitles( $titleArray ) {
- global $wgUseFileCache, $wgUseSquid;
-
- $dbw = wfGetDB( DB_MASTER );
- $timestamp = $dbw->timestamp();
-
- # Get all IDs in this query into an array
- $ids = array();
- foreach ( $titleArray as $title ) {
- $ids[] = $title->getArticleID();
- }
-
- if ( !$ids ) {
- return;
- }
-
- # Don't invalidated pages that were already invalidated
- $touchedCond = isset( $this->params['rootJobTimestamp'] )
- ? array( "page_touched < " .
- $dbw->addQuotes( $dbw->timestamp( $this->params['rootJobTimestamp'] ) ) )
- : array();
-
- # Update page_touched
- $batches = array_chunk( $ids, $this->rowsPerQuery );
- foreach ( $batches as $batch ) {
- $dbw->update( 'page',
- array( 'page_touched' => $timestamp ),
- array( 'page_id' => $batch ) + $touchedCond,
- __METHOD__
- );
- }
-
- # Update squid
- if ( $wgUseSquid ) {
- $u = SquidUpdate::newFromTitles( $titleArray );
- $u->doUpdate();
- }
-
- # Update file cache
- if ( $wgUseFileCache ) {
- foreach ( $titleArray as $title ) {
- HTMLFileCache::clearFileCache( $title );
- }
- }
- }
-}
diff --git a/includes/job/jobs/NullJob.php b/includes/job/jobs/NullJob.php
deleted file mode 100644
index b6164a5d..00000000
--- a/includes/job/jobs/NullJob.php
+++ /dev/null
@@ -1,76 +0,0 @@
-<?php
-/**
- * Degenerate job that does nothing.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup Cache
- */
-
-/**
- * Degenerate job that does nothing, but can optionally replace itself
- * in the queue and/or sleep for a brief time period. These can be used
- * to represent "no-op" jobs or test lock contention and performance.
- *
- * @par Example:
- * Inserting a null job in the configured job queue:
- * @code
- * $ php maintenance/eval.php
- * > $queue = JobQueueGroup::singleton();
- * > $job = new NullJob( Title::newMainPage(), array( 'lives' => 10 ) );
- * > $queue->push( $job );
- * @endcode
- * You can then confirm the job has been enqueued by using the showJobs.php
- * maintenance utility:
- * @code
- * $ php maintenance/showJobs.php --group
- * null: 1 queue; 0 claimed (0 active, 0 abandoned)
- * $
- * @endcode
- *
- * @ingroup JobQueue
- */
-class NullJob extends Job {
- /**
- * @param $title Title (can be anything)
- * @param array $params job parameters (lives, usleep)
- * @param $id Integer: job id
- */
- function __construct( $title, $params, $id = 0 ) {
- parent::__construct( 'null', $title, $params, $id );
- if ( !isset( $this->params['lives'] ) ) {
- $this->params['lives'] = 1;
- }
- if ( !isset( $this->params['usleep'] ) ) {
- $this->params['usleep'] = 0;
- }
- $this->removeDuplicates = !empty( $this->params['removeDuplicates'] );
- }
-
- public function run() {
- if ( $this->params['usleep'] > 0 ) {
- usleep( $this->params['usleep'] );
- }
- if ( $this->params['lives'] > 1 ) {
- $params = $this->params;
- $params['lives']--;
- $job = new self( $this->title, $params );
- JobQueueGroup::singleton()->push( $job );
- }
- return true;
- }
-}
diff --git a/includes/job/jobs/PublishStashedFileJob.php b/includes/job/jobs/PublishStashedFileJob.php
deleted file mode 100644
index 5a24f93c..00000000
--- a/includes/job/jobs/PublishStashedFileJob.php
+++ /dev/null
@@ -1,140 +0,0 @@
-<?php
-/**
- * Upload a file from the upload stash into the local file repo.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup Upload
- */
-
-/**
- * Upload a file from the upload stash into the local file repo.
- *
- * @ingroup Upload
- */
-class PublishStashedFileJob extends Job {
- public function __construct( $title, $params, $id = 0 ) {
- parent::__construct( 'PublishStashedFile', $title, $params, $id );
- $this->removeDuplicates = true;
- }
-
- public function run() {
- $scope = RequestContext::importScopedSession( $this->params['session'] );
- $context = RequestContext::getMain();
- try {
- $user = $context->getUser();
- if ( !$user->isLoggedIn() ) {
- $this->setLastError( "Could not load the author user from session." );
- return false;
- }
-
- if ( count( $_SESSION ) === 0 ) {
- // Empty session probably indicates that we didn't associate
- // with the session correctly. Note that being able to load
- // the user does not necessarily mean the session was loaded.
- // Most likely cause by suhosin.session.encrypt = On.
- $this->setLastError( "Error associating with user session. Try setting suhosin.session.encrypt = Off" );
- return false;
- }
-
-
- UploadBase::setSessionStatus(
- $this->params['filekey'],
- array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() )
- );
-
- $upload = new UploadFromStash( $user );
- // @todo initialize() causes a GET, ideally we could frontload the antivirus
- // checks and anything else to the stash stage (which includes concatenation and
- // the local file is thus already there). That way, instead of GET+PUT, there could
- // just be a COPY operation from the stash to the public zone.
- $upload->initialize( $this->params['filekey'], $this->params['filename'] );
-
- // Check if the local file checks out (this is generally a no-op)
- $verification = $upload->verifyUpload();
- if ( $verification['status'] !== UploadBase::OK ) {
- $status = Status::newFatal( 'verification-error' );
- $status->value = array( 'verification' => $verification );
- UploadBase::setSessionStatus(
- $this->params['filekey'],
- array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
- );
- $this->setLastError( "Could not verify upload." );
- return false;
- }
-
- // Upload the stashed file to a permanent location
- $status = $upload->performUpload(
- $this->params['comment'],
- $this->params['text'],
- $this->params['watch'],
- $user
- );
- if ( !$status->isGood() ) {
- UploadBase::setSessionStatus(
- $this->params['filekey'],
- array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
- );
- $this->setLastError( $status->getWikiText() );
- return false;
- }
-
- // Build the image info array while we have the local reference handy
- $apiMain = new ApiMain(); // dummy object (XXX)
- $imageInfo = $upload->getImageInfo( $apiMain->getResult() );
-
- // Cleanup any temporary local file
- $upload->cleanupTempFile();
-
- // Cache the info so the user doesn't have to wait forever to get the final info
- UploadBase::setSessionStatus(
- $this->params['filekey'],
- array(
- 'result' => 'Success',
- 'stage' => 'publish',
- 'filename' => $upload->getLocalFile()->getName(),
- 'imageinfo' => $imageInfo,
- 'status' => Status::newGood()
- )
- );
- } catch ( MWException $e ) {
- UploadBase::setSessionStatus(
- $this->params['filekey'],
- array(
- 'result' => 'Failure',
- 'stage' => 'publish',
- 'status' => Status::newFatal( 'api-error-publishfailed' )
- )
- );
- $this->setLastError( get_class( $e ) . ": " . $e->getText() );
- return false;
- }
- return true;
- }
-
- public function getDeduplicationInfo() {
- $info = parent::getDeduplicationInfo();
- if ( is_array( $info['params'] ) ) {
- $info['params'] = array( 'filekey' => $info['params']['filekey'] );
- }
- return $info;
- }
-
- public function allowRetries() {
- return false;
- }
-}
diff --git a/includes/job/jobs/RefreshLinksJob.php b/includes/job/jobs/RefreshLinksJob.php
deleted file mode 100644
index 4fc8bac6..00000000
--- a/includes/job/jobs/RefreshLinksJob.php
+++ /dev/null
@@ -1,222 +0,0 @@
-<?php
-/**
- * Job to update links for a given title.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup JobQueue
- */
-
-/**
- * Background job to update links for a given title.
- *
- * @ingroup JobQueue
- */
-class RefreshLinksJob extends Job {
- function __construct( $title, $params = '', $id = 0 ) {
- parent::__construct( 'refreshLinks', $title, $params, $id );
- $this->removeDuplicates = true; // job is expensive
- }
-
- /**
- * Run a refreshLinks job
- * @return boolean success
- */
- function run() {
- $linkCache = LinkCache::singleton();
- $linkCache->clear();
-
- if ( is_null( $this->title ) ) {
- $this->error = "refreshLinks: Invalid title";
- return false;
- }
-
- # Wait for the DB of the current/next slave DB handle to catch up to the master.
- # This way, we get the correct page_latest for templates or files that just changed
- # milliseconds ago, having triggered this job to begin with.
- if ( isset( $this->params['masterPos'] ) && $this->params['masterPos'] !== false ) {
- wfGetLB()->waitFor( $this->params['masterPos'] );
- }
-
- $revision = Revision::newFromTitle( $this->title, false, Revision::READ_NORMAL );
- if ( !$revision ) {
- $this->error = 'refreshLinks: Article not found "' .
- $this->title->getPrefixedDBkey() . '"';
- return false; // XXX: what if it was just deleted?
- }
-
- self::runForTitleInternal( $this->title, $revision, __METHOD__ );
-
- return true;
- }
-
- /**
- * @return Array
- */
- public function getDeduplicationInfo() {
- $info = parent::getDeduplicationInfo();
- // Don't let highly unique "masterPos" values ruin duplicate detection
- if ( is_array( $info['params'] ) ) {
- unset( $info['params']['masterPos'] );
- }
- return $info;
- }
-
- /**
- * @param $title Title
- * @param $revision Revision
- * @param $fname string
- * @return void
- */
- public static function runForTitleInternal( Title $title, Revision $revision, $fname ) {
- wfProfileIn( $fname );
- $content = $revision->getContent( Revision::RAW );
-
- if ( !$content ) {
- // if there is no content, pretend the content is empty
- $content = $revision->getContentHandler()->makeEmptyContent();
- }
-
- // Revision ID must be passed to the parser output to get revision variables correct
- $parserOutput = $content->getParserOutput( $title, $revision->getId(), null, false );
-
- $updates = $content->getSecondaryDataUpdates( $title, null, false, $parserOutput );
- DataUpdate::runUpdates( $updates );
-
- InfoAction::invalidateCache( $title );
-
- wfProfileOut( $fname );
- }
-}
-
-/**
- * Background job to update links for a given title.
- * Newer version for high use templates.
- *
- * @ingroup JobQueue
- */
-class RefreshLinksJob2 extends Job {
- function __construct( $title, $params, $id = 0 ) {
- parent::__construct( 'refreshLinks2', $title, $params, $id );
- // Base jobs for large templates can easily be de-duplicated
- $this->removeDuplicates = !isset( $params['start'] ) && !isset( $params['end'] );
- }
-
- /**
- * Run a refreshLinks2 job
- * @return boolean success
- */
- function run() {
- global $wgUpdateRowsPerJob;
-
- $linkCache = LinkCache::singleton();
- $linkCache->clear();
-
- if ( is_null( $this->title ) ) {
- $this->error = "refreshLinks2: Invalid title";
- return false;
- }
-
- // Back compat for pre-r94435 jobs
- $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks';
-
- // Avoid slave lag when fetching templates.
- // When the outermost job is run, we know that the caller that enqueued it must have
- // committed the relevant changes to the DB by now. At that point, record the master
- // position and pass it along as the job recursively breaks into smaller range jobs.
- // Hopefully, when leaf jobs are popped, the slaves will have reached that position.
- if ( isset( $this->params['masterPos'] ) ) {
- $masterPos = $this->params['masterPos'];
- } elseif ( wfGetLB()->getServerCount() > 1 ) {
- $masterPos = wfGetLB()->getMasterPos();
- } else {
- $masterPos = false;
- }
-
- $tbc = $this->title->getBacklinkCache();
-
- $jobs = array(); // jobs to insert
- if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
- # This is a partition job to trigger the insertion of leaf jobs...
- $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
- } else {
- # This is a base job to trigger the insertion of partitioned jobs...
- if ( $tbc->getNumLinks( $table, $wgUpdateRowsPerJob + 1 ) <= $wgUpdateRowsPerJob ) {
- # Just directly insert the single per-title jobs
- $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
- } else {
- # Insert the partition jobs to make per-title jobs
- foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) {
- list( $start, $end ) = $batch;
- $jobs[] = new RefreshLinksJob2( $this->title,
- array(
- 'table' => $table,
- 'start' => $start,
- 'end' => $end,
- 'masterPos' => $masterPos,
- ) + $this->getRootJobParams() // carry over information for de-duplication
- );
- }
- }
- }
-
- if ( count( $jobs ) ) {
- JobQueueGroup::singleton()->push( $jobs );
- }
-
- return true;
- }
-
- /**
- * @param $table string
- * @param $masterPos mixed
- * @return Array
- */
- protected function getSingleTitleJobs( $table, $masterPos ) {
- # The "start"/"end" fields are not set for the base jobs
- $start = isset( $this->params['start'] ) ? $this->params['start'] : false;
- $end = isset( $this->params['end'] ) ? $this->params['end'] : false;
- $titles = $this->title->getBacklinkCache()->getLinks( $table, $start, $end );
- # Convert into single page refresh links jobs.
- # This handles well when in sapi mode and is useful in any case for job
- # de-duplication. If many pages use template A, and that template itself
- # uses template B, then an edit to both will create many duplicate jobs.
- # Roughly speaking, for each page, one of the "RefreshLinksJob" jobs will
- # get run first, and when it does, it will remove the duplicates. Of course,
- # one page could have its job popped when the other page's job is still
- # buried within the logic of a refreshLinks2 job.
- $jobs = array();
- foreach ( $titles as $title ) {
- $jobs[] = new RefreshLinksJob( $title,
- array( 'masterPos' => $masterPos ) + $this->getRootJobParams()
- ); // carry over information for de-duplication
- }
- return $jobs;
- }
-
- /**
- * @return Array
- */
- public function getDeduplicationInfo() {
- $info = parent::getDeduplicationInfo();
- // Don't let highly unique "masterPos" values ruin duplicate detection
- if ( is_array( $info['params'] ) ) {
- unset( $info['params']['masterPos'] );
- }
- return $info;
- }
-}
diff --git a/includes/job/jobs/UploadFromUrlJob.php b/includes/job/jobs/UploadFromUrlJob.php
deleted file mode 100644
index c993cfb4..00000000
--- a/includes/job/jobs/UploadFromUrlJob.php
+++ /dev/null
@@ -1,184 +0,0 @@
-<?php
-/**
- * Job for asynchronous upload-by-url.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @ingroup JobQueue
- */
-
-/**
- * Job for asynchronous upload-by-url.
- *
- * This job is in fact an interface to UploadFromUrl, which is designed such
- * that it does not require any globals. If it does, fix it elsewhere, do not
- * add globals in here.
- *
- * @ingroup JobQueue
- */
-class UploadFromUrlJob extends Job {
- const SESSION_KEYNAME = 'wsUploadFromUrlJobData';
-
- /**
- * @var UploadFromUrl
- */
- public $upload;
-
- /**
- * @var User
- */
- protected $user;
-
- public function __construct( $title, $params, $id = 0 ) {
- parent::__construct( 'uploadFromUrl', $title, $params, $id );
- }
-
- public function run() {
- global $wgCopyUploadAsyncTimeout;
- # Initialize this object and the upload object
- $this->upload = new UploadFromUrl();
- $this->upload->initialize(
- $this->title->getText(),
- $this->params['url'],
- false
- );
- $this->user = User::newFromName( $this->params['userName'] );
-
- # Fetch the file
- $opts = array();
- if ( $wgCopyUploadAsyncTimeout ) {
- $opts['timeout'] = $wgCopyUploadAsyncTimeout;
- }
- $status = $this->upload->fetchFile( $opts );
- if ( !$status->isOk() ) {
- $this->leaveMessage( $status );
- return true;
- }
-
- # Verify upload
- $result = $this->upload->verifyUpload();
- if ( $result['status'] != UploadBase::OK ) {
- $status = $this->upload->convertVerifyErrorToStatus( $result );
- $this->leaveMessage( $status );
- return true;
- }
-
- # Check warnings
- if ( !$this->params['ignoreWarnings'] ) {
- $warnings = $this->upload->checkWarnings();
- if ( $warnings ) {
-
- # Stash the upload
- $key = $this->upload->stashFile();
-
- if ( $this->params['leaveMessage'] ) {
- $this->user->leaveUserMessage(
- wfMessage( 'upload-warning-subj' )->text(),
- wfMessage( 'upload-warning-msg',
- $key,
- $this->params['url'] )->text()
- );
- } else {
- wfSetupSession( $this->params['sessionId'] );
- $this->storeResultInSession( 'Warning',
- 'warnings', $warnings );
- session_write_close();
- }
-
- return true;
- }
- }
-
- # Perform the upload
- $status = $this->upload->performUpload(
- $this->params['comment'],
- $this->params['pageText'],
- $this->params['watch'],
- $this->user
- );
- $this->leaveMessage( $status );
- return true;
-
- }
-
- /**
- * Leave a message on the user talk page or in the session according to
- * $params['leaveMessage'].
- *
- * @param $status Status
- */
- protected function leaveMessage( $status ) {
- if ( $this->params['leaveMessage'] ) {
- if ( $status->isGood() ) {
- $this->user->leaveUserMessage( wfMessage( 'upload-success-subj' )->text(),
- wfMessage( 'upload-success-msg',
- $this->upload->getTitle()->getText(),
- $this->params['url']
- )->text() );
- } else {
- $this->user->leaveUserMessage( wfMessage( 'upload-failure-subj' )->text(),
- wfMessage( 'upload-failure-msg',
- $status->getWikiText(),
- $this->params['url']
- )->text() );
- }
- } else {
- wfSetupSession( $this->params['sessionId'] );
- if ( $status->isOk() ) {
- $this->storeResultInSession( 'Success',
- 'filename', $this->upload->getLocalFile()->getName() );
- } else {
- $this->storeResultInSession( 'Failure',
- 'errors', $status->getErrorsArray() );
- }
- session_write_close();
- }
- }
-
- /**
- * Store a result in the session data. Note that the caller is responsible
- * for appropriate session_start and session_write_close calls.
- *
- * @param string $result the result (Success|Warning|Failure)
- * @param string $dataKey the key of the extra data
- * @param $dataValue Mixed: the extra data itself
- */
- protected function storeResultInSession( $result, $dataKey, $dataValue ) {
- $session =& self::getSessionData( $this->params['sessionKey'] );
- $session['result'] = $result;
- $session[$dataKey] = $dataValue;
- }
-
- /**
- * Initialize the session data. Sets the intial result to queued.
- */
- public function initializeSessionData() {
- $session =& self::getSessionData( $this->params['sessionKey'] );
- $$session['result'] = 'Queued';
- }
-
- /**
- * @param $key
- * @return mixed
- */
- public static function &getSessionData( $key ) {
- if ( !isset( $_SESSION[self::SESSION_KEYNAME][$key] ) ) {
- $_SESSION[self::SESSION_KEYNAME][$key] = array();
- }
- return $_SESSION[self::SESSION_KEYNAME][$key];
- }
-}