diff options
Diffstat (limited to 'includes/objectcache/MemcachedClient.php')
-rw-r--r-- | includes/objectcache/MemcachedClient.php | 187 |
1 files changed, 127 insertions, 60 deletions
diff --git a/includes/objectcache/MemcachedClient.php b/includes/objectcache/MemcachedClient.php index 536ba6ea..0d96ed6c 100644 --- a/includes/objectcache/MemcachedClient.php +++ b/includes/objectcache/MemcachedClient.php @@ -99,7 +99,6 @@ class MWMemcached { // }}} - /** * Command statistics * @@ -242,7 +241,7 @@ class MWMemcached { /** * Memcache initializer * - * @param $args Array Associative array of settings + * @param array $args Associative array of settings * * @return mixed */ @@ -272,12 +271,12 @@ class MWMemcached { * Adds a key/value to the memcache server if one isn't already set with * that key * - * @param $key String: key to set with data + * @param string $key key to set with data * @param $val Mixed: value to store * @param $exp Integer: (optional) Expiration time. This can be a number of seconds * to cache for (up to 30 days inclusive). Any timespans of 30 days + 1 second or * longer must be the timestamp of the time at which the mapping should expire. It - * is safe to use timestamps in all cases, regardless of exipration + * is safe to use timestamps in all cases, regardless of expiration * eg: strtotime("+3 hour") * * @return Boolean @@ -292,7 +291,7 @@ class MWMemcached { /** * Decrease a value stored on the memcache server * - * @param $key String: key to decrease + * @param string $key key to decrease * @param $amt Integer: (optional) amount to decrease * * @return Mixed: FALSE on failure, value on success @@ -307,7 +306,7 @@ class MWMemcached { /** * Deletes a key from the server, optionally after $time * - * @param $key String: key to delete + * @param string $key key to delete * @param $time Integer: (optional) how long to wait before deleting * * @return Boolean: TRUE on success, FALSE on failure @@ -407,11 +406,12 @@ class MWMemcached { /** * Retrieves the value associated with the key from the memcache server * - * @param $key array|string key to retrieve + * @param array|string $key key to retrieve + * @param $casToken[optional] Float * * @return Mixed */ - public function get( $key ) { + public function get( $key, &$casToken = null ) { wfProfileIn( __METHOD__ ); if ( $this->_debug ) { @@ -437,14 +437,14 @@ class MWMemcached { $this->stats['get'] = 1; } - $cmd = "get $key\r\n"; + $cmd = "gets $key\r\n"; if ( !$this->_fwrite( $sock, $cmd ) ) { wfProfileOut( __METHOD__ ); return false; } $val = array(); - $this->_load_items( $sock, $val ); + $this->_load_items( $sock, $val, $casToken ); if ( $this->_debug ) { foreach ( $val as $k => $v ) { @@ -466,7 +466,7 @@ class MWMemcached { /** * Get multiple keys from the server(s) * - * @param $keys Array: keys to retrieve + * @param array $keys keys to retrieve * * @return Array */ @@ -498,7 +498,7 @@ class MWMemcached { $gather = array(); // Send out the requests foreach ( $socks as $sock ) { - $cmd = 'get'; + $cmd = 'gets'; foreach ( $sock_keys[ intval( $sock ) ] as $key ) { $cmd .= ' ' . $key; } @@ -512,7 +512,7 @@ class MWMemcached { // Parse responses $val = array(); foreach ( $gather as $sock ) { - $this->_load_items( $sock, $val ); + $this->_load_items( $sock, $val, $casToken ); } if ( $this->_debug ) { @@ -530,7 +530,7 @@ class MWMemcached { /** * Increments $key (optionally) by $amt * - * @param $key String: key to increment + * @param string $key key to increment * @param $amt Integer: (optional) amount to increment * * @return Integer: null if the key does not exist yet (this does NOT @@ -547,7 +547,7 @@ class MWMemcached { /** * Overwrites an existing value for key; only works if key is already set * - * @param $key String: key to set value as + * @param string $key key to set value as * @param $value Mixed: value to store * @param $exp Integer: (optional) Expiration time. This can be a number of seconds * to cache for (up to 30 days inclusive). Any timespans of 30 days + 1 second or @@ -569,7 +569,7 @@ class MWMemcached { * output as an array (null array if no output) * * @param $sock Resource: socket to send command on - * @param $cmd String: command to run + * @param string $cmd command to run * * @return Array: output array */ @@ -603,7 +603,7 @@ class MWMemcached { * Unconditionally sets a key to a given value in the memcache. Returns true * if set successfully. * - * @param $key String: key to set value as + * @param string $key key to set value as * @param $value Mixed: value to set * @param $exp Integer: (optional) Expiration time. This can be a number of seconds * to cache for (up to 30 days inclusive). Any timespans of 30 days + 1 second or @@ -618,6 +618,28 @@ class MWMemcached { } // }}} + // {{{ cas() + + /** + * Sets a key to a given value in the memcache if the current value still corresponds + * to a known, given value. Returns true if set successfully. + * + * @param $casToken Float: current known value + * @param string $key key to set value as + * @param $value Mixed: value to set + * @param $exp Integer: (optional) Expiration time. This can be a number of seconds + * to cache for (up to 30 days inclusive). Any timespans of 30 days + 1 second or + * longer must be the timestamp of the time at which the mapping should expire. It + * is safe to use timestamps in all cases, regardless of exipration + * eg: strtotime("+3 hour") + * + * @return Boolean: TRUE on success + */ + public function cas( $casToken, $key, $value, $exp = 0 ) { + return $this->_set( 'cas', $key, $value, $exp, $casToken ); + } + + // }}} // {{{ set_compress_threshold() /** @@ -649,7 +671,7 @@ class MWMemcached { /** * Sets the server list to distribute key gets and puts between * - * @param $list Array of servers to connect to + * @param array $list of servers to connect to * * @see MWMemcached::__construct() */ @@ -684,7 +706,7 @@ class MWMemcached { /** * Close the specified socket * - * @param $sock String: socket to close + * @param string $sock socket to close * * @access private */ @@ -701,7 +723,7 @@ class MWMemcached { * Connects $sock to $host, timing out after $timeout * * @param $sock Integer: socket to connect - * @param $host String: Host:IP to connect to + * @param string $host Host:IP to connect to * * @return boolean * @access private @@ -743,7 +765,7 @@ class MWMemcached { /** * Marks a host as dead until 30-40 seconds in the future * - * @param $sock String: socket to mark as dead + * @param string $sock socket to mark as dead * * @access private */ @@ -769,7 +791,7 @@ class MWMemcached { /** * get_sock * - * @param $key String: key to retrieve value for; + * @param string $key key to retrieve value for; * * @return Mixed: resource on success, false on failure * @access private @@ -818,7 +840,7 @@ class MWMemcached { /** * Creates a hash integer based on the $key * - * @param $key String: key to hash + * @param string $key key to hash * * @return Integer: hash value * @access private @@ -836,8 +858,8 @@ class MWMemcached { /** * Perform increment/decriment on $key * - * @param $cmd String command to perform - * @param $key String|array key to perform it on + * @param string $cmd command to perform + * @param string|array $key key to perform it on * @param $amt Integer amount to adjust * * @return Integer: new value of $key @@ -878,40 +900,78 @@ class MWMemcached { * Load items into $ret from $sock * * @param $sock Resource: socket to read from - * @param $ret Array: returned values + * @param array $ret returned values + * @param $casToken[optional] Float * @return boolean True for success, false for failure * * @access private */ - function _load_items( $sock, &$ret ) { + function _load_items( $sock, &$ret, &$casToken = null ) { + $results = array(); + while ( 1 ) { $decl = $this->_fgets( $sock ); + if( $decl === false ) { + /* + * If nothing can be read, something is wrong because we know exactly when + * to stop reading (right after "END") and we return right after that. + */ return false; + } elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+) (\d+)$/', $decl, $match ) ) { + /* + * Read all data returned. This can be either one or multiple values. + * Save all that data (in an array) to be processed later: we'll first + * want to continue reading until "END" before doing anything else, + * to make sure that we don't leave our client in a state where it's + * output is not yet fully read. + */ + $results[] = array( + $match[1], // rkey + $match[2], // flags + $match[3], // len + $match[4], // casToken + $this->_fread( $sock, $match[3] + 2 ), // data + ); } elseif ( $decl == "END" ) { - return true; - } elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+)$/', $decl, $match ) ) { - list( $rkey, $flags, $len ) = array( $match[1], $match[2], $match[3] ); - $data = $this->_fread( $sock, $len + 2 ); - if ( $data === false ) { - return false; - } - if ( substr( $data, -2 ) !== "\r\n" ) { - $this->_handle_error( $sock, - 'line ending missing from data block from $1' ); + if ( count( $results ) == 0 ) { return false; } - $data = substr( $data, 0, -2 ); - $ret[$rkey] = $data; - if ( $this->_have_zlib && $flags & self::COMPRESSED ) { - $ret[$rkey] = gzuncompress( $ret[$rkey] ); - } + /** + * All data has been read, time to process the data and build + * meaningful return values. + */ + foreach ( $results as $vars ) { + list( $rkey, $flags, $len, $casToken, $data ) = $vars; + + if ( $data === false || substr( $data, -2 ) !== "\r\n" ) { + $this->_handle_error( $sock, + 'line ending missing from data block from $1' ); + return false; + } + $data = substr( $data, 0, -2 ); + $ret[$rkey] = $data; + + if ( $this->_have_zlib && $flags & self::COMPRESSED ) { + $ret[$rkey] = gzuncompress( $ret[$rkey] ); + } - if ( $flags & self::SERIALIZED ) { - $ret[$rkey] = unserialize( $ret[$rkey] ); + /* + * This unserialize is the exact reason that we only want to + * process data after having read until "END" (instead of doing + * this right away): "unserialize" can trigger outside code: + * in the event that $ret[$rkey] is a serialized object, + * unserializing it will trigger __wakeup() if present. If that + * function attempted to read from memcached (while we did not + * yet read "END"), these 2 calls would collide. + */ + if ( $flags & self::SERIALIZED ) { + $ret[$rkey] = unserialize( $ret[$rkey] ); + } } + return true; } else { $this->_handle_error( $sock, 'Error parsing response from $1' ); return false; @@ -925,19 +985,20 @@ class MWMemcached { /** * Performs the requested storage operation to the memcache server * - * @param $cmd String: command to perform - * @param $key String: key to act on + * @param string $cmd command to perform + * @param string $key key to act on * @param $val Mixed: what we need to store * @param $exp Integer: (optional) Expiration time. This can be a number of seconds * to cache for (up to 30 days inclusive). Any timespans of 30 days + 1 second or * longer must be the timestamp of the time at which the mapping should expire. It * is safe to use timestamps in all cases, regardless of exipration * eg: strtotime("+3 hour") + * @param $casToken[optional] Float * * @return Boolean * @access private */ - function _set( $cmd, $key, $val, $exp ) { + function _set( $cmd, $key, $val, $exp, $casToken = null ) { if ( !$this->_active ) { return false; } @@ -966,7 +1027,7 @@ class MWMemcached { $len = strlen( $val ); if ( $this->_have_zlib && $this->_compress_enable && - $this->_compress_threshold && $len >= $this->_compress_threshold ) + $this->_compress_threshold && $len >= $this->_compress_threshold ) { $c_val = gzcompress( $val, 9 ); $c_len = strlen( $c_val ); @@ -980,7 +1041,13 @@ class MWMemcached { $flags |= self::COMPRESSED; } } - if ( !$this->_fwrite( $sock, "$cmd $key $flags $exp $len\r\n$val\r\n" ) ) { + + $command = "$cmd $key $flags $exp $len"; + if ( $casToken ) { + $command .= " $casToken"; + } + + if ( !$this->_fwrite( $sock, "$command\r\n$val\r\n" ) ) { return false; } @@ -1001,7 +1068,7 @@ class MWMemcached { /** * Returns the socket for the host * - * @param $host String: Host:IP to get socket for + * @param string $host Host:IP to get socket for * * @return Mixed: IO Stream or false * @access private @@ -1036,11 +1103,6 @@ class MWMemcached { * @param $text string */ function _debugprint( $text ) { - global $wgDebugLogGroups; - if( !isset( $wgDebugLogGroups['memcached'] ) ) { - # Prefix message since it will end up in main debug log file - $text = "memcached: $text"; - } wfDebugLog( 'memcached', $text ); } @@ -1096,7 +1158,7 @@ class MWMemcached { } /** - * Read the specified number of bytes from a stream. If there is an error, + * Read the specified number of bytes from a stream. If there is an error, * mark the socket dead. * * @param $sock The socket @@ -1137,7 +1199,7 @@ class MWMemcached { function _fgets( $sock ) { $result = fgets( $sock ); // fgets() may return a partial line if there is a select timeout after - // a successful recv(), so we have to check for a timeout even if we + // a successful recv(), so we have to check for a timeout even if we // got a string response. $data = stream_get_meta_data( $sock ); if ( $data['timed_out'] ) { @@ -1167,10 +1229,16 @@ class MWMemcached { if ( !is_resource( $f ) ) { return; } - $n = stream_select( $r = array( $f ), $w = null, $e = null, 0, 0 ); + $r = array( $f ); + $w = null; + $e = null; + $n = stream_select( $r, $w, $e, 0, 0 ); while ( $n == 1 && !feof( $f ) ) { fread( $f, 1024 ); - $n = stream_select( $r = array( $f ), $w = null, $e = null, 0, 0 ); + $r = array( $f ); + $w = null; + $e = null; + $n = stream_select( $r, $w, $e, 0, 0 ); } } @@ -1179,7 +1247,6 @@ class MWMemcached { // }}} } - // }}} class MemCachedClientforWiki extends MWMemcached { |