From 08aa4418c30cfc18ccc69a0f0f9cb9e17be6c196 Mon Sep 17 00:00:00 2001 From: Pierre Schmitz Date: Mon, 12 Aug 2013 09:28:15 +0200 Subject: Update to MediaWiki 1.21.1 --- includes/objectcache/APCBagOStuff.php | 43 +- includes/objectcache/BagOStuff.php | 131 ++++- includes/objectcache/DBABagOStuff.php | 63 ++- includes/objectcache/EhcacheBagOStuff.php | 72 ++- includes/objectcache/EmptyBagOStuff.php | 25 +- includes/objectcache/HashBagOStuff.php | 28 +- includes/objectcache/MemcachedBagOStuff.php | 27 +- includes/objectcache/MemcachedClient.php | 187 +++++--- includes/objectcache/MemcachedPeclBagOStuff.php | 27 +- includes/objectcache/MemcachedPhpBagOStuff.php | 3 +- includes/objectcache/MultiWriteBagOStuff.php | 29 +- includes/objectcache/ObjectCache.php | 8 +- includes/objectcache/ObjectCacheSessionHandler.php | 8 +- includes/objectcache/RedisBagOStuff.php | 189 +++----- includes/objectcache/SqlBagOStuff.php | 528 ++++++++++++--------- includes/objectcache/WinCacheBagOStuff.php | 47 +- includes/objectcache/XCacheBagOStuff.php | 40 +- 17 files changed, 906 insertions(+), 549 deletions(-) (limited to 'includes/objectcache') diff --git a/includes/objectcache/APCBagOStuff.php b/includes/objectcache/APCBagOStuff.php index 1a0de218..3fb80835 100644 --- a/includes/objectcache/APCBagOStuff.php +++ b/includes/objectcache/APCBagOStuff.php @@ -29,11 +29,14 @@ class APCBagOStuff extends BagOStuff { /** * @param $key string + * @param $casToken[optional] int * @return mixed */ - public function get( $key ) { + public function get( $key, &$casToken = null ) { $val = apc_fetch( $key ); + $casToken = $val; + if ( is_string( $val ) ) { if ( $this->isInteger( $val ) ) { $val = intval( $val ); @@ -61,6 +64,18 @@ class APCBagOStuff extends BagOStuff { return true; } + /** + * @param $casToken mixed + * @param $key string + * @param $value mixed + * @param $exptime int + * @return bool + */ + public function cas( $casToken, $key, $value, $exptime = 0 ) { + // APC's CAS functions only work on integers + throw new MWException( "CAS is not implemented in " . __CLASS__ ); + } + /** * @param $key string * @param $time int @@ -72,6 +87,17 @@ class APCBagOStuff extends BagOStuff { return true; } + /** + * @param $key string + * @param $callback closure Callback method to be executed + * @param int $exptime Either an interval in seconds or a unix timestamp for expiry + * @param int $attempts The amount of times to attempt a merge in case of failure + * @return bool success + */ + public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) { + return $this->mergeViaLock( $key, $callback, $exptime, $attempts ); + } + public function incr( $key, $value = 1 ) { return apc_inc( $key, $value ); } @@ -79,19 +105,4 @@ class APCBagOStuff extends BagOStuff { public function decr( $key, $value = 1 ) { return apc_dec( $key, $value ); } - - /** - * @return Array - */ - public function keys() { - $info = apc_cache_info( 'user' ); - $list = $info['cache_list']; - $keys = array(); - - foreach ( $list as $entry ) { - $keys[] = $entry['info']; - } - - return $keys; - } } diff --git a/includes/objectcache/BagOStuff.php b/includes/objectcache/BagOStuff.php index 7bbaff93..dd744672 100644 --- a/includes/objectcache/BagOStuff.php +++ b/includes/objectcache/BagOStuff.php @@ -56,58 +56,153 @@ abstract class BagOStuff { /** * Get an item with the given key. Returns false if it does not exist. * @param $key string + * @param $casToken[optional] mixed * @return mixed Returns false on failure */ - abstract public function get( $key ); + abstract public function get( $key, &$casToken = null ); /** * Set an item. * @param $key string * @param $value mixed - * @param $exptime int Either an interval in seconds or a unix timestamp for expiry + * @param int $exptime Either an interval in seconds or a unix timestamp for expiry * @return bool success */ abstract public function set( $key, $value, $exptime = 0 ); + /** + * Check and set an item. + * @param $casToken mixed + * @param $key string + * @param $value mixed + * @param int $exptime Either an interval in seconds or a unix timestamp for expiry + * @return bool success + */ + abstract public function cas( $casToken, $key, $value, $exptime = 0 ); + /** * Delete an item. * @param $key string - * @param $time int Amount of time to delay the operation (mostly memcached-specific) + * @param int $time Amount of time to delay the operation (mostly memcached-specific) * @return bool True if the item was deleted or not found, false on failure */ abstract public function delete( $key, $time = 0 ); /** + * Merge changes into the existing cache value (possibly creating a new one). + * The callback function returns the new value given the current value (possibly false), + * and takes the arguments: (this BagOStuff object, cache key, current value). + * * @param $key string - * @param $timeout integer + * @param $callback closure Callback method to be executed + * @param int $exptime Either an interval in seconds or a unix timestamp for expiry + * @param int $attempts The amount of times to attempt a merge in case of failure * @return bool success */ - public function lock( $key, $timeout = 0 ) { - /* stub */ - return true; + public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) { + return $this->mergeViaCas( $key, $callback, $exptime, $attempts ); } /** + * @see BagOStuff::merge() + * * @param $key string + * @param $callback closure Callback method to be executed + * @param int $exptime Either an interval in seconds or a unix timestamp for expiry + * @param int $attempts The amount of times to attempt a merge in case of failure * @return bool success */ - public function unlock( $key ) { - /* stub */ - return true; + protected function mergeViaCas( $key, closure $callback, $exptime = 0, $attempts = 10 ) { + do { + $casToken = null; // passed by reference + $currentValue = $this->get( $key, $casToken ); // get the old value + $value = $callback( $this, $key, $currentValue ); // derive the new value + + if ( $value === false ) { + $success = true; // do nothing + } elseif ( $currentValue === false ) { + // Try to create the key, failing if it gets created in the meantime + $success = $this->add( $key, $value, $exptime ); + } else { + // Try to update the key, failing if it gets changed in the meantime + $success = $this->cas( $casToken, $key, $value, $exptime ); + } + } while ( !$success && --$attempts ); + + return $success; } /** - * @todo: what is this? - * @return Array + * @see BagOStuff::merge() + * + * @param $key string + * @param $callback closure Callback method to be executed + * @param int $exptime Either an interval in seconds or a unix timestamp for expiry + * @param int $attempts The amount of times to attempt a merge in case of failure + * @return bool success */ - public function keys() { - /* stub */ - return array(); + protected function mergeViaLock( $key, closure $callback, $exptime = 0, $attempts = 10 ) { + if ( !$this->lock( $key, 60 ) ) { + return false; + } + + $currentValue = $this->get( $key ); // get the old value + $value = $callback( $this, $key, $currentValue ); // derive the new value + + if ( $value === false ) { + $success = true; // do nothing + } else { + $success = $this->set( $key, $value, $exptime ); // set the new value + } + + if ( !$this->unlock( $key ) ) { + // this should never happen + trigger_error( "Could not release lock for key '$key'." ); + } + + return $success; + } + + /** + * @param $key string + * @param $timeout integer [optional] + * @return bool success + */ + public function lock( $key, $timeout = 60 ) { + $timestamp = microtime( true ); // starting UNIX timestamp + if ( $this->add( "{$key}:lock", $timeout ) ) { + return true; + } + + $uRTT = ceil( 1e6 * ( microtime( true ) - $timestamp ) ); // estimate RTT (us) + $sleep = 2*$uRTT; // rough time to do get()+set() + + $locked = false; // lock acquired + $attempts = 0; // failed attempts + do { + if ( ++$attempts >= 3 && $sleep <= 1e6 ) { + // Exponentially back off after failed attempts to avoid network spam. + // About 2*$uRTT*(2^n-1) us of "sleep" happen for the next n attempts. + $sleep *= 2; + } + usleep( $sleep ); // back off + $locked = $this->add( "{$key}:lock", $timeout ); + } while( !$locked ); + + return $locked; + } + + /** + * @param $key string + * @return bool success + */ + public function unlock( $key ) { + return $this->delete( "{$key}:lock" ); } /** * Delete all objects expiring before a certain date. - * @param $date string The reference date in MW format + * @param string $date The reference date in MW format * @param $progressCallback callback|bool Optional, a function which will be called * regularly during long-running operations with the percentage progress * as the first parameter. @@ -123,7 +218,7 @@ abstract class BagOStuff { /** * Get an associative array containing the item for each of the keys that have items. - * @param $keys Array List of strings + * @param array $keys List of strings * @return Array */ public function getMulti( array $keys ) { @@ -165,7 +260,7 @@ abstract class BagOStuff { /** * Increase stored value of $key by $value while preserving its TTL - * @param $key String: Key to increase + * @param string $key Key to increase * @param $value Integer: Value to add to $key (Default 1) * @return integer|bool New value or false on failure */ diff --git a/includes/objectcache/DBABagOStuff.php b/includes/objectcache/DBABagOStuff.php index 36ced496..c82b3aa4 100644 --- a/includes/objectcache/DBABagOStuff.php +++ b/includes/objectcache/DBABagOStuff.php @@ -111,9 +111,10 @@ class DBABagOStuff extends BagOStuff { /** * @param $key string + * @param $casToken[optional] mixed * @return mixed */ - public function get( $key ) { + public function get( $key, &$casToken = null ) { wfProfileIn( __METHOD__ ); wfDebug( __METHOD__ . "($key)\n" ); @@ -138,7 +139,10 @@ class DBABagOStuff extends BagOStuff { $val = false; } + $casToken = $val; + wfProfileOut( __METHOD__ ); + return $val; } @@ -167,6 +171,42 @@ class DBABagOStuff extends BagOStuff { return $ret; } + /** + * @param $casToken mixed + * @param $key string + * @param $value mixed + * @param $exptime int + * @return bool + */ + public function cas( $casToken, $key, $value, $exptime = 0 ) { + wfProfileIn( __METHOD__ ); + wfDebug( __METHOD__ . "($key)\n" ); + + $blob = $this->encode( $value, $exptime ); + + $handle = $this->getWriter(); + if ( !$handle ) { + wfProfileOut( __METHOD__ ); + return false; + } + + // DBA is locked to any other write connection, so we can safely + // compare the current & previous value before saving new value + $val = dba_fetch( $key, $handle ); + list( $val, $exptime ) = $this->decode( $val ); + if ( $casToken !== $val ) { + dba_close( $handle ); + wfProfileOut( __METHOD__ ); + return false; + } + + $ret = dba_replace( $key, $blob, $handle ); + dba_close( $handle ); + + wfProfileOut( __METHOD__ ); + return $ret; + } + /** * @param $key string * @param $time int @@ -211,7 +251,7 @@ class DBABagOStuff extends BagOStuff { # Insert failed, check to see if it failed due to an expired key if ( !$ret ) { - list( $value, $expiry ) = $this->decode( dba_fetch( $key, $handle ) ); + list( , $expiry ) = $this->decode( dba_fetch( $key, $handle ) ); if ( $expiry && $expiry < time() ) { # Yes expired, delete and try again @@ -264,23 +304,4 @@ class DBABagOStuff extends BagOStuff { return ( $value === false ) ? false : (int)$value; } - - function keys() { - $reader = $this->getReader(); - $k1 = dba_firstkey( $reader ); - - if ( !$k1 ) { - return array(); - } - - $result[] = $k1; - - $key = dba_nextkey( $reader ); - while ( $key ) { - $result[] = $key; - $key = dba_nextkey( $reader ); - } - - return $result; - } } diff --git a/includes/objectcache/EhcacheBagOStuff.php b/includes/objectcache/EhcacheBagOStuff.php index f86cf157..960668f5 100644 --- a/includes/objectcache/EhcacheBagOStuff.php +++ b/includes/objectcache/EhcacheBagOStuff.php @@ -28,27 +28,28 @@ * @ingroup Cache */ class EhcacheBagOStuff extends BagOStuff { - var $servers, $cacheName, $connectTimeout, $timeout, $curlOptions, + var $servers, $cacheName, $connectTimeout, $timeout, $curlOptions, $requestData, $requestDataPos; - + var $curls = array(); /** * @param $params array + * @throws MWException */ function __construct( $params ) { if ( !defined( 'CURLOPT_TIMEOUT_MS' ) ) { - throw new MWException( __CLASS__.' requires curl version 7.16.2 or later.' ); + throw new MWException( __CLASS__ . ' requires curl version 7.16.2 or later.' ); } if ( !extension_loaded( 'zlib' ) ) { - throw new MWException( __CLASS__.' requires the zlib extension' ); + throw new MWException( __CLASS__ . ' requires the zlib extension' ); } if ( !isset( $params['servers'] ) ) { - throw new MWException( __METHOD__.': servers parameter is required' ); + throw new MWException( __METHOD__ . ': servers parameter is required' ); } $this->servers = $params['servers']; $this->cacheName = isset( $params['cache'] ) ? $params['cache'] : 'mw'; - $this->connectTimeout = isset( $params['connectTimeout'] ) + $this->connectTimeout = isset( $params['connectTimeout'] ) ? $params['connectTimeout'] : 1; $this->timeout = isset( $params['timeout'] ) ? $params['timeout'] : 1; $this->curlOptions = array( @@ -64,9 +65,10 @@ class EhcacheBagOStuff extends BagOStuff { /** * @param $key string + * @param $casToken[optional] mixed * @return bool|mixed */ - public function get( $key ) { + public function get( $key, &$casToken = null ) { wfProfileIn( __METHOD__ ); $response = $this->doItemRequest( $key ); if ( !$response || $response['http_code'] == 404 ) { @@ -74,16 +76,16 @@ class EhcacheBagOStuff extends BagOStuff { return false; } if ( $response['http_code'] >= 300 ) { - wfDebug( __METHOD__.": GET failure, got HTTP {$response['http_code']}\n" ); + wfDebug( __METHOD__ . ": GET failure, got HTTP {$response['http_code']}\n" ); wfProfileOut( __METHOD__ ); - return false; + return false; } $body = $response['body']; $type = $response['content_type']; if ( $type == 'application/vnd.php.serialized+deflate' ) { $body = gzinflate( $body ); if ( !$body ) { - wfDebug( __METHOD__.": error inflating $key\n" ); + wfDebug( __METHOD__ . ": error inflating $key\n" ); wfProfileOut( __METHOD__ ); return false; } @@ -91,11 +93,13 @@ class EhcacheBagOStuff extends BagOStuff { } elseif ( $type == 'application/vnd.php.serialized' ) { $data = unserialize( $body ); } else { - wfDebug( __METHOD__.": unknown content type \"$type\"\n" ); + wfDebug( __METHOD__ . ": unknown content type \"$type\"\n" ); wfProfileOut( __METHOD__ ); return false; } + $casToken = $body; + wfProfileOut( __METHOD__ ); return $data; } @@ -123,7 +127,7 @@ class EhcacheBagOStuff extends BagOStuff { if ( $code == 404 ) { // Maybe the cache does not exist yet, let's try creating it if ( !$this->createCache( $key ) ) { - wfDebug( __METHOD__.": cache creation failed\n" ); + wfDebug( __METHOD__ . ": cache creation failed\n" ); wfProfileOut( __METHOD__ ); return false; } @@ -132,9 +136,9 @@ class EhcacheBagOStuff extends BagOStuff { $result = false; if ( !$code ) { - wfDebug( __METHOD__.": PUT failure for key $key\n" ); + wfDebug( __METHOD__ . ": PUT failure for key $key\n" ); } elseif ( $code >= 300 ) { - wfDebug( __METHOD__.": PUT failure for key $key: HTTP $code\n" ); + wfDebug( __METHOD__ . ": PUT failure for key $key: HTTP $code\n" ); } else { $result = true; } @@ -143,6 +147,20 @@ class EhcacheBagOStuff extends BagOStuff { return $result; } + /** + * @param $casToken mixed + * @param $key string + * @param $value mixed + * @param $exptime int + * @return bool + */ + public function cas( $casToken, $key, $value, $exptime = 0 ) { + // Not sure if we can implement CAS for ehcache. There appears to be CAS-support per + // http://ehcache.org/documentation/get-started/consistency-options#cas-cache-operations, + // but I can't find any docs for our current implementation. + throw new MWException( "CAS is not implemented in " . __CLASS__ ); + } + /** * @param $key string * @param $time int @@ -154,7 +172,7 @@ class EhcacheBagOStuff extends BagOStuff { array( CURLOPT_CUSTOMREQUEST => 'DELETE' ) ); $code = isset( $response['http_code'] ) ? $response['http_code'] : 0; if ( !$response || ( $code != 404 && $code >= 300 ) ) { - wfDebug( __METHOD__.": DELETE failure for key $key\n" ); + wfDebug( __METHOD__ . ": DELETE failure for key $key\n" ); $result = false; } else { $result = true; @@ -163,6 +181,14 @@ class EhcacheBagOStuff extends BagOStuff { return $result; } + /** + * @see BagOStuff::merge() + * @return bool success + */ + public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) { + return $this->mergeViaLock( $key, $callback, $exptime, $attempts ); + } + /** * @param $key string * @return string @@ -202,9 +228,9 @@ class EhcacheBagOStuff extends BagOStuff { * @return int */ protected function attemptPut( $key, $data, $type, $ttl ) { - // In initial benchmarking, it was 30 times faster to use CURLOPT_POST + // In initial benchmarking, it was 30 times faster to use CURLOPT_POST // than CURLOPT_UPLOAD with CURLOPT_READFUNCTION. This was because - // CURLOPT_UPLOAD was pushing the request headers first, then waiting + // CURLOPT_UPLOAD was pushing the request headers first, then waiting // for an ACK packet, then sending the data, whereas CURLOPT_POST just // sends the headers and the data in a single send(). $response = $this->doItemRequest( $key, @@ -230,15 +256,15 @@ class EhcacheBagOStuff extends BagOStuff { * @return bool */ protected function createCache( $key ) { - wfDebug( __METHOD__.": creating cache for $key\n" ); - $response = $this->doCacheRequest( $key, + wfDebug( __METHOD__ . ": creating cache for $key\n" ); + $response = $this->doCacheRequest( $key, array( CURLOPT_POST => 1, CURLOPT_CUSTOMREQUEST => 'PUT', CURLOPT_POSTFIELDS => '', ) ); if ( !$response ) { - wfDebug( __CLASS__.": failed to create cache for $key\n" ); + wfDebug( __CLASS__ . ": failed to create cache for $key\n" ); return false; } return ( $response['http_code'] == 201 /* created */ @@ -278,8 +304,8 @@ class EhcacheBagOStuff extends BagOStuff { protected function doRequest( $curl, $url, $curlOptions = array() ) { if ( array_diff_key( $curlOptions, $this->curlOptions ) ) { // var_dump( array_diff_key( $curlOptions, $this->curlOptions ) ); - throw new MWException( __METHOD__.": to prevent options set in one doRequest() " . - "call from affecting subsequent doRequest() calls, only options listed " . + throw new MWException( __METHOD__ . ": to prevent options set in one doRequest() " . + "call from affecting subsequent doRequest() calls, only options listed " . "in \$this->curlOptions may be specified in the \$curlOptions parameter." ); } $curlOptions += $this->curlOptions; @@ -288,7 +314,7 @@ class EhcacheBagOStuff extends BagOStuff { curl_setopt_array( $curl, $curlOptions ); $result = curl_exec( $curl ); if ( $result === false ) { - wfDebug( __CLASS__.": curl error: " . curl_error( $curl ) . "\n" ); + wfDebug( __CLASS__ . ": curl error: " . curl_error( $curl ) . "\n" ); return false; } $info = curl_getinfo( $curl ); diff --git a/includes/objectcache/EmptyBagOStuff.php b/includes/objectcache/EmptyBagOStuff.php index bd28b241..62060579 100644 --- a/includes/objectcache/EmptyBagOStuff.php +++ b/includes/objectcache/EmptyBagOStuff.php @@ -30,9 +30,10 @@ class EmptyBagOStuff extends BagOStuff { /** * @param $key string + * @param $casToken[optional] mixed * @return bool */ - function get( $key ) { + function get( $key, &$casToken = null ) { return false; } @@ -46,6 +47,17 @@ class EmptyBagOStuff extends BagOStuff { return true; } + /** + * @param $casToken mixed + * @param $key string + * @param $value mixed + * @param $exp int + * @return bool + */ + function cas( $casToken, $key, $value, $exp = 0 ) { + return true; + } + /** * @param $key string * @param $time int @@ -54,6 +66,17 @@ class EmptyBagOStuff extends BagOStuff { function delete( $key, $time = 0 ) { return true; } + + /** + * @param $key string + * @param $callback closure Callback method to be executed + * @param int $exptime Either an interval in seconds or a unix timestamp for expiry + * @param int $attempts The amount of times to attempt a merge in case of failure + * @return bool success + */ + public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) { + return true; + } } /** diff --git a/includes/objectcache/HashBagOStuff.php b/includes/objectcache/HashBagOStuff.php index 799f26a3..d061eff0 100644 --- a/includes/objectcache/HashBagOStuff.php +++ b/includes/objectcache/HashBagOStuff.php @@ -52,9 +52,10 @@ class HashBagOStuff extends BagOStuff { /** * @param $key string + * @param $casToken[optional] mixed * @return bool|mixed */ - function get( $key ) { + function get( $key, &$casToken = null ) { if ( !isset( $this->bag[$key] ) ) { return false; } @@ -63,6 +64,8 @@ class HashBagOStuff extends BagOStuff { return false; } + $casToken = $this->bag[$key][0]; + return $this->bag[$key][0]; } @@ -77,6 +80,21 @@ class HashBagOStuff extends BagOStuff { return true; } + /** + * @param $casToken mixed + * @param $key string + * @param $value mixed + * @param $exptime int + * @return bool + */ + function cas( $casToken, $key, $value, $exptime = 0 ) { + if ( $this->get( $key ) === $casToken ) { + return $this->set( $key, $value, $exptime ); + } + + return false; + } + /** * @param $key string * @param $time int @@ -91,12 +109,4 @@ class HashBagOStuff extends BagOStuff { return true; } - - /** - * @return array - */ - function keys() { - return array_keys( $this->bag ); - } } - diff --git a/includes/objectcache/MemcachedBagOStuff.php b/includes/objectcache/MemcachedBagOStuff.php index 813c2727..3f1fa3a0 100644 --- a/includes/objectcache/MemcachedBagOStuff.php +++ b/includes/objectcache/MemcachedBagOStuff.php @@ -57,10 +57,11 @@ class MemcachedBagOStuff extends BagOStuff { /** * @param $key string + * @param $casToken[optional] mixed * @return Mixed */ - public function get( $key ) { - return $this->client->get( $this->encodeKey( $key ) ); + public function get( $key, &$casToken = null ) { + return $this->client->get( $this->encodeKey( $key ), $casToken ); } /** @@ -74,6 +75,18 @@ class MemcachedBagOStuff extends BagOStuff { $this->fixExpiry( $exptime ) ); } + /** + * @param $key string + * @param $casToken mixed + * @param $value + * @param $exptime int + * @return bool + */ + public function cas( $casToken, $key, $value, $exptime = 0 ) { + return $this->client->cas( $casToken, $this->encodeKey( $key ), + $value, $this->fixExpiry( $exptime ) ); + } + /** * @param $key string * @param $time int @@ -86,7 +99,7 @@ class MemcachedBagOStuff extends BagOStuff { /** * @param $key string * @param $value int - * @param $exptime int (default 0) + * @param int $exptime (default 0) * @return Mixed */ public function add( $key, $value, $exptime = 0 ) { @@ -101,7 +114,7 @@ class MemcachedBagOStuff extends BagOStuff { * @return Mixed */ public function replace( $key, $value, $exptime = 0 ) { - return $this->client->replace( $this->encodeKey( $key ), $value, + return $this->client->replace( $this->encodeKey( $key ), $value, $this->fixExpiry( $exptime ) ); } @@ -166,15 +179,9 @@ class MemcachedBagOStuff extends BagOStuff { * Send a debug message to the log */ protected function debugLog( $text ) { - global $wgDebugLogGroups; - if( !isset( $wgDebugLogGroups['memcached'] ) ) { - # Prefix message since it will end up in main debug log file - $text = "memcached: $text"; - } if ( substr( $text, -1 ) !== "\n" ) { $text .= "\n"; } wfDebugLog( 'memcached', $text ); } } - diff --git a/includes/objectcache/MemcachedClient.php b/includes/objectcache/MemcachedClient.php index 536ba6ea..0d96ed6c 100644 --- a/includes/objectcache/MemcachedClient.php +++ b/includes/objectcache/MemcachedClient.php @@ -99,7 +99,6 @@ class MWMemcached { // }}} - /** * Command statistics * @@ -242,7 +241,7 @@ class MWMemcached { /** * Memcache initializer * - * @param $args Array Associative array of settings + * @param array $args Associative array of settings * * @return mixed */ @@ -272,12 +271,12 @@ class MWMemcached { * Adds a key/value to the memcache server if one isn't already set with * that key * - * @param $key String: key to set with data + * @param string $key key to set with data * @param $val Mixed: value to store * @param $exp Integer: (optional) Expiration time. This can be a number of seconds * to cache for (up to 30 days inclusive). Any timespans of 30 days + 1 second or * longer must be the timestamp of the time at which the mapping should expire. It - * is safe to use timestamps in all cases, regardless of exipration + * is safe to use timestamps in all cases, regardless of expiration * eg: strtotime("+3 hour") * * @return Boolean @@ -292,7 +291,7 @@ class MWMemcached { /** * Decrease a value stored on the memcache server * - * @param $key String: key to decrease + * @param string $key key to decrease * @param $amt Integer: (optional) amount to decrease * * @return Mixed: FALSE on failure, value on success @@ -307,7 +306,7 @@ class MWMemcached { /** * Deletes a key from the server, optionally after $time * - * @param $key String: key to delete + * @param string $key key to delete * @param $time Integer: (optional) how long to wait before deleting * * @return Boolean: TRUE on success, FALSE on failure @@ -407,11 +406,12 @@ class MWMemcached { /** * Retrieves the value associated with the key from the memcache server * - * @param $key array|string key to retrieve + * @param array|string $key key to retrieve + * @param $casToken[optional] Float * * @return Mixed */ - public function get( $key ) { + public function get( $key, &$casToken = null ) { wfProfileIn( __METHOD__ ); if ( $this->_debug ) { @@ -437,14 +437,14 @@ class MWMemcached { $this->stats['get'] = 1; } - $cmd = "get $key\r\n"; + $cmd = "gets $key\r\n"; if ( !$this->_fwrite( $sock, $cmd ) ) { wfProfileOut( __METHOD__ ); return false; } $val = array(); - $this->_load_items( $sock, $val ); + $this->_load_items( $sock, $val, $casToken ); if ( $this->_debug ) { foreach ( $val as $k => $v ) { @@ -466,7 +466,7 @@ class MWMemcached { /** * Get multiple keys from the server(s) * - * @param $keys Array: keys to retrieve + * @param array $keys keys to retrieve * * @return Array */ @@ -498,7 +498,7 @@ class MWMemcached { $gather = array(); // Send out the requests foreach ( $socks as $sock ) { - $cmd = 'get'; + $cmd = 'gets'; foreach ( $sock_keys[ intval( $sock ) ] as $key ) { $cmd .= ' ' . $key; } @@ -512,7 +512,7 @@ class MWMemcached { // Parse responses $val = array(); foreach ( $gather as $sock ) { - $this->_load_items( $sock, $val ); + $this->_load_items( $sock, $val, $casToken ); } if ( $this->_debug ) { @@ -530,7 +530,7 @@ class MWMemcached { /** * Increments $key (optionally) by $amt * - * @param $key String: key to increment + * @param string $key key to increment * @param $amt Integer: (optional) amount to increment * * @return Integer: null if the key does not exist yet (this does NOT @@ -547,7 +547,7 @@ class MWMemcached { /** * Overwrites an existing value for key; only works if key is already set * - * @param $key String: key to set value as + * @param string $key key to set value as * @param $value Mixed: value to store * @param $exp Integer: (optional) Expiration time. This can be a number of seconds * to cache for (up to 30 days inclusive). Any timespans of 30 days + 1 second or @@ -569,7 +569,7 @@ class MWMemcached { * output as an array (null array if no output) * * @param $sock Resource: socket to send command on - * @param $cmd String: command to run + * @param string $cmd command to run * * @return Array: output array */ @@ -603,7 +603,7 @@ class MWMemcached { * Unconditionally sets a key to a given value in the memcache. Returns true * if set successfully. * - * @param $key String: key to set value as + * @param string $key key to set value as * @param $value Mixed: value to set * @param $exp Integer: (optional) Expiration time. This can be a number of seconds * to cache for (up to 30 days inclusive). Any timespans of 30 days + 1 second or @@ -617,6 +617,28 @@ class MWMemcached { return $this->_set( 'set', $key, $value, $exp ); } + // }}} + // {{{ cas() + + /** + * Sets a key to a given value in the memcache if the current value still corresponds + * to a known, given value. Returns true if set successfully. + * + * @param $casToken Float: current known value + * @param string $key key to set value as + * @param $value Mixed: value to set + * @param $exp Integer: (optional) Expiration time. This can be a number of seconds + * to cache for (up to 30 days inclusive). Any timespans of 30 days + 1 second or + * longer must be the timestamp of the time at which the mapping should expire. It + * is safe to use timestamps in all cases, regardless of exipration + * eg: strtotime("+3 hour") + * + * @return Boolean: TRUE on success + */ + public function cas( $casToken, $key, $value, $exp = 0 ) { + return $this->_set( 'cas', $key, $value, $exp, $casToken ); + } + // }}} // {{{ set_compress_threshold() @@ -649,7 +671,7 @@ class MWMemcached { /** * Sets the server list to distribute key gets and puts between * - * @param $list Array of servers to connect to + * @param array $list of servers to connect to * * @see MWMemcached::__construct() */ @@ -684,7 +706,7 @@ class MWMemcached { /** * Close the specified socket * - * @param $sock String: socket to close + * @param string $sock socket to close * * @access private */ @@ -701,7 +723,7 @@ class MWMemcached { * Connects $sock to $host, timing out after $timeout * * @param $sock Integer: socket to connect - * @param $host String: Host:IP to connect to + * @param string $host Host:IP to connect to * * @return boolean * @access private @@ -743,7 +765,7 @@ class MWMemcached { /** * Marks a host as dead until 30-40 seconds in the future * - * @param $sock String: socket to mark as dead + * @param string $sock socket to mark as dead * * @access private */ @@ -769,7 +791,7 @@ class MWMemcached { /** * get_sock * - * @param $key String: key to retrieve value for; + * @param string $key key to retrieve value for; * * @return Mixed: resource on success, false on failure * @access private @@ -818,7 +840,7 @@ class MWMemcached { /** * Creates a hash integer based on the $key * - * @param $key String: key to hash + * @param string $key key to hash * * @return Integer: hash value * @access private @@ -836,8 +858,8 @@ class MWMemcached { /** * Perform increment/decriment on $key * - * @param $cmd String command to perform - * @param $key String|array key to perform it on + * @param string $cmd command to perform + * @param string|array $key key to perform it on * @param $amt Integer amount to adjust * * @return Integer: new value of $key @@ -878,40 +900,78 @@ class MWMemcached { * Load items into $ret from $sock * * @param $sock Resource: socket to read from - * @param $ret Array: returned values + * @param array $ret returned values + * @param $casToken[optional] Float * @return boolean True for success, false for failure * * @access private */ - function _load_items( $sock, &$ret ) { + function _load_items( $sock, &$ret, &$casToken = null ) { + $results = array(); + while ( 1 ) { $decl = $this->_fgets( $sock ); + if( $decl === false ) { + /* + * If nothing can be read, something is wrong because we know exactly when + * to stop reading (right after "END") and we return right after that. + */ return false; + } elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+) (\d+)$/', $decl, $match ) ) { + /* + * Read all data returned. This can be either one or multiple values. + * Save all that data (in an array) to be processed later: we'll first + * want to continue reading until "END" before doing anything else, + * to make sure that we don't leave our client in a state where it's + * output is not yet fully read. + */ + $results[] = array( + $match[1], // rkey + $match[2], // flags + $match[3], // len + $match[4], // casToken + $this->_fread( $sock, $match[3] + 2 ), // data + ); } elseif ( $decl == "END" ) { - return true; - } elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+)$/', $decl, $match ) ) { - list( $rkey, $flags, $len ) = array( $match[1], $match[2], $match[3] ); - $data = $this->_fread( $sock, $len + 2 ); - if ( $data === false ) { - return false; - } - if ( substr( $data, -2 ) !== "\r\n" ) { - $this->_handle_error( $sock, - 'line ending missing from data block from $1' ); + if ( count( $results ) == 0 ) { return false; } - $data = substr( $data, 0, -2 ); - $ret[$rkey] = $data; - if ( $this->_have_zlib && $flags & self::COMPRESSED ) { - $ret[$rkey] = gzuncompress( $ret[$rkey] ); - } + /** + * All data has been read, time to process the data and build + * meaningful return values. + */ + foreach ( $results as $vars ) { + list( $rkey, $flags, $len, $casToken, $data ) = $vars; + + if ( $data === false || substr( $data, -2 ) !== "\r\n" ) { + $this->_handle_error( $sock, + 'line ending missing from data block from $1' ); + return false; + } + $data = substr( $data, 0, -2 ); + $ret[$rkey] = $data; + + if ( $this->_have_zlib && $flags & self::COMPRESSED ) { + $ret[$rkey] = gzuncompress( $ret[$rkey] ); + } - if ( $flags & self::SERIALIZED ) { - $ret[$rkey] = unserialize( $ret[$rkey] ); + /* + * This unserialize is the exact reason that we only want to + * process data after having read until "END" (instead of doing + * this right away): "unserialize" can trigger outside code: + * in the event that $ret[$rkey] is a serialized object, + * unserializing it will trigger __wakeup() if present. If that + * function attempted to read from memcached (while we did not + * yet read "END"), these 2 calls would collide. + */ + if ( $flags & self::SERIALIZED ) { + $ret[$rkey] = unserialize( $ret[$rkey] ); + } } + return true; } else { $this->_handle_error( $sock, 'Error parsing response from $1' ); return false; @@ -925,19 +985,20 @@ class MWMemcached { /** * Performs the requested storage operation to the memcache server * - * @param $cmd String: command to perform - * @param $key String: key to act on + * @param string $cmd command to perform + * @param string $key key to act on * @param $val Mixed: what we need to store * @param $exp Integer: (optional) Expiration time. This can be a number of seconds * to cache for (up to 30 days inclusive). Any timespans of 30 days + 1 second or * longer must be the timestamp of the time at which the mapping should expire. It * is safe to use timestamps in all cases, regardless of exipration * eg: strtotime("+3 hour") + * @param $casToken[optional] Float * * @return Boolean * @access private */ - function _set( $cmd, $key, $val, $exp ) { + function _set( $cmd, $key, $val, $exp, $casToken = null ) { if ( !$this->_active ) { return false; } @@ -966,7 +1027,7 @@ class MWMemcached { $len = strlen( $val ); if ( $this->_have_zlib && $this->_compress_enable && - $this->_compress_threshold && $len >= $this->_compress_threshold ) + $this->_compress_threshold && $len >= $this->_compress_threshold ) { $c_val = gzcompress( $val, 9 ); $c_len = strlen( $c_val ); @@ -980,7 +1041,13 @@ class MWMemcached { $flags |= self::COMPRESSED; } } - if ( !$this->_fwrite( $sock, "$cmd $key $flags $exp $len\r\n$val\r\n" ) ) { + + $command = "$cmd $key $flags $exp $len"; + if ( $casToken ) { + $command .= " $casToken"; + } + + if ( !$this->_fwrite( $sock, "$command\r\n$val\r\n" ) ) { return false; } @@ -1001,7 +1068,7 @@ class MWMemcached { /** * Returns the socket for the host * - * @param $host String: Host:IP to get socket for + * @param string $host Host:IP to get socket for * * @return Mixed: IO Stream or false * @access private @@ -1036,11 +1103,6 @@ class MWMemcached { * @param $text string */ function _debugprint( $text ) { - global $wgDebugLogGroups; - if( !isset( $wgDebugLogGroups['memcached'] ) ) { - # Prefix message since it will end up in main debug log file - $text = "memcached: $text"; - } wfDebugLog( 'memcached', $text ); } @@ -1096,7 +1158,7 @@ class MWMemcached { } /** - * Read the specified number of bytes from a stream. If there is an error, + * Read the specified number of bytes from a stream. If there is an error, * mark the socket dead. * * @param $sock The socket @@ -1137,7 +1199,7 @@ class MWMemcached { function _fgets( $sock ) { $result = fgets( $sock ); // fgets() may return a partial line if there is a select timeout after - // a successful recv(), so we have to check for a timeout even if we + // a successful recv(), so we have to check for a timeout even if we // got a string response. $data = stream_get_meta_data( $sock ); if ( $data['timed_out'] ) { @@ -1167,10 +1229,16 @@ class MWMemcached { if ( !is_resource( $f ) ) { return; } - $n = stream_select( $r = array( $f ), $w = null, $e = null, 0, 0 ); + $r = array( $f ); + $w = null; + $e = null; + $n = stream_select( $r, $w, $e, 0, 0 ); while ( $n == 1 && !feof( $f ) ) { fread( $f, 1024 ); - $n = stream_select( $r = array( $f ), $w = null, $e = null, 0, 0 ); + $r = array( $f ); + $w = null; + $e = null; + $n = stream_select( $r, $w, $e, 0, 0 ); } } @@ -1179,7 +1247,6 @@ class MWMemcached { // }}} } - // }}} class MemCachedClientforWiki extends MWMemcached { diff --git a/includes/objectcache/MemcachedPeclBagOStuff.php b/includes/objectcache/MemcachedPeclBagOStuff.php index 76886ebb..31924293 100644 --- a/includes/objectcache/MemcachedPeclBagOStuff.php +++ b/includes/objectcache/MemcachedPeclBagOStuff.php @@ -47,7 +47,7 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { if ( $params['persistent'] ) { // The pool ID must be unique to the server/option combination. // The Memcached object is essentially shared for each pool ID. - // We can only resuse a pool ID if we keep the config consistent. + // We can only reuse a pool ID if we keep the config consistent. $this->client = new Memcached( md5( serialize( $params ) ) ); if ( count( $this->client->getServerList() ) ) { wfDebug( __METHOD__ . ": persistent Memcached object already loaded.\n" ); @@ -104,11 +104,16 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { /** * @param $key string + * @param $casToken[optional] float * @return Mixed */ - public function get( $key ) { + public function get( $key, &$casToken = null ) { + wfProfileIn( __METHOD__ ); $this->debugLog( "get($key)" ); - return $this->checkResult( $key, parent::get( $key ) ); + $result = $this->client->get( $this->encodeKey( $key ), null, $casToken ); + $result = $this->checkResult( $key, $result ); + wfProfileOut( __METHOD__ ); + return $result; } /** @@ -122,6 +127,18 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { return $this->checkResult( $key, parent::set( $key, $value, $exptime ) ); } + /** + * @param $casToken float + * @param $key string + * @param $value + * @param $exptime int + * @return bool + */ + public function cas( $casToken, $key, $value, $exptime = 0 ) { + $this->debugLog( "cas($key)" ); + return $this->checkResult( $key, parent::cas( $casToken, $key, $value, $exptime ) ); + } + /** * @param $key string * @param $time int @@ -189,7 +206,7 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { * the client, but some day we might find a case where it should be * different. * - * @param $key string The key used by the caller, or false if there wasn't one. + * @param string $key The key used by the caller, or false if there wasn't one. * @param $result Mixed The return value * @return Mixed */ @@ -224,9 +241,11 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { * @return Array */ public function getMulti( array $keys ) { + wfProfileIn( __METHOD__ ); $this->debugLog( 'getMulti(' . implode( ', ', $keys ) . ')' ); $callback = array( $this, 'encodeKey' ); $result = $this->client->getMulti( array_map( $callback, $keys ) ); + wfProfileOut( __METHOD__ ); return $this->checkResult( false, $result ); } diff --git a/includes/objectcache/MemcachedPhpBagOStuff.php b/includes/objectcache/MemcachedPhpBagOStuff.php index a46dc716..33a134c7 100644 --- a/includes/objectcache/MemcachedPhpBagOStuff.php +++ b/includes/objectcache/MemcachedPhpBagOStuff.php @@ -81,7 +81,7 @@ class MemcachedPhpBagOStuff extends MemcachedBagOStuff { public function unlock( $key ) { return $this->client->unlock( $this->encodeKey( $key ) ); } - + /** * @param $key string * @param $value int @@ -100,4 +100,3 @@ class MemcachedPhpBagOStuff extends MemcachedBagOStuff { return $this->client->decr( $this->encodeKey( $key ), $value ); } } - diff --git a/includes/objectcache/MultiWriteBagOStuff.php b/includes/objectcache/MultiWriteBagOStuff.php index e496ddd8..92afaacd 100644 --- a/includes/objectcache/MultiWriteBagOStuff.php +++ b/includes/objectcache/MultiWriteBagOStuff.php @@ -22,8 +22,8 @@ */ /** - * A cache class that replicates all writes to multiple child caches. Reads - * are implemented by reading from the caches in the order they are given in + * A cache class that replicates all writes to multiple child caches. Reads + * are implemented by reading from the caches in the order they are given in * the configuration until a cache gives a positive result. * * @ingroup Cache @@ -61,9 +61,10 @@ class MultiWriteBagOStuff extends BagOStuff { /** * @param $key string + * @param $casToken[optional] mixed * @return bool|mixed */ - public function get( $key ) { + public function get( $key, &$casToken = null ) { foreach ( $this->caches as $cache ) { $value = $cache->get( $key ); if ( $value !== false ) { @@ -73,6 +74,17 @@ class MultiWriteBagOStuff extends BagOStuff { return false; } + /** + * @param $casToken mixed + * @param $key string + * @param $value mixed + * @param $exptime int + * @return bool + */ + public function cas( $casToken, $key, $value, $exptime = 0 ) { + throw new MWException( "CAS is not implemented in " . __CLASS__ ); + } + /** * @param $key string * @param $value mixed @@ -156,6 +168,17 @@ class MultiWriteBagOStuff extends BagOStuff { } } + /** + * @param $key string + * @param $callback closure Callback method to be executed + * @param int $exptime Either an interval in seconds or a unix timestamp for expiry + * @param int $attempts The amount of times to attempt a merge in case of failure + * @return bool success + */ + public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) { + return $this->doWrite( 'merge', $key, $callback, $exptime ); + } + /** * @param $method string * @return bool diff --git a/includes/objectcache/ObjectCache.php b/includes/objectcache/ObjectCache.php index 9b360f32..eafa836a 100644 --- a/includes/objectcache/ObjectCache.php +++ b/includes/objectcache/ObjectCache.php @@ -123,7 +123,7 @@ class ObjectCache { * @return ObjectCache */ static function newAccelerator( $params ) { - if ( function_exists( 'apc_fetch') ) { + if ( function_exists( 'apc_fetch' ) ) { $id = 'apc'; } elseif( function_exists( 'xcache_get' ) && wfIniGetBool( 'xcache.var_size' ) ) { $id = 'xcache'; @@ -139,9 +139,9 @@ class ObjectCache { /** * Factory function that creates a memcached client object. * - * This always uses the PHP client, since the PECL client has a different - * hashing scheme and a different interpretation of the flags bitfield, so - * switching between the two clients randomly would be disasterous. + * This always uses the PHP client, since the PECL client has a different + * hashing scheme and a different interpretation of the flags bitfield, so + * switching between the two clients randomly would be disastrous. * * @param $params array * diff --git a/includes/objectcache/ObjectCacheSessionHandler.php b/includes/objectcache/ObjectCacheSessionHandler.php index f55da94d..bc76294a 100644 --- a/includes/objectcache/ObjectCacheSessionHandler.php +++ b/includes/objectcache/ObjectCacheSessionHandler.php @@ -58,7 +58,7 @@ class ObjectCacheSessionHandler { /** * Get a cache key for the given session id. * - * @param $id String: session id + * @param string $id session id * @return String: cache key */ static function getKey( $id ) { @@ -89,7 +89,7 @@ class ObjectCacheSessionHandler { /** * Callback when reading session data. * - * @param $id String: session id + * @param string $id session id * @return Mixed: session data */ static function read( $id ) { @@ -103,7 +103,7 @@ class ObjectCacheSessionHandler { /** * Callback when writing session data. * - * @param $id String: session id + * @param string $id session id * @param $data Mixed: session data * @return Boolean: success */ @@ -116,7 +116,7 @@ class ObjectCacheSessionHandler { /** * Callback to destroy a session when calling session_destroy(). * - * @param $id String: session id + * @param string $id session id * @return Boolean: success */ static function destroy( $id ) { diff --git a/includes/objectcache/RedisBagOStuff.php b/includes/objectcache/RedisBagOStuff.php index c5966cdb..f9feaf9d 100644 --- a/includes/objectcache/RedisBagOStuff.php +++ b/includes/objectcache/RedisBagOStuff.php @@ -20,29 +20,13 @@ * @file */ - class RedisBagOStuff extends BagOStuff { - protected $connectTimeout, $persistent, $password, $automaticFailover; - - /** - * A list of server names, from $params['servers'] - */ + /** @var RedisConnectionPool */ + protected $redisPool; + /** @var Array List of server names */ protected $servers; - - /** - * A cache of Redis objects, representing connections to Redis servers. - * The key is the server name. - */ - protected $conns = array(); - - /** - * An array listing "dead" servers which have had a connection error in - * the past. Servers are marked dead for a limited period of time, to - * avoid excessive overhead from repeated connection timeouts. The key in - * the array is the server name, the value is the UNIX timestamp at which - * the server is resurrected. - */ - protected $deadServers = array(); + /** @var bool */ + protected $automaticFailover; /** * Construct a RedisBagOStuff object. Parameters are: @@ -71,18 +55,15 @@ class RedisBagOStuff extends BagOStuff { * flap, for example if it is in swap death. */ function __construct( $params ) { - if ( !extension_loaded( 'redis' ) ) { - throw new MWException( __CLASS__. ' requires the phpredis extension: ' . - 'https://github.com/nicolasff/phpredis' ); + $redisConf = array( 'serializer' => 'php' ); + foreach ( array( 'connectTimeout', 'persistent', 'password' ) as $opt ) { + if ( isset( $params[$opt] ) ) { + $redisConf[$opt] = $params[$opt]; + } } + $this->redisPool = RedisConnectionPool::singleton( $redisConf ); $this->servers = $params['servers']; - $this->connectTimeout = isset( $params['connectTimeout'] ) - ? $params['connectTimeout'] : 1; - $this->persistent = !empty( $params['persistent'] ); - if ( isset( $params['password'] ) ) { - $this->password = $params['password']; - } if ( isset( $params['automaticFailover'] ) ) { $this->automaticFailover = $params['automaticFailover']; } else { @@ -90,7 +71,7 @@ class RedisBagOStuff extends BagOStuff { } } - public function get( $key ) { + public function get( $key, &$casToken = null ) { wfProfileIn( __METHOD__ ); list( $server, $conn ) = $this->getConnection( $key ); if ( !$conn ) { @@ -101,8 +82,9 @@ class RedisBagOStuff extends BagOStuff { $result = $conn->get( $key ); } catch ( RedisException $e ) { $result = false; - $this->handleException( $server, $e ); + $this->handleException( $server, $conn, $e ); } + $casToken = $result; $this->logRequest( 'get', $key, $server, $result ); wfProfileOut( __METHOD__ ); return $result; @@ -125,7 +107,7 @@ class RedisBagOStuff extends BagOStuff { } } catch ( RedisException $e ) { $result = false; - $this->handleException( $server, $e ); + $this->handleException( $server, $conn, $e ); } $this->logRequest( 'set', $key, $server, $result ); @@ -133,6 +115,42 @@ class RedisBagOStuff extends BagOStuff { return $result; } + public function cas( $casToken, $key, $value, $expiry = 0 ) { + wfProfileIn( __METHOD__ ); + list( $server, $conn ) = $this->getConnection( $key ); + if ( !$conn ) { + wfProfileOut( __METHOD__ ); + return false; + } + $expiry = $this->convertToRelative( $expiry ); + try { + $conn->watch( $key ); + + if ( $this->get( $key ) !== $casToken ) { + wfProfileOut( __METHOD__ ); + return false; + } + + $conn->multi(); + + if ( !$expiry ) { + // No expiry, that is very different from zero expiry in Redis + $conn->set( $key, $value ); + } else { + $conn->setex( $key, $expiry, $value ); + } + + $result = $conn->exec(); + } catch ( RedisException $e ) { + $result = false; + $this->handleException( $server, $conn, $e ); + } + + $this->logRequest( 'cas', $key, $server, $result ); + wfProfileOut( __METHOD__ ); + return $result; + } + public function delete( $key, $time = 0 ) { wfProfileIn( __METHOD__ ); list( $server, $conn ) = $this->getConnection( $key ); @@ -146,7 +164,7 @@ class RedisBagOStuff extends BagOStuff { $result = true; } catch ( RedisException $e ) { $result = false; - $this->handleException( $server, $e ); + $this->handleException( $server, $conn, $e ); } $this->logRequest( 'delete', $key, $server, $result ); wfProfileOut( __METHOD__ ); @@ -184,7 +202,7 @@ class RedisBagOStuff extends BagOStuff { } } } catch ( RedisException $e ) { - $this->handleException( $server, $e ); + $this->handleException( $server, $conn, $e ); } } @@ -209,7 +227,7 @@ class RedisBagOStuff extends BagOStuff { } } catch ( RedisException $e ) { $result = false; - $this->handleException( $server, $e ); + $this->handleException( $server, $conn, $e ); } $this->logRequest( 'add', $key, $server, $result ); wfProfileOut( __METHOD__ ); @@ -241,7 +259,7 @@ class RedisBagOStuff extends BagOStuff { } } catch ( RedisException $e ) { $result = false; - $this->handleException( $server, $e ); + $this->handleException( $server, $conn, $e ); } $this->logRequest( 'replace', $key, $server, $result ); @@ -273,7 +291,7 @@ class RedisBagOStuff extends BagOStuff { $result = $conn->incrBy( $key, $value ); } catch ( RedisException $e ) { $result = false; - $this->handleException( $server, $e ); + $this->handleException( $server, $conn, $e ); } $this->logRequest( 'incr', $key, $server, $result ); @@ -283,27 +301,21 @@ class RedisBagOStuff extends BagOStuff { /** * Get a Redis object with a connection suitable for fetching the specified key + * @return Array (server, RedisConnRef) or (false, false) */ protected function getConnection( $key ) { if ( count( $this->servers ) === 1 ) { $candidates = $this->servers; } else { - // Use consistent hashing - $hashes = array(); - foreach ( $this->servers as $server ) { - $hashes[$server] = md5( $server . '/' . $key ); - } - asort( $hashes ); + $candidates = $this->servers; + ArrayUtils::consistentHashSort( $candidates, $key, '/' ); if ( !$this->automaticFailover ) { - reset( $hashes ); - $candidates = array( key( $hashes ) ); - } else { - $candidates = array_keys( $hashes ); + $candidates = array_slice( $candidates, 0, 1 ); } } foreach ( $candidates as $server ) { - $conn = $this->getConnectionToServer( $server ); + $conn = $this->redisPool->getConnection( $server ); if ( $conn ) { return array( $server, $conn ); } @@ -311,79 +323,6 @@ class RedisBagOStuff extends BagOStuff { return array( false, false ); } - /** - * Get a connection to the server with the specified name. Connections - * are cached, and failures are persistent to avoid multiple timeouts. - * - * @return Redis object, or false on failure - */ - protected function getConnectionToServer( $server ) { - if ( isset( $this->deadServers[$server] ) ) { - $now = time(); - if ( $now > $this->deadServers[$server] ) { - // Dead time expired - unset( $this->deadServers[$server] ); - } else { - // Server is dead - $this->debug( "server $server is marked down for another " . - ($this->deadServers[$server] - $now ) . - " seconds, can't get connection" ); - return false; - } - } - - if ( isset( $this->conns[$server] ) ) { - return $this->conns[$server]; - } - - if ( substr( $server, 0, 1 ) === '/' ) { - // UNIX domain socket - // These are required by the redis extension to start with a slash, but - // we still need to set the port to a special value to make it work. - $host = $server; - $port = 0; - } else { - // TCP connection - $hostPort = IP::splitHostAndPort( $server ); - if ( !$hostPort ) { - throw new MWException( __CLASS__.": invalid configured server \"$server\"" ); - } - list( $host, $port ) = $hostPort; - if ( $port === false ) { - $port = 6379; - } - } - $conn = new Redis; - try { - if ( $this->persistent ) { - $this->debug( "opening persistent connection to $host:$port" ); - $result = $conn->pconnect( $host, $port, $this->connectTimeout ); - } else { - $this->debug( "opening non-persistent connection to $host:$port" ); - $result = $conn->connect( $host, $port, $this->connectTimeout ); - } - if ( !$result ) { - $this->logError( "could not connect to server $server" ); - // Mark server down for 30s to avoid further timeouts - $this->deadServers[$server] = time() + 30; - return false; - } - if ( $this->password !== null ) { - if ( !$conn->auth( $this->password ) ) { - $this->logError( "authentication error connecting to $server" ); - } - } - } catch ( RedisException $e ) { - $this->deadServers[$server] = time() + 30; - wfDebugLog( 'redis', "Redis exception: " . $e->getMessage() . "\n" ); - return false; - } - - $conn->setOption( Redis::OPT_SERIALIZER, Redis::SERIALIZER_PHP ); - $this->conns[$server] = $conn; - return $conn; - } - /** * Log a fatal error */ @@ -397,9 +336,8 @@ class RedisBagOStuff extends BagOStuff { * not. The safest response for us is to explicitly destroy the connection * object and let it be reopened during the next request. */ - protected function handleException( $server, $e ) { - wfDebugLog( 'redis', "Redis exception on server $server: " . $e->getMessage() . "\n" ); - unset( $this->conns[$server] ); + protected function handleException( $server, RedisConnRef $conn, $e ) { + $this->redisPool->handleException( $server, $conn, $e ); } /** @@ -410,4 +348,3 @@ class RedisBagOStuff extends BagOStuff { ( $result === false ? "failure" : "success" ) ); } } - diff --git a/includes/objectcache/SqlBagOStuff.php b/includes/objectcache/SqlBagOStuff.php index 54051dc1..87f787d8 100644 --- a/includes/objectcache/SqlBagOStuff.php +++ b/includes/objectcache/SqlBagOStuff.php @@ -32,23 +32,26 @@ class SqlBagOStuff extends BagOStuff { */ var $lb; - /** - * @var DatabaseBase - */ - var $db; - var $serverInfo; + var $serverInfos; + var $serverNames; + var $numServers; + var $conns; var $lastExpireAll = 0; var $purgePeriod = 100; var $shards = 1; var $tableName = 'objectcache'; - protected $connFailureTime = 0; // UNIX timestamp - protected $connFailureError; // exception + protected $connFailureTimes = array(); // UNIX timestamps + protected $connFailureErrors = array(); // exceptions /** * Constructor. Parameters are: - * - server: A server info structure in the format required by each - * element in $wgDBServers. + * - server: A server info structure in the format required by each + * element in $wgDBServers. + * + * - servers: An array of server info structures describing a set of + * database servers to distribute keys to. If this is + * specified, the "server" option will be ignored. * * - purgePeriod: The average number of object cache requests in between * garbage collection operations, where expired entries @@ -59,8 +62,8 @@ class SqlBagOStuff extends BagOStuff { * * - tableName: The table name to use, default is "objectcache". * - * - shards: The number of tables to use for data storage. If this is - * more than 1, table names will be formed in the style + * - shards: The number of tables to use for data storage on each server. + * If this is more than 1, table names will be formed in the style * objectcacheNNN where NNN is the shard index, between 0 and * shards-1. The number of digits will be the minimum number * required to hold the largest shard index. Data will be @@ -70,9 +73,19 @@ class SqlBagOStuff extends BagOStuff { * @param $params array */ public function __construct( $params ) { - if ( isset( $params['server'] ) ) { - $this->serverInfo = $params['server']; - $this->serverInfo['load'] = 1; + if ( isset( $params['servers'] ) ) { + $this->serverInfos = $params['servers']; + $this->numServers = count( $this->serverInfos ); + $this->serverNames = array(); + foreach ( $this->serverInfos as $i => $info ) { + $this->serverNames[$i] = isset( $info['host'] ) ? $info['host'] : "#$i"; + } + } elseif ( isset( $params['server'] ) ) { + $this->serverInfos = array( $params['server'] ); + $this->numServers = count( $this->serverInfos ); + } else { + $this->serverInfos = false; + $this->numServers = 1; } if ( isset( $params['purgePeriod'] ) ) { $this->purgePeriod = intval( $params['purgePeriod'] ); @@ -86,60 +99,81 @@ class SqlBagOStuff extends BagOStuff { } /** + * Get a connection to the specified database + * + * @param $serverIndex integer * @return DatabaseBase */ - protected function getDB() { + protected function getDB( $serverIndex ) { global $wgDebugDBTransactions; - # Don't keep timing out trying to connect for each call if the DB is down - if ( $this->connFailureError && ( time() - $this->connFailureTime ) < 60 ) { - throw $this->connFailureError; - } + if ( !isset( $this->conns[$serverIndex] ) ) { + if ( $serverIndex >= $this->numServers ) { + throw new MWException( __METHOD__ . ": Invalid server index \"$serverIndex\"" ); + } + + # Don't keep timing out trying to connect for each call if the DB is down + if ( isset( $this->connFailureErrors[$serverIndex] ) + && ( time() - $this->connFailureTimes[$serverIndex] ) < 60 ) + { + throw $this->connFailureErrors[$serverIndex]; + } - if ( !isset( $this->db ) ) { # If server connection info was given, use that - if ( $this->serverInfo ) { + if ( $this->serverInfos ) { if ( $wgDebugDBTransactions ) { - wfDebug( sprintf( "Using provided serverInfo for SqlBagOStuff\n" ) ); + wfDebug( "Using provided serverInfo for SqlBagOStuff\n" ); } - $this->lb = new LoadBalancer( array( - 'servers' => array( $this->serverInfo ) ) ); - $this->db = $this->lb->getConnection( DB_MASTER ); - $this->db->clearFlag( DBO_TRX ); + $info = $this->serverInfos[$serverIndex]; + $type = isset( $info['type'] ) ? $info['type'] : 'mysql'; + $host = isset( $info['host'] ) ? $info['host'] : '[unknown]'; + wfDebug( __CLASS__ . ": connecting to $host\n" ); + $db = DatabaseBase::factory( $type, $info ); + $db->clearFlag( DBO_TRX ); } else { /* * We must keep a separate connection to MySQL in order to avoid deadlocks - * However, SQLite has an opposite behaviour. And PostgreSQL needs to know + * However, SQLite has an opposite behavior. And PostgreSQL needs to know * if we are in transaction or no */ if ( wfGetDB( DB_MASTER )->getType() == 'mysql' ) { $this->lb = wfGetLBFactory()->newMainLB(); - $this->db = $this->lb->getConnection( DB_MASTER ); - $this->db->clearFlag( DBO_TRX ); // auto-commit mode + $db = $this->lb->getConnection( DB_MASTER ); + $db->clearFlag( DBO_TRX ); // auto-commit mode } else { - $this->db = wfGetDB( DB_MASTER ); + $db = wfGetDB( DB_MASTER ); } } if ( $wgDebugDBTransactions ) { - wfDebug( sprintf( "Connection %s will be used for SqlBagOStuff\n", $this->db ) ); + wfDebug( sprintf( "Connection %s will be used for SqlBagOStuff\n", $db ) ); } + $this->conns[$serverIndex] = $db; } - return $this->db; + return $this->conns[$serverIndex]; } /** - * Get the table name for a given key + * Get the server index and table name for a given key * @param $key string - * @return string + * @return Array: server index and table name */ protected function getTableByKey( $key ) { if ( $this->shards > 1 ) { $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff; - return $this->getTableByShard( $hash % $this->shards ); + $tableIndex = $hash % $this->shards; } else { - return $this->tableName; + $tableIndex = 0; + } + if ( $this->numServers > 1 ) { + $sortedServers = $this->serverNames; + ArrayUtils::consistentHashSort( $sortedServers, $key ); + reset( $sortedServers ); + $serverIndex = key( $sortedServers ); + } else { + $serverIndex = 0; } + return array( $serverIndex, $this->getTableNameByShard( $tableIndex ) ); } /** @@ -147,7 +181,7 @@ class SqlBagOStuff extends BagOStuff { * @param $index int * @return string */ - protected function getTableByShard( $index ) { + protected function getTableNameByShard( $index ) { if ( $this->shards > 1 ) { $decimals = strlen( $this->shards - 1 ); return $this->tableName . @@ -159,11 +193,16 @@ class SqlBagOStuff extends BagOStuff { /** * @param $key string + * @param $casToken[optional] mixed * @return mixed */ - public function get( $key ) { + public function get( $key, &$casToken = null ) { $values = $this->getMulti( array( $key ) ); - return array_key_exists( $key, $values ) ? $values[$key] : false; + if ( array_key_exists( $key, $values ) ) { + $casToken = $values[$key]; + return $values[$key]; + } + return false; } /** @@ -173,59 +212,61 @@ class SqlBagOStuff extends BagOStuff { public function getMulti( array $keys ) { $values = array(); // array of (key => value) - try { - $db = $this->getDB(); - $keysByTableName = array(); - foreach ( $keys as $key ) { - $tableName = $this->getTableByKey( $key ); - if ( !isset( $keysByTableName[$tableName] ) ) { - $keysByTableName[$tableName] = array(); - } - $keysByTableName[$tableName][] = $key; - } + $keysByTable = array(); + foreach ( $keys as $key ) { + list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); + $keysByTable[$serverIndex][$tableName][] = $key; + } - $this->garbageCollect(); // expire old entries if any + $this->garbageCollect(); // expire old entries if any - $dataRows = array(); - foreach ( $keysByTableName as $tableName => $tableKeys ) { - $res = $db->select( $tableName, - array( 'keyname', 'value', 'exptime' ), - array( 'keyname' => $tableKeys ), - __METHOD__ ); - foreach ( $res as $row ) { - $dataRows[$row->keyname] = $row; + $dataRows = array(); + foreach ( $keysByTable as $serverIndex => $serverKeys ) { + $db = $this->getDB( $serverIndex ); + try { + foreach ( $serverKeys as $tableName => $tableKeys ) { + $res = $db->select( $tableName, + array( 'keyname', 'value', 'exptime' ), + array( 'keyname' => $tableKeys ), + __METHOD__ ); + foreach ( $res as $row ) { + $row->serverIndex = $serverIndex; + $row->tableName = $tableName; + $dataRows[$row->keyname] = $row; + } } + } catch ( DBError $e ) { + $this->handleReadError( $e, $serverIndex ); } + } - foreach ( $keys as $key ) { - if ( isset( $dataRows[$key] ) ) { // HIT? - $row = $dataRows[$key]; - $this->debug( "get: retrieved data; expiry time is " . $row->exptime ); - if ( $this->isExpired( $row->exptime ) ) { // MISS - $this->debug( "get: key has expired, deleting" ); - try { - $db->begin( __METHOD__ ); - # Put the expiry time in the WHERE condition to avoid deleting a - # newly-inserted value - $db->delete( $this->getTableByKey( $key ), - array( 'keyname' => $key, 'exptime' => $row->exptime ), - __METHOD__ ); - $db->commit( __METHOD__ ); - } catch ( DBQueryError $e ) { - $this->handleWriteError( $e ); - } - $values[$key] = false; - } else { // HIT - $values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) ); + foreach ( $keys as $key ) { + if ( isset( $dataRows[$key] ) ) { // HIT? + $row = $dataRows[$key]; + $this->debug( "get: retrieved data; expiry time is " . $row->exptime ); + $db = $this->getDB( $row->serverIndex ); + if ( $this->isExpired( $db, $row->exptime ) ) { // MISS + $this->debug( "get: key has expired, deleting" ); + try { + $db->begin( __METHOD__ ); + # Put the expiry time in the WHERE condition to avoid deleting a + # newly-inserted value + $db->delete( $row->tableName, + array( 'keyname' => $key, 'exptime' => $row->exptime ), + __METHOD__ ); + $db->commit( __METHOD__ ); + } catch ( DBQueryError $e ) { + $this->handleWriteError( $e, $row->serverIndex ); } - } else { // MISS $values[$key] = false; - $this->debug( 'get: no matching rows' ); + } else { // HIT + $values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) ); } + } else { // MISS + $values[$key] = false; + $this->debug( 'get: no matching rows' ); } - } catch ( DBError $e ) { - $this->handleReadError( $e ); - }; + } return $values; } @@ -237,8 +278,9 @@ class SqlBagOStuff extends BagOStuff { * @return bool */ public function set( $key, $value, $exptime = 0 ) { + list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); try { - $db = $this->getDB(); + $db = $this->getDB( $serverIndex ); $exptime = intval( $exptime ); if ( $exptime < 0 ) { @@ -246,7 +288,7 @@ class SqlBagOStuff extends BagOStuff { } if ( $exptime == 0 ) { - $encExpiry = $this->getMaxDateTime(); + $encExpiry = $this->getMaxDateTime( $db ); } else { if ( $exptime < 3.16e8 ) { # ~10 years $exptime += time(); @@ -258,7 +300,7 @@ class SqlBagOStuff extends BagOStuff { // (bug 24425) use a replace if the db supports it instead of // delete/insert to avoid clashes with conflicting keynames $db->replace( - $this->getTableByKey( $key ), + $tableName, array( 'keyname' ), array( 'keyname' => $key, @@ -267,29 +309,81 @@ class SqlBagOStuff extends BagOStuff { ), __METHOD__ ); $db->commit( __METHOD__ ); } catch ( DBError $e ) { - $this->handleWriteError( $e ); + $this->handleWriteError( $e, $serverIndex ); return false; } return true; } + /** + * @param $casToken mixed + * @param $key string + * @param $value mixed + * @param $exptime int + * @return bool + */ + public function cas( $casToken, $key, $value, $exptime = 0 ) { + list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); + try { + $db = $this->getDB( $serverIndex ); + $exptime = intval( $exptime ); + + if ( $exptime < 0 ) { + $exptime = 0; + } + + if ( $exptime == 0 ) { + $encExpiry = $this->getMaxDateTime( $db ); + } else { + if ( $exptime < 3.16e8 ) { # ~10 years + $exptime += time(); + } + $encExpiry = $db->timestamp( $exptime ); + } + $db->begin( __METHOD__ ); + // (bug 24425) use a replace if the db supports it instead of + // delete/insert to avoid clashes with conflicting keynames + $db->update( + $tableName, + array( + 'keyname' => $key, + 'value' => $db->encodeBlob( $this->serialize( $value ) ), + 'exptime' => $encExpiry + ), + array( + 'keyname' => $key, + 'value' => $db->encodeBlob( $this->serialize( $casToken ) ) + ), + __METHOD__ + ); + $db->commit( __METHOD__ ); + } catch ( DBQueryError $e ) { + $this->handleWriteError( $e, $serverIndex ); + + return false; + } + + return (bool) $db->affectedRows(); + } + /** * @param $key string * @param $time int * @return bool */ public function delete( $key, $time = 0 ) { + list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); try { - $db = $this->getDB(); + $db = $this->getDB( $serverIndex ); $db->begin( __METHOD__ ); $db->delete( - $this->getTableByKey( $key ), + $tableName, array( 'keyname' => $key ), __METHOD__ ); $db->commit( __METHOD__ ); } catch ( DBError $e ) { - $this->handleWriteError( $e ); + $this->handleWriteError( $e, $serverIndex ); return false; } @@ -302,9 +396,9 @@ class SqlBagOStuff extends BagOStuff { * @return int|null */ public function incr( $key, $step = 1 ) { + list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); try { - $db = $this->getDB(); - $tableName = $this->getTableByKey( $key ); + $db = $this->getDB( $serverIndex ); $step = intval( $step ); $db->begin( __METHOD__ ); $row = $db->selectRow( @@ -320,7 +414,7 @@ class SqlBagOStuff extends BagOStuff { return null; } $db->delete( $tableName, array( 'keyname' => $key ), __METHOD__ ); - if ( $this->isExpired( $row->exptime ) ) { + if ( $this->isExpired( $db, $row->exptime ) ) { // Expired, do not reinsert $db->commit( __METHOD__ ); @@ -342,51 +436,29 @@ class SqlBagOStuff extends BagOStuff { } $db->commit( __METHOD__ ); } catch ( DBError $e ) { - $this->handleWriteError( $e ); + $this->handleWriteError( $e, $serverIndex ); return null; } return $newValue; } - /** - * @return Array - */ - public function keys() { - $result = array(); - - try { - $db = $this->getDB(); - for ( $i = 0; $i < $this->shards; $i++ ) { - $res = $db->select( $this->getTableByShard( $i ), - array( 'keyname' ), false, __METHOD__ ); - foreach ( $res as $row ) { - $result[] = $row->keyname; - } - } - } catch ( DBError $e ) { - $this->handleReadError( $e ); - } - - return $result; - } - /** * @param $exptime string * @return bool */ - protected function isExpired( $exptime ) { - return $exptime != $this->getMaxDateTime() && wfTimestamp( TS_UNIX, $exptime ) < time(); + protected function isExpired( $db, $exptime ) { + return $exptime != $this->getMaxDateTime( $db ) && wfTimestamp( TS_UNIX, $exptime ) < time(); } /** * @return string */ - protected function getMaxDateTime() { + protected function getMaxDateTime( $db ) { if ( time() > 0x7fffffff ) { - return $this->getDB()->timestamp( 1 << 62 ); + return $db->timestamp( 1 << 62 ); } else { - return $this->getDB()->timestamp( 0x7fffffff ); + return $db->timestamp( 0x7fffffff ); } } @@ -418,87 +490,91 @@ class SqlBagOStuff extends BagOStuff { * @return bool */ public function deleteObjectsExpiringBefore( $timestamp, $progressCallback = false ) { - try { - $db = $this->getDB(); - $dbTimestamp = $db->timestamp( $timestamp ); - $totalSeconds = false; - $baseConds = array( 'exptime < ' . $db->addQuotes( $dbTimestamp ) ); - for ( $i = 0; $i < $this->shards; $i++ ) { - $maxExpTime = false; - while ( true ) { - $conds = $baseConds; - if ( $maxExpTime !== false ) { - $conds[] = 'exptime > ' . $db->addQuotes( $maxExpTime ); - } - $rows = $db->select( - $this->getTableByShard( $i ), - array( 'keyname', 'exptime' ), - $conds, - __METHOD__, - array( 'LIMIT' => 100, 'ORDER BY' => 'exptime' ) ); - if ( !$rows->numRows() ) { - break; - } - $keys = array(); - $row = $rows->current(); - $minExpTime = $row->exptime; - if ( $totalSeconds === false ) { - $totalSeconds = wfTimestamp( TS_UNIX, $timestamp ) - - wfTimestamp( TS_UNIX, $minExpTime ); - } - foreach ( $rows as $row ) { - $keys[] = $row->keyname; - $maxExpTime = $row->exptime; - } - - $db->begin( __METHOD__ ); - $db->delete( - $this->getTableByShard( $i ), - array( - 'exptime >= ' . $db->addQuotes( $minExpTime ), - 'exptime < ' . $db->addQuotes( $dbTimestamp ), - 'keyname' => $keys - ), - __METHOD__ ); - $db->commit( __METHOD__ ); + for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) { + try { + $db = $this->getDB( $serverIndex ); + $dbTimestamp = $db->timestamp( $timestamp ); + $totalSeconds = false; + $baseConds = array( 'exptime < ' . $db->addQuotes( $dbTimestamp ) ); + for ( $i = 0; $i < $this->shards; $i++ ) { + $maxExpTime = false; + while ( true ) { + $conds = $baseConds; + if ( $maxExpTime !== false ) { + $conds[] = 'exptime > ' . $db->addQuotes( $maxExpTime ); + } + $rows = $db->select( + $this->getTableNameByShard( $i ), + array( 'keyname', 'exptime' ), + $conds, + __METHOD__, + array( 'LIMIT' => 100, 'ORDER BY' => 'exptime' ) ); + if ( !$rows->numRows() ) { + break; + } + $keys = array(); + $row = $rows->current(); + $minExpTime = $row->exptime; + if ( $totalSeconds === false ) { + $totalSeconds = wfTimestamp( TS_UNIX, $timestamp ) + - wfTimestamp( TS_UNIX, $minExpTime ); + } + foreach ( $rows as $row ) { + $keys[] = $row->keyname; + $maxExpTime = $row->exptime; + } - if ( $progressCallback ) { - if ( intval( $totalSeconds ) === 0 ) { - $percent = 0; - } else { - $remainingSeconds = wfTimestamp( TS_UNIX, $timestamp ) - - wfTimestamp( TS_UNIX, $maxExpTime ); - if ( $remainingSeconds > $totalSeconds ) { - $totalSeconds = $remainingSeconds; + $db->begin( __METHOD__ ); + $db->delete( + $this->getTableNameByShard( $i ), + array( + 'exptime >= ' . $db->addQuotes( $minExpTime ), + 'exptime < ' . $db->addQuotes( $dbTimestamp ), + 'keyname' => $keys + ), + __METHOD__ ); + $db->commit( __METHOD__ ); + + if ( $progressCallback ) { + if ( intval( $totalSeconds ) === 0 ) { + $percent = 0; + } else { + $remainingSeconds = wfTimestamp( TS_UNIX, $timestamp ) + - wfTimestamp( TS_UNIX, $maxExpTime ); + if ( $remainingSeconds > $totalSeconds ) { + $totalSeconds = $remainingSeconds; + } + $percent = ( $i + $remainingSeconds / $totalSeconds ) + / $this->shards * 100; } - $percent = ( $i + $remainingSeconds / $totalSeconds ) - / $this->shards * 100; + $percent = ( $percent / $this->numServers ) + + ( $serverIndex / $this->numServers * 100 ); + call_user_func( $progressCallback, $percent ); } - call_user_func( $progressCallback, $percent ); } } + } catch ( DBError $e ) { + $this->handleWriteError( $e, $serverIndex ); + return false; } - } catch ( DBError $e ) { - $this->handleWriteError( $e ); - return false; } - return true; } public function deleteAll() { - try { - $db = $this->getDB(); - for ( $i = 0; $i < $this->shards; $i++ ) { - $db->begin( __METHOD__ ); - $db->delete( $this->getTableByShard( $i ), '*', __METHOD__ ); - $db->commit( __METHOD__ ); + for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) { + try { + $db = $this->getDB( $serverIndex ); + for ( $i = 0; $i < $this->shards; $i++ ) { + $db->begin( __METHOD__ ); + $db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ ); + $db->commit( __METHOD__ ); + } + } catch ( DBError $e ) { + $this->handleWriteError( $e, $serverIndex ); + return false; } - } catch ( DBError $e ) { - $this->handleWriteError( $e ); - return false; } - return true; } @@ -544,58 +620,77 @@ class SqlBagOStuff extends BagOStuff { /** * Handle a DBError which occurred during a read operation. */ - protected function handleReadError( DBError $exception ) { + protected function handleReadError( DBError $exception, $serverIndex ) { if ( $exception instanceof DBConnectionError ) { - $this->connFailureTime = time(); - $this->connFailureError = $exception; + $this->markServerDown( $exception, $serverIndex ); } wfDebugLog( 'SQLBagOStuff', "DBError: {$exception->getMessage()}" ); - if ( $this->db ) { - wfDebug( __METHOD__ . ": ignoring query error\n" ); - } else { + if ( $exception instanceof DBConnectionError ) { wfDebug( __METHOD__ . ": ignoring connection error\n" ); + } else { + wfDebug( __METHOD__ . ": ignoring query error\n" ); } } /** * Handle a DBQueryError which occurred during a write operation. */ - protected function handleWriteError( DBError $exception ) { + protected function handleWriteError( DBError $exception, $serverIndex ) { if ( $exception instanceof DBConnectionError ) { - $this->connFailureTime = time(); - $this->connFailureError = $exception; + $this->markServerDown( $exception, $serverIndex ); } - if ( $this->db && $this->db->wasReadOnlyError() ) { + if ( $exception->db && $exception->db->wasReadOnlyError() ) { try { - $this->db->rollback( __METHOD__ ); + $exception->db->rollback( __METHOD__ ); } catch ( DBError $e ) {} } wfDebugLog( 'SQLBagOStuff', "DBError: {$exception->getMessage()}" ); - if ( $this->db ) { - wfDebug( __METHOD__ . ": ignoring query error\n" ); - } else { + if ( $exception instanceof DBConnectionError ) { wfDebug( __METHOD__ . ": ignoring connection error\n" ); + } else { + wfDebug( __METHOD__ . ": ignoring query error\n" ); + } + } + + /** + * Mark a server down due to a DBConnectionError exception + */ + protected function markServerDown( $exception, $serverIndex ) { + if ( isset( $this->connFailureTimes[$serverIndex] ) ) { + if ( time() - $this->connFailureTimes[$serverIndex] >= 60 ) { + unset( $this->connFailureTimes[$serverIndex] ); + unset( $this->connFailureErrors[$serverIndex] ); + } else { + wfDebug( __METHOD__ . ": Server #$serverIndex already down\n" ); + return; + } } + $now = time(); + wfDebug( __METHOD__ . ": Server #$serverIndex down until " . ( $now + 60 ) . "\n" ); + $this->connFailureTimes[$serverIndex] = $now; + $this->connFailureErrors[$serverIndex] = $exception; } /** * Create shard tables. For use from eval.php. */ public function createTables() { - $db = $this->getDB(); - if ( $db->getType() !== 'mysql' - || version_compare( $db->getServerVersion(), '4.1.0', '<' ) ) - { - throw new MWException( __METHOD__ . ' is not supported on this DB server' ); - } + for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) { + $db = $this->getDB( $serverIndex ); + if ( $db->getType() !== 'mysql' + || version_compare( $db->getServerVersion(), '4.1.0', '<' ) ) + { + throw new MWException( __METHOD__ . ' is not supported on this DB server' ); + } - for ( $i = 0; $i < $this->shards; $i++ ) { - $db->begin( __METHOD__ ); - $db->query( - 'CREATE TABLE ' . $db->tableName( $this->getTableByShard( $i ) ) . - ' LIKE ' . $db->tableName( 'objectcache' ), - __METHOD__ ); - $db->commit( __METHOD__ ); + for ( $i = 0; $i < $this->shards; $i++ ) { + $db->begin( __METHOD__ ); + $db->query( + 'CREATE TABLE ' . $db->tableName( $this->getTableNameByShard( $i ) ) . + ' LIKE ' . $db->tableName( 'objectcache' ), + __METHOD__ ); + $db->commit( __METHOD__ ); + } } } } @@ -604,4 +699,3 @@ class SqlBagOStuff extends BagOStuff { * Backwards compatibility alias */ class MediaWikiBagOStuff extends SqlBagOStuff { } - diff --git a/includes/objectcache/WinCacheBagOStuff.php b/includes/objectcache/WinCacheBagOStuff.php index 21aa39e7..6d9b47ad 100644 --- a/includes/objectcache/WinCacheBagOStuff.php +++ b/includes/objectcache/WinCacheBagOStuff.php @@ -32,12 +32,15 @@ class WinCacheBagOStuff extends BagOStuff { /** * Get a value from the WinCache object cache * - * @param $key String: cache key + * @param string $key cache key + * @param $casToken[optional] int: cas token * @return mixed */ - public function get( $key ) { + public function get( $key, &$casToken = null ) { $val = wincache_ucache_get( $key ); + $casToken = $val; + if ( is_string( $val ) ) { $val = unserialize( $val ); } @@ -48,9 +51,9 @@ class WinCacheBagOStuff extends BagOStuff { /** * Store a value in the WinCache object cache * - * @param $key String: cache key + * @param string $key cache key * @param $value Mixed: object to store - * @param $expire Int: expiration time + * @param int $expire expiration time * @return bool */ public function set( $key, $value, $expire = 0 ) { @@ -62,34 +65,28 @@ class WinCacheBagOStuff extends BagOStuff { } /** - * Remove a value from the WinCache object cache + * Store a value in the WinCache object cache, race condition-safe * - * @param $key String: cache key - * @param $time Int: not used in this implementation + * @param int $casToken cas token + * @param string $key cache key + * @param int $value object to store + * @param int $exptime expiration time * @return bool */ - public function delete( $key, $time = 0 ) { - wincache_ucache_delete( $key ); - - return true; + public function cas( $casToken, $key, $value, $exptime = 0 ) { + return wincache_ucache_cas( $key, $casToken, serialize( $value ) ); } /** - * @return Array + * Remove a value from the WinCache object cache + * + * @param string $key cache key + * @param int $time not used in this implementation + * @return bool */ - public function keys() { - $info = wincache_ucache_info(); - $list = $info['ucache_entries']; - $keys = array(); - - if ( is_null( $list ) ) { - return array(); - } - - foreach ( $list as $entry ) { - $keys[] = $entry['key_name']; - } + public function delete( $key, $time = 0 ) { + wincache_ucache_delete( $key ); - return $keys; + return true; } } diff --git a/includes/objectcache/XCacheBagOStuff.php b/includes/objectcache/XCacheBagOStuff.php index bc68b596..0f45db73 100644 --- a/includes/objectcache/XCacheBagOStuff.php +++ b/includes/objectcache/XCacheBagOStuff.php @@ -31,10 +31,11 @@ class XCacheBagOStuff extends BagOStuff { /** * Get a value from the XCache object cache * - * @param $key String: cache key + * @param string $key cache key + * @param $casToken mixed: cas token * @return mixed */ - public function get( $key ) { + public function get( $key, &$casToken = null ) { $val = xcache_get( $key ); if ( is_string( $val ) ) { @@ -53,9 +54,9 @@ class XCacheBagOStuff extends BagOStuff { /** * Store a value in the XCache object cache * - * @param $key String: cache key + * @param string $key cache key * @param $value Mixed: object to store - * @param $expire Int: expiration time + * @param int $expire expiration time * @return bool */ public function set( $key, $value, $expire = 0 ) { @@ -67,11 +68,23 @@ class XCacheBagOStuff extends BagOStuff { return true; } + /** + * @param $casToken mixed + * @param $key string + * @param $value mixed + * @param $exptime int + * @return bool + */ + public function cas( $casToken, $key, $value, $exptime = 0 ) { + // Can't find any documentation on xcache cas + throw new MWException( "CAS is not implemented in " . __CLASS__ ); + } + /** * Remove a value from the XCache object cache * - * @param $key String: cache key - * @param $time Int: not used in this implementation + * @param string $key cache key + * @param int $time not used in this implementation * @return bool */ public function delete( $key, $time = 0 ) { @@ -79,6 +92,21 @@ class XCacheBagOStuff extends BagOStuff { return true; } + /** + * Merge an item. + * XCache does not seem to support any way of performing CAS - this however will + * provide a way to perform CAS-like functionality. + * + * @param $key string + * @param $callback closure Callback method to be executed + * @param int $exptime Either an interval in seconds or a unix timestamp for expiry + * @param int $attempts The amount of times to attempt a merge in case of failure + * @return bool success + */ + public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) { + return $this->mergeViaLock( $key, $callback, $exptime, $attempts ); + } + public function incr( $key, $value = 1 ) { return xcache_inc( $key, $value ); } -- cgit v1.2.3-54-g00ecf