diff options
Diffstat (limited to 'includes/objectcache')
-rw-r--r-- | includes/objectcache/MemcachedBagOStuff.php | 15 | ||||
-rw-r--r-- | includes/objectcache/MemcachedClient.php | 15 | ||||
-rw-r--r-- | includes/objectcache/MemcachedPeclBagOStuff.php | 22 | ||||
-rw-r--r-- | includes/objectcache/MemcachedPhpBagOStuff.php | 14 | ||||
-rw-r--r-- | includes/objectcache/MultiWriteBagOStuff.php | 88 | ||||
-rw-r--r-- | includes/objectcache/ObjectCache.php | 189 | ||||
-rw-r--r-- | includes/objectcache/ObjectCacheSessionHandler.php | 45 | ||||
-rw-r--r-- | includes/objectcache/RedisBagOStuff.php | 130 | ||||
-rw-r--r-- | includes/objectcache/SqlBagOStuff.php | 78 |
9 files changed, 384 insertions, 212 deletions
diff --git a/includes/objectcache/MemcachedBagOStuff.php b/includes/objectcache/MemcachedBagOStuff.php index 83bee700..e545aa55 100644 --- a/includes/objectcache/MemcachedBagOStuff.php +++ b/includes/objectcache/MemcachedBagOStuff.php @@ -57,12 +57,7 @@ class MemcachedBagOStuff extends BagOStuff { return $params; } - /** - * @param string $key - * @param mixed $casToken [optional] - * @return mixed - */ - public function get( $key, &$casToken = null ) { + public function get( $key, &$casToken = null, $flags = 0 ) { return $this->client->get( $this->encodeKey( $key ), $casToken ); } @@ -183,4 +178,12 @@ class MemcachedBagOStuff extends BagOStuff { protected function debugLog( $text ) { $this->logger->debug( $text ); } + + public function modifySimpleRelayEvent( array $event ) { + if ( array_key_exists( 'val', $event ) ) { + $event['flg'] = 0; // data is not serialized nor gzipped (for memcached driver) + } + + return $event; + } } diff --git a/includes/objectcache/MemcachedClient.php b/includes/objectcache/MemcachedClient.php index bc4a00b2..5010b899 100644 --- a/includes/objectcache/MemcachedClient.php +++ b/includes/objectcache/MemcachedClient.php @@ -94,6 +94,11 @@ class MWMemcached { */ const COMPRESSED = 2; + /** + * Flag: indicates data is an integer + */ + const INTVAL = 4; + // }}} /** @@ -745,13 +750,13 @@ class MWMemcached { $timeout = $this->_connect_timeout; $errno = $errstr = null; for ( $i = 0; !$sock && $i < $this->_connect_attempts; $i++ ) { - wfSuppressWarnings(); + MediaWiki\suppressWarnings(); if ( $this->_persistent == 1 ) { $sock = pfsockopen( $ip, $port, $errno, $errstr, $timeout ); } else { $sock = fsockopen( $ip, $port, $errno, $errstr, $timeout ); } - wfRestoreWarnings(); + MediaWiki\restoreWarnings(); } if ( !$sock ) { $this->_error_log( "Error connecting to $host: $errstr\n" ); @@ -979,6 +984,8 @@ class MWMemcached { */ if ( $flags & self::SERIALIZED ) { $ret[$rkey] = unserialize( $ret[$rkey] ); + } elseif ( $flags & self::INTVAL ) { + $ret[$rkey] = intval( $ret[$rkey] ); } } @@ -1027,7 +1034,9 @@ class MWMemcached { $flags = 0; - if ( !is_scalar( $val ) ) { + if ( is_int( $val ) ) { + $flags |= self::INTVAL; + } elseif ( !is_scalar( $val ) ) { $val = serialize( $val ); $flags |= self::SERIALIZED; if ( $this->_debug ) { diff --git a/includes/objectcache/MemcachedPeclBagOStuff.php b/includes/objectcache/MemcachedPeclBagOStuff.php index f2c49281..1b2c8db6 100644 --- a/includes/objectcache/MemcachedPeclBagOStuff.php +++ b/includes/objectcache/MemcachedPeclBagOStuff.php @@ -115,12 +115,7 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { $this->client->addServers( $servers ); } - /** - * @param string $key - * @param float $casToken [optional] - * @return mixed - */ - public function get( $key, &$casToken = null ) { + public function get( $key, &$casToken = null, $flags = 0 ) { $this->debugLog( "get($key)" ); $result = $this->client->get( $this->encodeKey( $key ), null, $casToken ); $result = $this->checkResult( $key, $result ); @@ -238,15 +233,16 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { return $result; } - /** - * @param array $keys - * @return array - */ - public function getMulti( array $keys ) { + public function getMulti( array $keys, $flags = 0 ) { $this->debugLog( 'getMulti(' . implode( ', ', $keys ) . ')' ); $callback = array( $this, 'encodeKey' ); - $result = $this->client->getMulti( array_map( $callback, $keys ) ); - $result = $result ?: array(); // must be an array + $encodedResult = $this->client->getMulti( array_map( $callback, $keys ) ); + $encodedResult = $encodedResult ?: array(); // must be an array + $result = array(); + foreach ( $encodedResult as $key => $value ) { + $key = $this->decodeKey( $key ); + $result[$key] = $value; + } return $this->checkResult( false, $result ); } diff --git a/includes/objectcache/MemcachedPhpBagOStuff.php b/includes/objectcache/MemcachedPhpBagOStuff.php index 6fba61ba..6f0ba588 100644 --- a/includes/objectcache/MemcachedPhpBagOStuff.php +++ b/includes/objectcache/MemcachedPhpBagOStuff.php @@ -57,13 +57,15 @@ class MemcachedPhpBagOStuff extends MemcachedBagOStuff { $this->client->set_debug( $debug ); } - /** - * @param array $keys - * @return array - */ - public function getMulti( array $keys ) { + public function getMulti( array $keys, $flags = 0 ) { $callback = array( $this, 'encodeKey' ); - return $this->client->get_multi( array_map( $callback, $keys ) ); + $encodedResult = $this->client->get_multi( array_map( $callback, $keys ) ); + $result = array(); + foreach ( $encodedResult as $key => $value ) { + $key = $this->decodeKey( $key ); + $result[$key] = $value; + } + return $result; } /** diff --git a/includes/objectcache/MultiWriteBagOStuff.php b/includes/objectcache/MultiWriteBagOStuff.php index be54e4d3..1a52930e 100644 --- a/includes/objectcache/MultiWriteBagOStuff.php +++ b/includes/objectcache/MultiWriteBagOStuff.php @@ -29,29 +29,48 @@ * @ingroup Cache */ class MultiWriteBagOStuff extends BagOStuff { - /** @var array BagOStuff[] */ + /** @var BagOStuff[] */ protected $caches; + /** @var bool Use async secondary writes */ + protected $asyncWrites = false; /** - * Constructor. Parameters are: - * - * - caches: This should have a numbered array of cache parameter - * structures, in the style required by $wgObjectCaches. See - * the documentation of $wgObjectCaches for more detail. + * $params include: + * - caches: This should have a numbered array of cache parameter + * structures, in the style required by $wgObjectCaches. See + * the documentation of $wgObjectCaches for more detail. + * BagOStuff objects can also be used as values. + * The first cache is the primary one, being the first to + * be read in the fallback chain. Writes happen to all stores + * in the order they are defined. However, lock()/unlock() calls + * only use the primary store. + * - replication: Either 'sync' or 'async'. This controls whether writes to + * secondary stores are deferred when possible. Async writes + * require the HHVM register_postsend_function() function. + * Async writes can increase the chance of some race conditions + * or cause keys to expire seconds later than expected. It is + * safe to use for modules when cached values: are immutable, + * invalidation uses logical TTLs, invalidation uses etag/timestamp + * validation against the DB, or merge() is used to handle races. * * @param array $params * @throws InvalidArgumentException */ public function __construct( $params ) { parent::__construct( $params ); - if ( !isset( $params['caches'] ) ) { - throw new InvalidArgumentException( __METHOD__ . ': the caches parameter is required' ); + + if ( empty( $params['caches'] ) || !is_array( $params['caches'] ) ) { + throw new InvalidArgumentException( __METHOD__ . ': "caches" parameter must be an array of caches' ); } $this->caches = array(); foreach ( $params['caches'] as $cacheInfo ) { - $this->caches[] = ObjectCache::newFromParams( $cacheInfo ); + $this->caches[] = ( $cacheInfo instanceof BagOStuff ) + ? $cacheInfo + : ObjectCache::newFromParams( $cacheInfo ); } + + $this->asyncWrites = isset( $params['replication'] ) && $params['replication'] === 'async'; } /** @@ -61,14 +80,9 @@ class MultiWriteBagOStuff extends BagOStuff { $this->doWrite( 'setDebug', $debug ); } - /** - * @param string $key - * @param mixed $casToken [optional] - * @return bool|mixed - */ - public function get( $key, &$casToken = null ) { + public function get( $key, &$casToken = null, $flags = 0 ) { foreach ( $this->caches as $cache ) { - $value = $cache->get( $key ); + $value = $cache->get( $key, $casToken, $flags ); if ( $value !== false ) { return $value; } @@ -126,15 +140,12 @@ class MultiWriteBagOStuff extends BagOStuff { * @param string $key * @param int $timeout * @param int $expiry + * @param string $rclass * @return bool */ - public function lock( $key, $timeout = 6, $expiry = 6 ) { + public function lock( $key, $timeout = 6, $expiry = 6, $rclass = '' ) { // Lock only the first cache, to avoid deadlocks - if ( isset( $this->caches[0] ) ) { - return $this->caches[0]->lock( $key, $timeout, $expiry ); - } else { - return true; - } + return $this->caches[0]->lock( $key, $timeout, $expiry, $rclass ); } /** @@ -142,11 +153,7 @@ class MultiWriteBagOStuff extends BagOStuff { * @return bool */ public function unlock( $key ) { - if ( isset( $this->caches[0] ) ) { - return $this->caches[0]->unlock( $key ); - } else { - return true; - } + return $this->caches[0]->unlock( $key ); } /** @@ -161,13 +168,11 @@ class MultiWriteBagOStuff extends BagOStuff { } public function getLastError() { - return isset( $this->caches[0] ) ? $this->caches[0]->getLastError() : self::ERR_NONE; + return $this->caches[0]->getLastError(); } public function clearLastError() { - if ( isset( $this->caches[0] ) ) { - $this->caches[0]->clearLastError(); - } + $this->caches[0]->clearLastError(); } /** @@ -179,11 +184,25 @@ class MultiWriteBagOStuff extends BagOStuff { $args = func_get_args(); array_shift( $args ); - foreach ( $this->caches as $cache ) { - if ( !call_user_func_array( array( $cache, $method ), $args ) ) { - $ret = false; + foreach ( $this->caches as $i => $cache ) { + if ( $i == 0 || !$this->asyncWrites ) { + // First store or in sync mode: write now and get result + if ( !call_user_func_array( array( $cache, $method ), $args ) ) { + $ret = false; + } + } else { + // Secondary write in async mode: do not block this HTTP request + $logger = $this->logger; + DeferredUpdates::addCallableUpdate( + function () use ( $cache, $method, $args, $logger ) { + if ( !call_user_func_array( array( $cache, $method ), $args ) ) { + $logger->warning( "Async $method op failed" ); + } + } + ); } } + return $ret; } @@ -202,6 +221,7 @@ class MultiWriteBagOStuff extends BagOStuff { $ret = true; } } + return $ret; } } diff --git a/includes/objectcache/ObjectCache.php b/includes/objectcache/ObjectCache.php index 2e47e24a..60191057 100644 --- a/includes/objectcache/ObjectCache.php +++ b/includes/objectcache/ObjectCache.php @@ -26,44 +26,94 @@ use MediaWiki\Logger\LoggerFactory; /** * Functions to get cache objects * + * The word "cache" has two main dictionary meanings, and both + * are used in this factory class. They are: + * + * - a) Cache (the computer science definition). + * A place to store copies or computations on existing data for + * higher access speeds. + * - b) Storage. + * A place to store lightweight data that is not canonically + * stored anywhere else (e.g. a "hoard" of objects). + * + * The former should always use strongly consistent stores, so callers don't + * have to deal with stale reads. The later may be eventually consistent, but + * callers can use BagOStuff:READ_LATEST to see the latest available data. + * + * Primary entry points: + * + * - ObjectCache::newAccelerator( $fallbackType ) + * Purpose: Cache. + * Stored only on the individual web server. + * Not associated with other servers. + * + * - wfGetMainCache() + * Purpose: Cache. + * Stored centrally within the local data-center. + * Not replicated to other DCs. + * Also known as $wgMemc. Configured by $wgMainCacheType. + * + * - ObjectCache::getMainWANInstance() + * Purpose: Cache. + * Stored in the local data-center's main cache (uses different cache keys). + * Delete events are broadcasted to other DCs. See WANObjectCache for details. + * + * - ObjectCache::getMainStashInstance() + * Purpose: Ephemeral storage. + * Stored centrally within the local data-center. + * Changes are replicated to other DCs (eventually consistent). + * To retrieve the latest value (e.g. not from a slave), use BagOStuff:READ_LATEST. + * This store may be subject to LRU style evictions. + * + * - wfGetCache( $cacheType ) + * Get a specific cache type by key in $wgObjectCaches. + * * @ingroup Cache */ class ObjectCache { + /** @var Array Map of (id => BagOStuff) */ public static $instances = array(); + /** @var Array Map of (id => WANObjectCache) */ + public static $wanInstances = array(); + /** * Get a cached instance of the specified type of cache object. * - * @param string $id - * + * @param string $id A key in $wgObjectCaches. * @return BagOStuff */ - static function getInstance( $id ) { - if ( isset( self::$instances[$id] ) ) { - return self::$instances[$id]; + public static function getInstance( $id ) { + if ( !isset( self::$instances[$id] ) ) { + self::$instances[$id] = self::newFromId( $id ); } - $object = self::newFromId( $id ); - self::$instances[$id] = $object; - return $object; + return self::$instances[$id]; } /** - * Clear all the cached instances. + * Get a cached instance of the specified type of WAN cache object. + * + * @since 1.26 + * @param string $id A key in $wgWANObjectCaches. + * @return WANObjectCache */ - static function clear() { - self::$instances = array(); + public static function getWANInstance( $id ) { + if ( !isset( self::$wanInstances[$id] ) ) { + self::$wanInstances[$id] = self::newWANCacheFromId( $id ); + } + + return self::$wanInstances[$id]; } /** * Create a new cache object of the specified type. * - * @param string $id - * - * @throws MWException + * @param string $id A key in $wgObjectCaches. * @return BagOStuff + * @throws MWException */ - static function newFromId( $id ) { + public static function newFromId( $id ) { global $wgObjectCaches; if ( !isset( $wgObjectCaches[$id] ) ) { @@ -75,14 +125,17 @@ class ObjectCache { } /** - * Create a new cache object from parameters - * - * @param array $params + * Create a new cache object from parameters. * - * @throws MWException + * @param array $params Must have 'factory' or 'class' property. + * - factory: Callback passed $params that returns BagOStuff. + * - class: BagOStuff subclass constructed with $params. + * - loggroup: Alias to set 'logger' key with LoggerFactory group. + * - .. Other parameters passed to factory or class. * @return BagOStuff + * @throws MWException */ - static function newFromParams( $params ) { + public static function newFromParams( $params ) { if ( isset( $params['loggroup'] ) ) { $params['logger'] = LoggerFactory::getInstance( $params['loggroup'] ); } else { @@ -103,7 +156,7 @@ class ObjectCache { } /** - * Factory function referenced from DefaultSettings.php for CACHE_ANYTHING + * Factory function for CACHE_ANYTHING (referenced from DefaultSettings.php) * * CACHE_ANYTHING means that stuff has to be cached, not caching is not an option. * If a caching method is configured for any of the main caches ($wgMainCacheType, @@ -111,10 +164,11 @@ class ObjectCache { * be an alias to the configured cache choice for that. * If no cache choice is configured (by default $wgMainCacheType is CACHE_NONE), * then CACHE_ANYTHING will forward to CACHE_DB. + * * @param array $params * @return BagOStuff */ - static function newAnything( $params ) { + public static function newAnything( $params ) { global $wgMainCacheType, $wgMessageCacheType, $wgParserCacheType; $candidates = array( $wgMainCacheType, $wgMessageCacheType, $wgParserCacheType ); foreach ( $candidates as $candidate ) { @@ -126,17 +180,20 @@ class ObjectCache { } /** - * Factory function referenced from DefaultSettings.php for CACHE_ACCEL. + * Factory function for CACHE_ACCEL (referenced from DefaultSettings.php) * * This will look for any APC style server-local cache. * A fallback cache can be specified if none is found. * - * @param array $params + * @param array $params [optional] * @param int|string $fallback Fallback cache, e.g. (CACHE_NONE, "hash") (since 1.24) - * @throws MWException * @return BagOStuff + * @throws MWException */ - static function newAccelerator( $params, $fallback = null ) { + public static function newAccelerator( $params = array(), $fallback = null ) { + if ( !is_array( $params ) && $fallback === null ) { + $fallback = $params; + } if ( function_exists( 'apc_fetch' ) ) { $id = 'apc'; } elseif ( function_exists( 'xcache_get' ) && wfIniGetBool( 'xcache.var_size' ) ) { @@ -144,11 +201,11 @@ class ObjectCache { } elseif ( function_exists( 'wincache_ucache_get' ) ) { $id = 'wincache'; } else { - if ( $fallback !== null ) { - return self::newFromId( $fallback ); + if ( $fallback === null ) { + throw new MWException( 'CACHE_ACCEL requested but no suitable object ' . + 'cache is present. You may want to install APC.' ); } - throw new MWException( "CACHE_ACCEL requested but no suitable object " . - "cache is present. You may want to install APC." ); + $id = $fallback; } return self::newFromId( $id ); } @@ -161,10 +218,78 @@ class ObjectCache { * switching between the two clients randomly would be disastrous. * * @param array $params - * * @return MemcachedPhpBagOStuff */ - static function newMemcached( $params ) { + public static function newMemcached( $params ) { return new MemcachedPhpBagOStuff( $params ); } + + /** + * Create a new cache object of the specified type. + * + * @since 1.26 + * @param string $id A key in $wgWANObjectCaches. + * @return WANObjectCache + * @throws MWException + */ + public static function newWANCacheFromId( $id ) { + global $wgWANObjectCaches; + + if ( !isset( $wgWANObjectCaches[$id] ) ) { + throw new MWException( "Invalid object cache type \"$id\" requested. " . + "It is not present in \$wgWANObjectCaches." ); + } + + $params = $wgWANObjectCaches[$id]; + $class = $params['relayerConfig']['class']; + $params['relayer'] = new $class( $params['relayerConfig'] ); + $params['cache'] = self::newFromId( $params['cacheId'] ); + $class = $params['class']; + + return new $class( $params ); + } + + /** + * Get the main WAN cache object. + * + * @since 1.26 + * @return WANObjectCache + */ + public static function getMainWANInstance() { + global $wgMainWANCache; + + return self::getWANInstance( $wgMainWANCache ); + } + + /** + * Get the cache object for the main stash. + * + * Stash objects are BagOStuff instances suitable for storing light + * weight data that is not canonically stored elsewhere (such as RDBMS). + * Stashes should be configured to propagate changes to all data-centers. + * + * Callers should be prepared for: + * - a) Writes to be slower in non-"primary" (e.g. HTTP GET/HEAD only) DCs + * - b) Reads to be eventually consistent, e.g. for get()/getMulti() + * In general, this means avoiding updates on idempotent HTTP requests and + * avoiding an assumption of perfect serializability (or accepting anomalies). + * Reads may be eventually consistent or data might rollback as nodes flap. + * Callers can use BagOStuff:READ_LATEST to see the latest available data. + * + * @return BagOStuff + * @since 1.26 + */ + public static function getMainStashInstance() { + global $wgMainStash; + + return self::getInstance( $wgMainStash ); + } + + /** + * Clear all the cached instances. + */ + public static function clear() { + self::$instances = array(); + self::$wanInstances = array(); + } } diff --git a/includes/objectcache/ObjectCacheSessionHandler.php b/includes/objectcache/ObjectCacheSessionHandler.php index 789f1e3b..2a5d6955 100644 --- a/includes/objectcache/ObjectCacheSessionHandler.php +++ b/includes/objectcache/ObjectCacheSessionHandler.php @@ -21,6 +21,8 @@ * @ingroup Cache */ +use MediaWiki\Logger\LoggerFactory; + /** * Session storage in object cache. * Used if $wgSessionsInObjectCache is true. @@ -106,7 +108,11 @@ class ObjectCacheSessionHandler { * @return mixed Session data */ static function read( $id ) { + $stime = microtime( true ); $data = self::getCache()->get( self::getKey( $id ) ); + $real = microtime( true ) - $stime; + + RequestContext::getMain()->getStats()->timing( "session.read", 1000 * $real ); self::$hashCache = array( $id => self::getHash( $data ) ); @@ -127,7 +133,11 @@ class ObjectCacheSessionHandler { if ( !isset( self::$hashCache[$id] ) || self::getHash( $data ) !== self::$hashCache[$id] ) { + $stime = microtime( true ); self::getCache()->set( self::getKey( $id ), $data, $wgObjectCacheSessionExpiry ); + $real = microtime( true ) - $stime; + + RequestContext::getMain()->getStats()->timing( "session.write", 1000 * $real ); } return true; @@ -140,7 +150,11 @@ class ObjectCacheSessionHandler { * @return bool Success */ static function destroy( $id ) { + $stime = microtime( true ); self::getCache()->delete( self::getKey( $id ) ); + $real = microtime( true ) - $stime; + + RequestContext::getMain()->getStats()->timing( "session.destroy", 1000 * $real ); return true; } @@ -157,10 +171,37 @@ class ObjectCacheSessionHandler { } /** - * Shutdown function. See the comment inside ObjectCacheSessionHandler::install - * for rationale. + * Shutdown function. + * See the comment inside ObjectCacheSessionHandler::install for rationale. */ static function handleShutdown() { session_write_close(); } + + /** + * Pre-emptive session renewal function + */ + static function renewCurrentSession() { + global $wgObjectCacheSessionExpiry; + + // Once a session is at half TTL, renew it + $window = $wgObjectCacheSessionExpiry / 2; + $logger = LoggerFactory::getInstance( 'SessionHandler' ); + + $now = microtime( true ); + // Session are only written in object stores when $_SESSION changes, + // which also renews the TTL ($wgObjectCacheSessionExpiry). If a user + // is active but not causing session data changes, it may suddenly + // expire as they view a form, blocking the first submission. + // Make a dummy change every so often to avoid this. + if ( !isset( $_SESSION['wsExpiresUnix'] ) ) { + $_SESSION['wsExpiresUnix'] = $now + $wgObjectCacheSessionExpiry; + + $logger->info( "Set expiry for session " . session_id(), array() ); + } elseif ( ( $now + $window ) > $_SESSION['wsExpiresUnix'] ) { + $_SESSION['wsExpiresUnix'] = $now + $wgObjectCacheSessionExpiry; + + $logger->info( "Renewed session " . session_id(), array() ); + } + } } diff --git a/includes/objectcache/RedisBagOStuff.php b/includes/objectcache/RedisBagOStuff.php index de3511df..7d9903fe 100644 --- a/includes/objectcache/RedisBagOStuff.php +++ b/includes/objectcache/RedisBagOStuff.php @@ -20,11 +20,18 @@ * @file */ +/** + * Redis-based caching module for redis server >= 2.6.12 + * + * @note: avoid use of Redis::MULTI transactions for twemproxy support + */ class RedisBagOStuff extends BagOStuff { /** @var RedisConnectionPool */ protected $redisPool; /** @var array List of server names */ protected $servers; + /** @var array Map of (tag => server name) */ + protected $serverTagMap; /** @var bool */ protected $automaticFailover; @@ -34,7 +41,8 @@ class RedisBagOStuff extends BagOStuff { * - servers: An array of server names. A server name may be a hostname, * a hostname/port combination or the absolute path of a UNIX socket. * If a hostname is specified but no port, the standard port number - * 6379 will be used. Required. + * 6379 will be used. Arrays keys can be used to specify the tag to + * hash on in place of the host/port. Required. * * - connectTimeout: The timeout for new connections, in seconds. Optional, * default is 1 second. @@ -66,6 +74,10 @@ class RedisBagOStuff extends BagOStuff { $this->redisPool = RedisConnectionPool::singleton( $redisConf ); $this->servers = $params['servers']; + foreach ( $this->servers as $key => $server ) { + $this->serverTagMap[is_int( $key ) ? $server : $key] = $server; + } + if ( isset( $params['automaticFailover'] ) ) { $this->automaticFailover = $params['automaticFailover']; } else { @@ -73,8 +85,7 @@ class RedisBagOStuff extends BagOStuff { } } - public function get( $key, &$casToken = null ) { - + public function get( $key, &$casToken = null, $flags = 0 ) { list( $server, $conn ) = $this->getConnection( $key ); if ( !$conn ) { return false; @@ -93,7 +104,6 @@ class RedisBagOStuff extends BagOStuff { } public function set( $key, $value, $expiry = 0 ) { - list( $server, $conn ) = $this->getConnection( $key ); if ( !$conn ) { return false; @@ -115,41 +125,7 @@ class RedisBagOStuff extends BagOStuff { return $result; } - protected function cas( $casToken, $key, $value, $expiry = 0 ) { - - list( $server, $conn ) = $this->getConnection( $key ); - if ( !$conn ) { - return false; - } - $expiry = $this->convertToRelative( $expiry ); - try { - $conn->watch( $key ); - - if ( $this->serialize( $this->get( $key ) ) !== $casToken ) { - $conn->unwatch(); - return false; - } - - // multi()/exec() will fail atomically if the key changed since watch() - $conn->multi(); - if ( $expiry ) { - $conn->setex( $key, $expiry, $this->serialize( $value ) ); - } else { - // No expiry, that is very different from zero expiry in Redis - $conn->set( $key, $this->serialize( $value ) ); - } - $result = ( $conn->exec() == array( true ) ); - } catch ( RedisException $e ) { - $result = false; - $this->handleException( $conn, $e ); - } - - $this->logRequest( 'cas', $key, $server, $result ); - return $result; - } - public function delete( $key ) { - list( $server, $conn ) = $this->getConnection( $key ); if ( !$conn ) { return false; @@ -167,8 +143,7 @@ class RedisBagOStuff extends BagOStuff { return $result; } - public function getMulti( array $keys ) { - + public function getMulti( array $keys, $flags = 0 ) { $batches = array(); $conns = array(); foreach ( $keys as $key ) { @@ -213,7 +188,6 @@ class RedisBagOStuff extends BagOStuff { * @return bool */ public function setMulti( array $data, $expiry = 0 ) { - $batches = array(); $conns = array(); foreach ( $data as $key => $value ) { @@ -257,10 +231,7 @@ class RedisBagOStuff extends BagOStuff { return $result; } - - public function add( $key, $value, $expiry = 0 ) { - list( $server, $conn ) = $this->getConnection( $key ); if ( !$conn ) { return false; @@ -268,10 +239,11 @@ class RedisBagOStuff extends BagOStuff { $expiry = $this->convertToRelative( $expiry ); try { if ( $expiry ) { - $conn->multi(); - $conn->setnx( $key, $this->serialize( $value ) ); - $conn->expire( $key, $expiry ); - $result = ( $conn->exec() == array( true, true ) ); + $result = $conn->set( + $key, + $this->serialize( $value ), + array( 'nx', 'ex' => $expiry ) + ); } else { $result = $conn->setnx( $key, $this->serialize( $value ) ); } @@ -297,7 +269,6 @@ class RedisBagOStuff extends BagOStuff { * @return int|bool New value or false on failure */ public function incr( $key, $value = 1 ) { - list( $server, $conn ) = $this->getConnection( $key ); if ( !$conn ) { return false; @@ -306,6 +277,7 @@ class RedisBagOStuff extends BagOStuff { return null; } try { + // @FIXME: on races, the key may have a 0 TTL $result = $conn->incrBy( $key, $value ); } catch ( RedisException $e ) { $result = false; @@ -316,12 +288,12 @@ class RedisBagOStuff extends BagOStuff { return $result; } - public function merge( $key, $callback, $exptime = 0, $attempts = 10 ) { - if ( !is_callable( $callback ) ) { - throw new Exception( "Got invalid callback." ); + public function modifySimpleRelayEvent( array $event ) { + if ( array_key_exists( 'val', $event ) ) { + $event['val'] = serialize( $event['val'] ); // this class uses PHP serialization } - return $this->mergeViaCas( $key, $callback, $exptime, $attempts ); + return $event; } /** @@ -348,27 +320,65 @@ class RedisBagOStuff extends BagOStuff { * @return array (server, RedisConnRef) or (false, false) */ protected function getConnection( $key ) { - if ( count( $this->servers ) === 1 ) { - $candidates = $this->servers; - } else { - $candidates = $this->servers; + $candidates = array_keys( $this->serverTagMap ); + + if ( count( $this->servers ) > 1 ) { ArrayUtils::consistentHashSort( $candidates, $key, '/' ); if ( !$this->automaticFailover ) { $candidates = array_slice( $candidates, 0, 1 ); } } - foreach ( $candidates as $server ) { + while ( ( $tag = array_shift( $candidates ) ) !== null ) { + $server = $this->serverTagMap[$tag]; $conn = $this->redisPool->getConnection( $server ); - if ( $conn ) { - return array( $server, $conn ); + if ( !$conn ) { + continue; } + + // If automatic failover is enabled, check that the server's link + // to its master (if any) is up -- but only if there are other + // viable candidates left to consider. Also, getMasterLinkStatus() + // does not work with twemproxy, though $candidates will be empty + // by now in such cases. + if ( $this->automaticFailover && $candidates ) { + try { + if ( $this->getMasterLinkStatus( $conn ) === 'down' ) { + // If the master cannot be reached, fail-over to the next server. + // If masters are in data-center A, and slaves in data-center B, + // this helps avoid the case were fail-over happens in A but not + // to the corresponding server in B (e.g. read/write mismatch). + continue; + } + } catch ( RedisException $e ) { + // Server is not accepting commands + $this->handleException( $conn, $e ); + continue; + } + } + + return array( $server, $conn ); } + $this->setLastError( BagOStuff::ERR_UNREACHABLE ); + return array( false, false ); } /** + * Check the master link status of a Redis server that is configured as a slave. + * @param RedisConnRef $conn + * @return string|null Master link status (either 'up' or 'down'), or null + * if the server is not a slave. + */ + protected function getMasterLinkStatus( RedisConnRef $conn ) { + $info = $conn->info(); + return isset( $info['master_link_status'] ) + ? $info['master_link_status'] + : null; + } + + /** * Log a fatal error * @param string $msg */ diff --git a/includes/objectcache/SqlBagOStuff.php b/includes/objectcache/SqlBagOStuff.php index 82eeb842..e08fec99 100644 --- a/includes/objectcache/SqlBagOStuff.php +++ b/includes/objectcache/SqlBagOStuff.php @@ -30,6 +30,7 @@ class SqlBagOStuff extends BagOStuff { /** @var LoadBalancer */ protected $lb; + /** @var array */ protected $serverInfos; /** @var array */ @@ -53,6 +54,9 @@ class SqlBagOStuff extends BagOStuff { /** @var string */ protected $tableName = 'objectcache'; + /** @var bool */ + protected $slaveOnly = false; + /** @var array UNIX timestamps */ protected $connFailureTimes = array(); @@ -84,6 +88,10 @@ class SqlBagOStuff extends BagOStuff { * required to hold the largest shard index. Data will be * distributed across all tables by key hash. This is for * MySQL bugs 61735 and 61736. + * - slaveOnly: Whether to only use slave DBs and avoid triggering + * garbage collection logic of expired items. This only + * makes sense if the primary DB is used and only if get() + * calls will be used. This is used by ReplicatedBagOStuff. * * @param array $params */ @@ -112,6 +120,7 @@ class SqlBagOStuff extends BagOStuff { if ( isset( $params['shards'] ) ) { $this->shards = intval( $params['shards'] ); } + $this->slaveOnly = !empty( $params['slaveOnly'] ); } /** @@ -155,12 +164,13 @@ class SqlBagOStuff extends BagOStuff { * However, SQLite has an opposite behavior. And PostgreSQL needs to know * if we are in transaction or no */ - if ( wfGetDB( DB_MASTER )->getType() == 'mysql' ) { + $index = $this->slaveOnly ? DB_SLAVE : DB_MASTER; + if ( wfGetDB( $index )->getType() == 'mysql' ) { $this->lb = wfGetLBFactory()->newMainLB(); - $db = $this->lb->getConnection( DB_MASTER ); + $db = $this->lb->getConnection( $index ); $db->clearFlag( DBO_TRX ); // auto-commit mode } else { - $db = wfGetDB( DB_MASTER ); + $db = wfGetDB( $index ); } } if ( $wgDebugDBTransactions ) { @@ -210,12 +220,7 @@ class SqlBagOStuff extends BagOStuff { } } - /** - * @param string $key - * @param mixed $casToken [optional] - * @return mixed - */ - public function get( $key, &$casToken = null ) { + public function get( $key, &$casToken = null, $flags = 0 ) { $values = $this->getMulti( array( $key ) ); if ( array_key_exists( $key, $values ) ) { $casToken = $values[$key]; @@ -224,11 +229,7 @@ class SqlBagOStuff extends BagOStuff { return false; } - /** - * @param array $keys - * @return array - */ - public function getMulti( array $keys ) { + public function getMulti( array $keys, $flags = 0 ) { $values = array(); // array of (key => value) $keysByTable = array(); @@ -274,12 +275,7 @@ class SqlBagOStuff extends BagOStuff { try { $db = $this->getDB( $row->serverIndex ); if ( $this->isExpired( $db, $row->exptime ) ) { // MISS - $this->debug( "get: key has expired, deleting" ); - # Put the expiry time in the WHERE condition to avoid deleting a - # newly-inserted value - $db->delete( $row->tableName, - array( 'keyname' => $key, 'exptime' => $row->exptime ), - __METHOD__ ); + $this->debug( "get: key has expired" ); } else { // HIT $values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) ); } @@ -358,8 +354,6 @@ class SqlBagOStuff extends BagOStuff { return $result; } - - /** * @param string $key * @param mixed $value @@ -367,37 +361,7 @@ class SqlBagOStuff extends BagOStuff { * @return bool */ public function set( $key, $value, $exptime = 0 ) { - list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); - try { - $db = $this->getDB( $serverIndex ); - $exptime = intval( $exptime ); - - if ( $exptime < 0 ) { - $exptime = 0; - } - - if ( $exptime == 0 ) { - $encExpiry = $this->getMaxDateTime( $db ); - } else { - $exptime = $this->convertExpiry( $exptime ); - $encExpiry = $db->timestamp( $exptime ); - } - // (bug 24425) use a replace if the db supports it instead of - // delete/insert to avoid clashes with conflicting keynames - $db->replace( - $tableName, - array( 'keyname' ), - array( - 'keyname' => $key, - 'value' => $db->encodeBlob( $this->serialize( $value ) ), - 'exptime' => $encExpiry - ), __METHOD__ ); - } catch ( DBError $e ) { - $this->handleWriteError( $e, $serverIndex ); - return false; - } - - return true; + return $this->setMulti( array( $key => $value ), $exptime ); } /** @@ -546,7 +510,7 @@ class SqlBagOStuff extends BagOStuff { } protected function garbageCollect() { - if ( !$this->purgePeriod ) { + if ( !$this->purgePeriod || $this->slaveOnly ) { // Disabled return; } @@ -688,9 +652,9 @@ class SqlBagOStuff extends BagOStuff { */ protected function unserialize( $serial ) { if ( function_exists( 'gzinflate' ) ) { - wfSuppressWarnings(); + MediaWiki\suppressWarnings(); $decomp = gzinflate( $serial ); - wfRestoreWarnings(); + MediaWiki\restoreWarnings(); if ( false !== $decomp ) { $serial = $decomp; @@ -758,6 +722,8 @@ class SqlBagOStuff extends BagOStuff { * @param int $serverIndex */ protected function markServerDown( $exception, $serverIndex ) { + unset( $this->conns[$serverIndex] ); // bug T103435 + if ( isset( $this->connFailureTimes[$serverIndex] ) ) { if ( time() - $this->connFailureTimes[$serverIndex] >= 60 ) { unset( $this->connFailureTimes[$serverIndex] ); |