summaryrefslogtreecommitdiff
path: root/includes/jobqueue/JobQueueGroup.php
diff options
context:
space:
mode:
Diffstat (limited to 'includes/jobqueue/JobQueueGroup.php')
-rw-r--r--includes/jobqueue/JobQueueGroup.php143
1 files changed, 68 insertions, 75 deletions
diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php
index ebd547a0..5bd1cc94 100644
--- a/includes/jobqueue/JobQueueGroup.php
+++ b/includes/jobqueue/JobQueueGroup.php
@@ -28,7 +28,7 @@
* @since 1.21
*/
class JobQueueGroup {
- /** @var array */
+ /** @var JobQueueGroup[] */
protected static $instances = array();
/** @var ProcessCacheLRU */
@@ -40,6 +40,9 @@ class JobQueueGroup {
/** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
protected $coalescedQueues;
+ /** @var Job[] */
+ protected $bufferedJobs = array();
+
const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job
@@ -100,13 +103,13 @@ class JobQueueGroup {
}
/**
- * Insert jobs into the respective queues of with the belong.
+ * Insert jobs into the respective queues of which they belong
*
* This inserts the jobs into the queue specified by $wgJobTypeConf
* and updates the aggregate job queue information cache as needed.
*
- * @param Job|Job[] $jobs A single Job or a list of Jobs
- * @throws MWException
+ * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+ * @throws InvalidArgumentException
* @return void
*/
public function push( $jobs ) {
@@ -115,13 +118,11 @@ class JobQueueGroup {
return;
}
+ $this->assertValidJobs( $jobs );
+
$jobsByType = array(); // (job type => list of jobs)
foreach ( $jobs as $job ) {
- if ( $job instanceof IJobSpecification ) {
- $jobsByType[$job->getType()][] = $job;
- } else {
- throw new MWException( "Attempted to push a non-Job object into a queue." );
- }
+ $jobsByType[$job->getType()][] = $job;
}
foreach ( $jobsByType as $type => $jobs ) {
@@ -137,6 +138,42 @@ class JobQueueGroup {
}
/**
+ * Buffer jobs for insertion via push() or call it now if in CLI mode
+ *
+ * Note that MediaWiki::restInPeace() calls pushLazyJobs()
+ *
+ * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+ * @return void
+ * @since 1.26
+ */
+ public function lazyPush( $jobs ) {
+ if ( PHP_SAPI === 'cli' ) {
+ $this->push( $jobs );
+ return;
+ }
+
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+
+ // Throw errors now instead of on push(), when other jobs may be buffered
+ $this->assertValidJobs( $jobs );
+
+ $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
+ }
+
+ /**
+ * Push all jobs buffered via lazyPush() into their respective queues
+ *
+ * @return void
+ * @since 1.26
+ */
+ public static function pushLazyJobs() {
+ foreach ( self::$instances as $group ) {
+ $group->push( $group->bufferedJobs );
+ $group->bufferedJobs = array();
+ }
+ }
+
+ /**
* Pop a job off one of the job queues
*
* This pops a job off a queue as specified by $wgJobTypeConf and
@@ -188,10 +225,10 @@ class JobQueueGroup {
* Acknowledge that a job was completed
*
* @param Job $job
- * @return bool
+ * @return void
*/
public function ack( Job $job ) {
- return $this->get( $job->getType() )->ack( $job );
+ $this->get( $job->getType() )->ack( $job );
}
/**
@@ -211,7 +248,6 @@ class JobQueueGroup {
* This does nothing for certain queue classes.
*
* @return void
- * @throws MWException
*/
public function waitForBackups() {
global $wgJobTypeConf;
@@ -342,69 +378,6 @@ class JobQueueGroup {
}
/**
- * Execute any due periodic queue maintenance tasks for all queues.
- *
- * A task is "due" if the time ellapsed since the last run is greater than
- * the defined run period. Concurrent calls to this function will cause tasks
- * to be attempted twice, so they may need their own methods of mutual exclusion.
- *
- * @return int Number of tasks run
- */
- public function executeReadyPeriodicTasks() {
- global $wgMemc;
-
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
- $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
-
- $count = 0;
- $tasksRun = array(); // (queue => task => UNIX timestamp)
- foreach ( $this->getQueueTypes() as $type ) {
- $queue = $this->get( $type );
- foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
- if ( $definition['period'] <= 0 ) {
- continue; // disabled
- } elseif ( !isset( $lastRuns[$type][$task] )
- || $lastRuns[$type][$task] < ( time() - $definition['period'] )
- ) {
- try {
- if ( call_user_func( $definition['callback'] ) !== null ) {
- $tasksRun[$type][$task] = time();
- ++$count;
- }
- } catch ( JobQueueError $e ) {
- MWExceptionHandler::logException( $e );
- }
- }
- }
- }
-
- if ( $count === 0 ) {
- return $count; // nothing to update
- }
-
- $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
- if ( is_array( $lastRuns ) ) {
- foreach ( $tasksRun as $type => $tasks ) {
- foreach ( $tasks as $task => $timestamp ) {
- if ( !isset( $lastRuns[$type][$task] )
- || $timestamp > $lastRuns[$type][$task]
- ) {
- $lastRuns[$type][$task] = $timestamp;
- }
- }
- }
- } else {
- $lastRuns = $tasksRun;
- }
-
- return $lastRuns;
- } );
-
- return $count;
- }
-
- /**
* @param string $name
* @return mixed
*/
@@ -427,4 +400,24 @@ class JobQueueGroup {
}
}
}
+
+ /**
+ * @param array $jobs
+ * @throws InvalidArgumentException
+ */
+ private function assertValidJobs( array $jobs ) {
+ foreach ( $jobs as $job ) { // sanity checks
+ if ( !( $job instanceof IJobSpecification ) ) {
+ throw new InvalidArgumentException( "Expected IJobSpecification objects" );
+ }
+ }
+ }
+
+ function __destruct() {
+ $n = count( $this->bufferedJobs );
+ if ( $n > 0 ) {
+ $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
+ trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
+ }
+ }
}