summaryrefslogtreecommitdiff
path: root/includes/job/JobQueueAggregatorMemc.php
blob: 4b82cf925a7a18fca0f1c6d94cdd63941cf329ff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
<?php
/**
 * Job queue aggregator code that uses BagOStuff.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @author Aaron Schulz
 */

/**
 * Class to handle tracking information about all queues using BagOStuff
 *
 * @ingroup JobQueue
 * @since 1.21
 */
class JobQueueAggregatorMemc extends JobQueueAggregator {
	/** @var BagOStuff */
	protected $cache;

	protected $cacheTTL; // integer; seconds

	/**
	 * @params include:
	 *   - objectCache : Name of an object cache registered in $wgObjectCaches.
	 *                   This defaults to the one specified by $wgMainCacheType.
	 *   - cacheTTL    : Seconds to cache the aggregate data before regenerating.
	 * @param array $params
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$this->cache = isset( $params['objectCache'] )
			? wfGetCache( $params['objectCache'] )
			: wfGetMainCache();
		$this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min
	}

	/**
	 * @see JobQueueAggregator::doNotifyQueueEmpty()
	 */
	protected function doNotifyQueueEmpty( $wiki, $type ) {
		$key = $this->getReadyQueueCacheKey();
		// Delist the queue from the "ready queue" list
		if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
			$curInfo = $this->cache->get( $key );
			if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) {
				if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) {
					$curInfo['pendingDBs'][$type] = array_diff(
						$curInfo['pendingDBs'][$type], array( $wiki ) );
					$this->cache->set( $key, $curInfo );
				}
			}
			$this->cache->delete( "$key:lock" ); // unlock
		}
		return true;
	}

	/**
	 * @see JobQueueAggregator::doNotifyQueueNonEmpty()
	 */
	protected function doNotifyQueueNonEmpty( $wiki, $type ) {
		return true; // updated periodically
	}

	/**
	 * @see JobQueueAggregator::doAllGetReadyWikiQueues()
	 */
	protected function doGetAllReadyWikiQueues() {
		$key = $this->getReadyQueueCacheKey();
		// If the cache entry wasn't present, is stale, or in .1% of cases otherwise,
		// regenerate the cache. Use any available stale cache if another process is
		// currently regenerating the pending DB information.
		$pendingDbInfo = $this->cache->get( $key );
		if ( !is_array( $pendingDbInfo )
			|| ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL
			|| mt_rand( 0, 999 ) == 0
		) {
			if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock
				$pendingDbInfo = array(
					'pendingDBs' => $this->findPendingWikiQueues(),
					'timestamp'  => time()
				);
				for ( $attempts=1; $attempts <= 25; ++$attempts ) {
					if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock
						$this->cache->set( $key, $pendingDbInfo );
						$this->cache->delete( "$key:lock" ); // unlock
						break;
					}
				}
				$this->cache->delete( "$key:rebuild" ); // unlock
			}
		}
		return is_array( $pendingDbInfo )
			? $pendingDbInfo['pendingDBs']
			: array(); // cache is both empty and locked
	}

	/**
	 * @return string
	 */
	private function getReadyQueueCacheKey() {
		return "jobqueue:aggregator:ready-queues:v1"; // global
	}
}