summaryrefslogtreecommitdiff
path: root/includes/jobqueue/jobs/EnqueueJob.php
blob: c7ee9b65e5fc663089a9be2a4eedc15f765bd8d2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
<?php
/**
 * Router job that takes jobs and enqueues them.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @ingroup JobQueue
 */

/**
 * Router job that takes jobs and enqueues them to their proper queues
 *
 * This can be used for several things:
 *   - a) Making multi-job enqueues more robust by atomically enqueueing
 *        a single job that pushes the actual jobs (with retry logic)
 *   - b) Masking the latency of pushing jobs to different queues/wikis
 *   - c) Low-latency enqueues to push jobs from warm to hot datacenters
 *
 * @ingroup JobQueue
 * @since 1.25
 */
final class EnqueueJob extends Job {
	/**
	 * Callers should use the factory methods instead
	 *
	 * @param Title $title
	 * @param array $params Job parameters
	 */
	function __construct( Title $title, array $params ) {
		parent::__construct( 'enqueue', $title, $params );
	}

	/**
	 * @param JobSpecification|JobSpecification[] $jobs
	 * @return EnqueueJob
	 */
	public static function newFromLocalJobs( $jobs ) {
		$jobs = is_array( $jobs ) ? $jobs : array( $jobs );

		return self::newFromJobsByWiki( array( wfWikiID() => $jobs ) );
	}

	/**
	 * @param array $jobsByWiki Map of (wiki => JobSpecification list)
	 * @return EnqueueJob
	 */
	public static function newFromJobsByWiki( array $jobsByWiki ) {
		$deduplicate = true;

		$jobMapsByWiki = array();
		foreach ( $jobsByWiki as $wiki => $jobs ) {
			$jobMapsByWiki[$wiki] = array();
			foreach ( $jobs as $job ) {
				if ( $job instanceof JobSpecification ) {
					$jobMapsByWiki[$wiki][] = $job->toSerializableArray();
				} else {
					throw new InvalidArgumentException( "Jobs must be of type JobSpecification." );
				}
				$deduplicate = $deduplicate && $job->ignoreDuplicates();
			}
		}

		$eJob = new self(
			Title::makeTitle( NS_SPECIAL, 'Badtitle/' . __CLASS__ ),
			array( 'jobsByWiki' => $jobMapsByWiki )
		);
		// If *all* jobs to be pushed are to be de-duplicated (a common case), then
		// de-duplicate this whole job itself to avoid build up in high traffic cases
		$eJob->removeDuplicates = $deduplicate;

		return $eJob;
	}

	public function run() {
		foreach ( $this->params['jobsByWiki'] as $wiki => $jobMaps ) {
			$jobSpecs = array();
			foreach ( $jobMaps as $jobMap ) {
				$jobSpecs[] = JobSpecification::newFromArray( $jobMap );
			}
			JobQueueGroup::singleton( $wiki )->push( $jobSpecs );
		}

		return true;
	}
}