diff options
Diffstat (limited to 'includes/jobqueue/JobRunner.php')
-rw-r--r-- | includes/jobqueue/JobRunner.php | 171 |
1 files changed, 133 insertions, 38 deletions
diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index b8c5d6cf..13043629 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -36,6 +36,11 @@ class JobRunner implements LoggerAwareInterface { protected $debug; /** + * @var LoggerInterface $logger + */ + protected $logger; + + /** * @param callable $debug Optional debug output handler */ public function setDebugHandler( $debug ) { @@ -43,12 +48,8 @@ class JobRunner implements LoggerAwareInterface { } /** - * @var LoggerInterface $logger - */ - protected $logger; - - /** * @param LoggerInterface $logger + * @return void */ public function setLogger( LoggerInterface $logger ) { $this->logger = $logger; @@ -88,7 +89,7 @@ class JobRunner implements LoggerAwareInterface { * @return array Summary response that can easily be JSON serialized */ public function run( array $options ) { - global $wgJobClasses; + global $wgJobClasses, $wgTrxProfilerLimits; $response = array( 'jobs' => array(), 'reached' => 'none-ready' ); @@ -102,43 +103,43 @@ class JobRunner implements LoggerAwareInterface { return $response; } - $group = JobQueueGroup::singleton(); - // Handle any required periodic queue maintenance - $count = $group->executeReadyPeriodicTasks(); - if ( $count > 0 ) { - $msg = "Executed $count periodic queue task(s)."; - $this->logger->debug( $msg ); - $this->debugCallback( $msg ); - } - // Bail out if in read-only mode if ( wfReadOnly() ) { $response['reached'] = 'read-only'; return $response; } - // Bail out if there is too much DB lag - list( , $maxLag ) = wfGetLBFactory()->getMainLB( wfWikiID() )->getMaxLag(); - if ( $maxLag >= 5 ) { + // Catch huge single updates that lead to slave lag + $trxProfiler = Profiler::instance()->getTransactionProfiler(); + $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) ); + $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ ); + + // Bail out if there is too much DB lag. + // This check should not block as we want to try other wiki queues. + $maxAllowedLag = 3; + list( , $maxLag ) = wfGetLB( wfWikiID() )->getMaxLag(); + if ( $maxLag >= $maxAllowedLag ) { $response['reached'] = 'slave-lag-limit'; return $response; } + $group = JobQueueGroup::singleton(); + // Flush any pending DB writes for sanity - wfGetLBFactory()->commitMasterChanges(); + wfGetLBFactory()->commitAll(); // Some jobs types should not run until a certain timestamp $backoffs = array(); // map of (type => UNIX expiry) $backoffDeltas = array(); // map of (type => seconds) $wait = 'wait'; // block to read backoffs the first time - $jobsRun = 0; + $stats = RequestContext::getMain()->getStats(); + $jobsPopped = 0; $timeMsTotal = 0; $flags = JobQueueGroup::USE_CACHE; - $checkPeriod = 5.0; // seconds - $checkPhase = mt_rand( 0, 1000 * $checkPeriod ) / 1000; // avoid stampedes $startTime = microtime( true ); // time since jobs started running - $lastTime = microtime( true ) - $checkPhase; // time since last slave check + $checkLagPeriod = 1.0; // check slave lag this many seconds + $lastCheckTime = 1; // timestamp of last slave check do { // Sync the persistent backoffs with concurrent runners $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); @@ -154,6 +155,7 @@ class JobRunner implements LoggerAwareInterface { } if ( $job ) { // found a job + $popTime = time(); $jType = $job->getType(); // Back off of certain jobs for a while (for throttling and for errors) @@ -168,25 +170,47 @@ class JobRunner implements LoggerAwareInterface { } $msg = $job->toString() . " STARTING"; - $this->logger->info( $msg ); + $this->logger->debug( $msg ); $this->debugCallback( $msg ); // Run the job... $jobStartTime = microtime( true ); try { - ++$jobsRun; + ++$jobsPopped; $status = $job->run(); $error = $job->getLastError(); - wfGetLBFactory()->commitMasterChanges(); + $this->commitMasterChanges( $job ); + + DeferredUpdates::doUpdates(); + $this->commitMasterChanges( $job ); } catch ( Exception $e ) { MWExceptionHandler::rollbackMasterChangesAndLog( $e ); $status = false; $error = get_class( $e ) . ': ' . $e->getMessage(); MWExceptionHandler::logException( $e ); } + // Commit all outstanding connections that are in a transaction + // to get a fresh repeatable read snapshot on every connection. + wfGetLBFactory()->commitAll(); $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 ); $timeMsTotal += $timeMs; + // Record how long jobs wait before getting popped + $readyTs = $job->getReadyTimestamp(); + if ( $readyTs ) { + $pickupDelay = $popTime - $readyTs; + $stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay ); + $stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay ); + } + // Record root job age for jobs being run + $root = $job->getRootJobParams(); + if ( $root['rootJobTimestamp'] ) { + $age = $popTime - wfTimestamp( TS_UNIX, $root['rootJobTimestamp'] ); + $stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age ); + } + // Track the execution time for jobs + $stats->timing( "jobqueue.run.$jType", $timeMs ); + // Mark the job as done on success or when the job cannot be retried if ( $status !== false || !$job->allowRetries() ) { $group->ack( $job ); // done @@ -218,7 +242,7 @@ class JobRunner implements LoggerAwareInterface { ); // Break out if we hit the job count or wall time limits... - if ( $maxJobs && $jobsRun >= $maxJobs ) { + if ( $maxJobs && $jobsPopped >= $maxJobs ) { $response['reached'] = 'job-limit'; break; } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) { @@ -229,21 +253,24 @@ class JobRunner implements LoggerAwareInterface { // Don't let any of the main DB slaves get backed up. // This only waits for so long before exiting and letting // other wikis in the farm (on different masters) get a chance. - $timePassed = microtime( true ) - $lastTime; - if ( $timePassed >= 5 || $timePassed < 0 ) { - if ( !wfWaitForSlaves( $lastTime, false, '*', 5 ) ) { + $timePassed = microtime( true ) - $lastCheckTime; + if ( $timePassed >= $checkLagPeriod || $timePassed < 0 ) { + if ( !wfWaitForSlaves( $lastCheckTime, false, '*', $maxAllowedLag ) ) { $response['reached'] = 'slave-lag-limit'; break; } - $lastTime = microtime( true ); + $lastCheckTime = microtime( true ); } // Don't let any queue slaves/backups fall behind - if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) { + if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) { $group->waitForBackups(); } // Bail if near-OOM instead of in a job - $this->assertMemoryOK(); + if ( !$this->checkMemoryOK() ) { + $response['reached'] = 'memory-limit'; + break; + } } } while ( $job ); // stop when there are no jobs @@ -298,7 +325,6 @@ class JobRunner implements LoggerAwareInterface { * @return array Map of (job type => backoff expiry timestamp) */ private function loadBackoffs( array $backoffs, $mode = 'wait' ) { - $file = wfTempDir() . '/mw-runJobs-backoffs.json'; if ( is_file( $file ) ) { $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0; @@ -336,7 +362,6 @@ class JobRunner implements LoggerAwareInterface { * @return array The new backoffs account for $backoffs and the latest file data */ private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) { - if ( !$deltas ) { return $this->loadBackoffs( $backoffs, $mode ); } @@ -374,9 +399,9 @@ class JobRunner implements LoggerAwareInterface { /** * Make sure that this script is not too close to the memory usage limit. * It is better to die in between jobs than OOM right in the middle of one. - * @throws MWException + * @return bool */ - private function assertMemoryOK() { + private function checkMemoryOK() { static $maxBytes = null; if ( $maxBytes === null ) { $m = array(); @@ -390,8 +415,14 @@ class JobRunner implements LoggerAwareInterface { } $usedBytes = memory_get_usage(); if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) { - throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." ); + $msg = "Detected excessive memory usage ($usedBytes/$maxBytes)."; + $this->debugCallback( $msg ); + $this->logger->error( $msg ); + + return false; } + + return true; } /** @@ -403,4 +434,68 @@ class JobRunner implements LoggerAwareInterface { call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) ); } } + + /** + * Issue a commit on all masters who are currently in a transaction and have + * made changes to the database. It also supports sometimes waiting for the + * local wiki's slaves to catch up. See the documentation for + * $wgJobSerialCommitThreshold for more. + * + * @param Job $job + * @throws DBError + */ + private function commitMasterChanges( Job $job ) { + global $wgJobSerialCommitThreshold; + + $lb = wfGetLB( wfWikiID() ); + if ( $wgJobSerialCommitThreshold !== false ) { + // Generally, there is one master connection to the local DB + $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); + } else { + $dbwSerial = false; + } + + if ( !$dbwSerial + || !$dbwSerial->namedLocksEnqueue() + || $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold + ) { + // Writes are all to foreign DBs, named locks don't form queues, + // or $wgJobSerialCommitThreshold is not reached; commit changes now + wfGetLBFactory()->commitMasterChanges(); + return; + } + + $ms = intval( 1000 * $dbwSerial->pendingWriteQueryDuration() ); + $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]"; + $this->logger->info( $msg ); + $this->debugCallback( $msg ); + + // Wait for an exclusive lock to commit + if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) { + // This will trigger a rollback in the main loop + throw new DBError( $dbwSerial, "Timed out waiting on commit queue." ); + } + // Wait for the generic slave to catch up + $pos = $lb->getMasterPos(); + if ( $pos ) { + $lb->waitForOne( $pos ); + } + + $fname = __METHOD__; + // Re-ping all masters with transactions. This throws DBError if some + // connection died while waiting on locks/slaves, triggering a rollback. + wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) use ( $fname ) { + $lb->forEachOpenConnection( function( DatabaseBase $conn ) use ( $fname ) { + if ( $conn->writesOrCallbacksPending() ) { + $conn->query( "SELECT 1", $fname ); + } + } ); + } ); + + // Actually commit the DB master changes + wfGetLBFactory()->commitMasterChanges(); + + // Release the lock + $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ ); + } } |