diff options
Diffstat (limited to 'includes/jobqueue/JobRunner.php')
-rw-r--r-- | includes/jobqueue/JobRunner.php | 86 |
1 files changed, 71 insertions, 15 deletions
diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index 8cccedaf..b8c5d6cf 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -21,13 +21,17 @@ * @ingroup JobQueue */ +use MediaWiki\Logger\LoggerFactory; +use Psr\Log\LoggerAwareInterface; +use Psr\Log\LoggerInterface; + /** * Job queue runner utility methods * * @ingroup JobQueue * @since 1.24 */ -class JobRunner { +class JobRunner implements LoggerAwareInterface { /** @var callable|null Debug output handler */ protected $debug; @@ -39,6 +43,28 @@ class JobRunner { } /** + * @var LoggerInterface $logger + */ + protected $logger; + + /** + * @param LoggerInterface $logger + */ + public function setLogger( LoggerInterface $logger ) { + $this->logger = $logger; + } + + /** + * @param LoggerInterface $logger + */ + public function __construct( LoggerInterface $logger = null ) { + if ( $logger === null ) { + $logger = LoggerFactory::getInstance( 'runJobs' ); + } + $this->setLogger( $logger ); + } + + /** * Run jobs of the specified number/type for the specified time * * The response map has a 'job' field that lists status of each job, including: @@ -62,6 +88,8 @@ class JobRunner { * @return array Summary response that can easily be JSON serialized */ public function run( array $options ) { + global $wgJobClasses; + $response = array( 'jobs' => array(), 'reached' => 'none-ready' ); $type = isset( $options['type'] ) ? $options['type'] : false; @@ -69,11 +97,31 @@ class JobRunner { $maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false; $noThrottle = isset( $options['throttle'] ) && !$options['throttle']; + if ( $type !== false && !isset( $wgJobClasses[$type] ) ) { + $response['reached'] = 'none-possible'; + return $response; + } + $group = JobQueueGroup::singleton(); // Handle any required periodic queue maintenance $count = $group->executeReadyPeriodicTasks(); if ( $count > 0 ) { - $this->runJobsLog( "Executed $count periodic queue task(s)." ); + $msg = "Executed $count periodic queue task(s)."; + $this->logger->debug( $msg ); + $this->debugCallback( $msg ); + } + + // Bail out if in read-only mode + if ( wfReadOnly() ) { + $response['reached'] = 'read-only'; + return $response; + } + + // Bail out if there is too much DB lag + list( , $maxLag ) = wfGetLBFactory()->getMainLB( wfWikiID() )->getMaxLag(); + if ( $maxLag >= 5 ) { + $response['reached'] = 'slave-lag-limit'; + return $response; } // Flush any pending DB writes for sanity @@ -87,8 +135,10 @@ class JobRunner { $jobsRun = 0; $timeMsTotal = 0; $flags = JobQueueGroup::USE_CACHE; + $checkPeriod = 5.0; // seconds + $checkPhase = mt_rand( 0, 1000 * $checkPeriod ) / 1000; // avoid stampedes $startTime = microtime( true ); // time since jobs started running - $lastTime = microtime( true ); // time since last slave check + $lastTime = microtime( true ) - $checkPhase; // time since last slave check do { // Sync the persistent backoffs with concurrent runners $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); @@ -117,24 +167,24 @@ class JobRunner { $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); } - $this->runJobsLog( $job->toString() . " STARTING" ); + $msg = $job->toString() . " STARTING"; + $this->logger->info( $msg ); + $this->debugCallback( $msg ); // Run the job... - wfProfileIn( __METHOD__ . '-' . get_class( $job ) ); $jobStartTime = microtime( true ); try { ++$jobsRun; $status = $job->run(); $error = $job->getLastError(); wfGetLBFactory()->commitMasterChanges(); - } catch ( MWException $e ) { + } catch ( Exception $e ) { MWExceptionHandler::rollbackMasterChangesAndLog( $e ); $status = false; $error = get_class( $e ) . ': ' . $e->getMessage(); MWExceptionHandler::logException( $e ); } $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 ); - wfProfileOut( __METHOD__ . '-' . get_class( $job ) ); $timeMsTotal += $timeMs; // Mark the job as done on success or when the job cannot be retried @@ -151,9 +201,13 @@ class JobRunner { } if ( $status === false ) { - $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" ); + $msg = $job->toString() . " t=$timeMs error={$error}"; + $this->logger->error( $msg ); + $this->debugCallback( $msg ); } else { - $this->runJobsLog( $job->toString() . " t=$timeMs good" ); + $msg = $job->toString() . " t=$timeMs good"; + $this->logger->info( $msg ); + $this->debugCallback( $msg ); } $response['jobs'][] = array( @@ -172,10 +226,15 @@ class JobRunner { break; } - // Don't let any of the main DB slaves get backed up + // Don't let any of the main DB slaves get backed up. + // This only waits for so long before exiting and letting + // other wikis in the farm (on different masters) get a chance. $timePassed = microtime( true ) - $lastTime; if ( $timePassed >= 5 || $timePassed < 0 ) { - wfWaitForSlaves( $lastTime ); + if ( !wfWaitForSlaves( $lastTime, false, '*', 5 ) ) { + $response['reached'] = 'slave-lag-limit'; + break; + } $lastTime = microtime( true ); } // Don't let any queue slaves/backups fall behind @@ -239,7 +298,6 @@ class JobRunner { * @return array Map of (job type => backoff expiry timestamp) */ private function loadBackoffs( array $backoffs, $mode = 'wait' ) { - $section = new ProfileSection( __METHOD__ ); $file = wfTempDir() . '/mw-runJobs-backoffs.json'; if ( is_file( $file ) ) { @@ -278,7 +336,6 @@ class JobRunner { * @return array The new backoffs account for $backoffs and the latest file data */ private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) { - $section = new ProfileSection( __METHOD__ ); if ( !$deltas ) { return $this->loadBackoffs( $backoffs, $mode ); @@ -341,10 +398,9 @@ class JobRunner { * Log the job message * @param string $msg The message to log */ - private function runJobsLog( $msg ) { + private function debugCallback( $msg ) { if ( $this->debug ) { call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) ); } - wfDebugLog( 'runJobs', $msg ); } } |