diff options
author | Luke Shumaker <lukeshu@sbcglobal.net> | 2016-05-01 15:12:12 -0400 |
---|---|---|
committer | Luke Shumaker <lukeshu@sbcglobal.net> | 2016-05-01 15:12:12 -0400 |
commit | c9aa36da061816dee256a979c2ff8d2ee41824d9 (patch) | |
tree | 29f7002b80ee984b488bd047dbbd80b36bf892e9 /includes/db/DatabaseMssql.php | |
parent | b4274e0e33eafb5e9ead9d949ebf031a9fb8363b (diff) | |
parent | d1ba966140d7a60cd5ae4e8667ceb27c1a138592 (diff) |
Merge branch 'archwiki'
# Conflicts:
# skins/ArchLinux.php
# skins/ArchLinux/archlogo.gif
Diffstat (limited to 'includes/db/DatabaseMssql.php')
-rw-r--r-- | includes/db/DatabaseMssql.php | 1502 |
1 files changed, 911 insertions, 591 deletions
diff --git a/includes/db/DatabaseMssql.php b/includes/db/DatabaseMssql.php index 37f5372e..af3cc72d 100644 --- a/includes/db/DatabaseMssql.php +++ b/includes/db/DatabaseMssql.php @@ -22,47 +22,54 @@ * @author Joel Penner <a-joelpe at microsoft dot com> * @author Chris Pucci <a-cpucci at microsoft dot com> * @author Ryan Biesemeyer <v-ryanbi at microsoft dot com> + * @author Ryan Schmidt <skizzerz at gmail dot com> */ /** * @ingroup Database */ class DatabaseMssql extends DatabaseBase { - var $mInsertId = null; - var $mLastResult = null; - var $mAffectedRows = null; - - var $mPort; - - function cascadingDeletes() { + protected $mInsertId = null; + protected $mLastResult = null; + protected $mAffectedRows = null; + protected $mSubqueryId = 0; + protected $mScrollableCursor = true; + protected $mPrepareStatements = true; + protected $mBinaryColumnCache = null; + protected $mBitColumnCache = null; + protected $mIgnoreDupKeyErrors = false; + + protected $mPort; + + public function cascadingDeletes() { return true; } - function cleanupTriggers() { - return true; + public function cleanupTriggers() { + return false; } - function strictIPs() { - return true; + public function strictIPs() { + return false; } - function realTimestamps() { - return true; + public function realTimestamps() { + return false; } - function implicitGroupby() { + public function implicitGroupby() { return false; } - function implicitOrderby() { + public function implicitOrderby() { return false; } - function functionalIndexes() { + public function functionalIndexes() { return true; } - function unionSupportsOrderAndLimit() { + public function unionSupportsOrderAndLimit() { return false; } @@ -75,16 +82,21 @@ class DatabaseMssql extends DatabaseBase { * @throws DBConnectionError * @return bool|DatabaseBase|null */ - function open( $server, $user, $password, $dbName ) { + public function open( $server, $user, $password, $dbName ) { # Test for driver support, to avoid suppressed fatal error if ( !function_exists( 'sqlsrv_connect' ) ) { - throw new DBConnectionError( $this, "MS Sql Server Native (sqlsrv) functions missing. You can download the driver from: http://go.microsoft.com/fwlink/?LinkId=123470\n" ); + throw new DBConnectionError( + $this, + "Microsoft SQL Server Native (sqlsrv) functions missing. + You can download the driver from: http://go.microsoft.com/fwlink/?LinkId=123470\n" + ); } - global $wgDBport; + global $wgDBport, $wgDBWindowsAuthentication; - if ( !strlen( $user ) ) { # e.g. the class is being loaded - return; + # e.g. the class is being loaded + if ( !strlen( $user ) ) { + return null; } $this->close(); @@ -100,35 +112,23 @@ class DatabaseMssql extends DatabaseBase { $connectionInfo['Database'] = $dbName; } - // Start NT Auth Hack - // Quick and dirty work around to provide NT Auth designation support. - // Current solution requires installer to know to input 'ntauth' for both username and password - // to trigger connection via NT Auth. - ugly, ugly, ugly - // TO-DO: Make this better and add NT Auth choice to MW installer when SQL Server option is chosen. - $ntAuthUserTest = strtolower( $user ); - $ntAuthPassTest = strtolower( $password ); - // Decide which auth scenerio to use - if ( $ntAuthPassTest == 'ntauth' && $ntAuthUserTest == 'ntauth' ) { - // Don't add credentials to $connectionInfo - } else { + // if we are using Windows auth, don't add credentials to $connectionInfo + if ( !$wgDBWindowsAuthentication ) { $connectionInfo['UID'] = $user; $connectionInfo['PWD'] = $password; } - // End NT Auth Hack wfSuppressWarnings(); $this->mConn = sqlsrv_connect( $server, $connectionInfo ); wfRestoreWarnings(); if ( $this->mConn === false ) { - wfDebug( "DB connection error\n" ); - wfDebug( "Server: $server, Database: $dbName, User: $user, Password: " . substr( $password, 0, 3 ) . "...\n" ); - wfDebug( $this->lastError() . "\n" ); - return false; + throw new DBConnectionError( $this, $this->lastError() ); } $this->mOpened = true; + return $this->mConn; } @@ -141,14 +141,39 @@ class DatabaseMssql extends DatabaseBase { return sqlsrv_close( $this->mConn ); } + /** + * @param bool|MssqlResultWrapper|resource $result + * @return bool|MssqlResultWrapper + */ + public function resultObject( $result ) { + if ( empty( $result ) ) { + return false; + } elseif ( $result instanceof MssqlResultWrapper ) { + return $result; + } elseif ( $result === true ) { + // Successful write query + return $result; + } else { + return new MssqlResultWrapper( $this, $result ); + } + } + + /** + * @param string $sql + * @return bool|MssqlResult + * @throws DBUnexpectedError + */ protected function doQuery( $sql ) { - wfDebug( "SQL: [$sql]\n" ); + if ( $this->debug() ) { + wfDebug( "SQL: [$sql]\n" ); + } $this->offset = 0; - // several extensions seem to think that all databases support limits via LIMIT N after the WHERE clause - // well, MSSQL uses SELECT TOP N, so to catch any of those extensions we'll do a quick check for a LIMIT - // clause and pass $sql through $this->LimitToTopN() which parses the limit clause and passes the result to - // $this->limitResult(); + // several extensions seem to think that all databases support limits + // via LIMIT N after the WHERE clause well, MSSQL uses SELECT TOP N, + // so to catch any of those extensions we'll do a quick check for a + // LIMIT clause and pass $sql through $this->LimitToTopN() which parses + // the limit clause and passes the result to $this->limitResult(); if ( preg_match( '/\bLIMIT\s*/i', $sql ) ) { // massage LIMIT -> TopN $sql = $this->LimitToTopN( $sql ); @@ -161,149 +186,235 @@ class DatabaseMssql extends DatabaseBase { } // perform query - $stmt = sqlsrv_query( $this->mConn, $sql ); - if ( $stmt == false ) { - $message = "A database error has occurred. Did you forget to run maintenance/update.php after upgrading? See: http://www.mediawiki.org/wiki/Manual:Upgrading#Run_the_update_script\n" . - "Query: " . htmlentities( $sql ) . "\n" . - "Function: " . __METHOD__ . "\n"; - // process each error (our driver will give us an array of errors unlike other providers) - foreach ( sqlsrv_errors() as $error ) { - $message .= $message . "ERROR[" . $error['code'] . "] " . $error['message'] . "\n"; - } - throw new DBUnexpectedError( $this, $message ); + // SQLSRV_CURSOR_STATIC is slower than SQLSRV_CURSOR_CLIENT_BUFFERED (one of the two is + // needed if we want to be able to seek around the result set), however CLIENT_BUFFERED + // has a bug in the sqlsrv driver where wchar_t types (such as nvarchar) that are empty + // strings make php throw a fatal error "Severe error translating Unicode" + if ( $this->mScrollableCursor ) { + $scrollArr = array( 'Scrollable' => SQLSRV_CURSOR_STATIC ); + } else { + $scrollArr = array(); } - // remember number of rows affected - $this->mAffectedRows = sqlsrv_rows_affected( $stmt ); - // if it is a SELECT statement, or an insert with a request to output something we want to return a row. - if ( ( preg_match( '#\bSELECT\s#i', $sql ) ) || - ( preg_match( '#\bINSERT\s#i', $sql ) && preg_match( '#\bOUTPUT\s+INSERTED\b#i', $sql ) ) ) { - // this is essentially a rowset, but Mediawiki calls these 'result' - // the rowset owns freeing the statement - $res = new MssqlResult( $stmt ); + if ( $this->mPrepareStatements ) { + // we do prepare + execute so we can get its field metadata for later usage if desired + $stmt = sqlsrv_prepare( $this->mConn, $sql, array(), $scrollArr ); + $success = sqlsrv_execute( $stmt ); } else { - // otherwise we simply return it was successful, failure throws an exception - $res = true; + $stmt = sqlsrv_query( $this->mConn, $sql, array(), $scrollArr ); + $success = (bool)$stmt; } - return $res; - } - function freeResult( $res ) { - if ( $res instanceof ResultWrapper ) { - $res = $res->result; + if ( $this->mIgnoreDupKeyErrors ) { + // ignore duplicate key errors, but nothing else + // this emulates INSERT IGNORE in MySQL + if ( $success === false ) { + $errors = sqlsrv_errors( SQLSRV_ERR_ERRORS ); + $success = true; + + foreach ( $errors as $err ) { + if ( $err['SQLSTATE'] == '23000' && $err['code'] == '2601' ) { + continue; // duplicate key error caused by unique index + } elseif ( $err['SQLSTATE'] == '23000' && $err['code'] == '2627' ) { + continue; // duplicate key error caused by primary key + } elseif ( $err['SQLSTATE'] == '01000' && $err['code'] == '3621' ) { + continue; // generic "the statement has been terminated" error + } + + $success = false; // getting here means we got an error we weren't expecting + break; + } + + if ( $success ) { + $this->mAffectedRows = 0; + return $stmt; + } + } + } + + if ( $success === false ) { + return false; } - $res->free(); + // remember number of rows affected + $this->mAffectedRows = sqlsrv_rows_affected( $stmt ); + + return $stmt; } - function fetchObject( $res ) { + public function freeResult( $res ) { if ( $res instanceof ResultWrapper ) { $res = $res->result; } - $row = $res->fetch( 'OBJECT' ); - return $row; + + sqlsrv_free_stmt( $res ); } - function getErrors() { - $strRet = ''; - $retErrors = sqlsrv_errors( SQLSRV_ERR_ALL ); - if ( $retErrors != null ) { - foreach ( $retErrors as $arrError ) { - $strRet .= "SQLState: " . $arrError['SQLSTATE'] . "\n"; - $strRet .= "Error Code: " . $arrError['code'] . "\n"; - $strRet .= "Message: " . $arrError['message'] . "\n"; - } - } else { - $strRet = "No errors found"; - } - return $strRet; + /** + * @param MssqlResultWrapper $res + * @return stdClass + */ + public function fetchObject( $res ) { + // $res is expected to be an instance of MssqlResultWrapper here + return $res->fetchObject(); } - function fetchRow( $res ) { - if ( $res instanceof ResultWrapper ) { - $res = $res->result; - } - $row = $res->fetch( SQLSRV_FETCH_BOTH ); - return $row; + /** + * @param MssqlResultWrapper $res + * @return array + */ + public function fetchRow( $res ) { + return $res->fetchRow(); } - function numRows( $res ) { + /** + * @param mixed $res + * @return int + */ + public function numRows( $res ) { if ( $res instanceof ResultWrapper ) { $res = $res->result; } - return ( $res ) ? $res->numrows() : 0; + + return sqlsrv_num_rows( $res ); } - function numFields( $res ) { + /** + * @param mixed $res + * @return int + */ + public function numFields( $res ) { if ( $res instanceof ResultWrapper ) { $res = $res->result; } - return ( $res ) ? $res->numfields() : 0; + + return sqlsrv_num_fields( $res ); } - function fieldName( $res, $n ) { + /** + * @param mixed $res + * @param int $n + * @return int + */ + public function fieldName( $res, $n ) { if ( $res instanceof ResultWrapper ) { $res = $res->result; } - return ( $res ) ? $res->fieldname( $n ) : 0; + + $metadata = sqlsrv_field_metadata( $res ); + return $metadata[$n]['Name']; } /** * This must be called after nextSequenceVal - * @return null + * @return int|null */ - function insertId() { + public function insertId() { return $this->mInsertId; } - function dataSeek( $res, $row ) { - if ( $res instanceof ResultWrapper ) { - $res = $res->result; - } - return ( $res ) ? $res->seek( $row ) : false; + /** + * @param MssqlResultWrapper $res + * @param int $row + * @return bool + */ + public function dataSeek( $res, $row ) { + return $res->seek( $row ); } - function lastError() { - if ( $this->mConn ) { - return $this->getErrors(); + /** + * @return string + */ + public function lastError() { + $strRet = ''; + $retErrors = sqlsrv_errors( SQLSRV_ERR_ALL ); + if ( $retErrors != null ) { + foreach ( $retErrors as $arrError ) { + $strRet .= $this->formatError( $arrError ) . "\n"; + } } else { - return "No database connection"; + $strRet = "No errors found"; } + + return $strRet; + } + + /** + * @param array $err + * @return string + */ + private function formatError( $err ) { + return '[SQLSTATE ' . $err['SQLSTATE'] . '][Error Code ' . $err['code'] . ']' . $err['message']; } - function lastErrno() { + /** + * @return string + */ + public function lastErrno() { $err = sqlsrv_errors( SQLSRV_ERR_ALL ); - if ( $err[0] ) { + if ( $err !== null && isset( $err[0] ) ) { return $err[0]['code']; } else { return 0; } } - function affectedRows() { + /** + * @return int + */ + public function affectedRows() { return $this->mAffectedRows; } /** * SELECT wrapper * - * @param $table Mixed: array or string, table name(s) (prefix auto-added) - * @param $vars Mixed: array or string, field name(s) to be retrieved - * @param $conds Mixed: array or string, condition(s) for WHERE - * @param $fname String: calling function name (use __METHOD__) for logs/profiling - * @param array $options associative array of options (e.g. array('GROUP BY' => 'page_title')), - * see Database::makeSelectOptions code for list of supported stuff - * @param $join_conds Array: Associative array of table join conditions (optional) - * (e.g. array( 'page' => array('LEFT JOIN','page_latest=rev_id') ) - * @return Mixed: database result resource (feed to Database::fetchObject or whatever), or false on failure + * @param mixed $table Array or string, table name(s) (prefix auto-added) + * @param mixed $vars Array or string, field name(s) to be retrieved + * @param mixed $conds Array or string, condition(s) for WHERE + * @param string $fname Calling function name (use __METHOD__) for logs/profiling + * @param array $options Associative array of options (e.g. + * array('GROUP BY' => 'page_title')), see Database::makeSelectOptions + * code for list of supported stuff + * @param array $join_conds Associative array of table join conditions + * (optional) (e.g. array( 'page' => array('LEFT JOIN','page_latest=rev_id') ) + * @return mixed Database result resource (feed to Database::fetchObject + * or whatever), or false on failure */ - function select( $table, $vars, $conds = '', $fname = __METHOD__, $options = array(), $join_conds = array() ) - { + public function select( $table, $vars, $conds = '', $fname = __METHOD__, + $options = array(), $join_conds = array() + ) { $sql = $this->selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds ); if ( isset( $options['EXPLAIN'] ) ) { - sqlsrv_query( $this->mConn, "SET SHOWPLAN_ALL ON;" ); - $ret = $this->query( $sql, $fname ); - sqlsrv_query( $this->mConn, "SET SHOWPLAN_ALL OFF;" ); + try { + $this->mScrollableCursor = false; + $this->mPrepareStatements = false; + $this->query( "SET SHOWPLAN_ALL ON" ); + $ret = $this->query( $sql, $fname ); + $this->query( "SET SHOWPLAN_ALL OFF" ); + } catch ( DBQueryError $dqe ) { + if ( isset( $options['FOR COUNT'] ) ) { + // likely don't have privs for SHOWPLAN, so run a select count instead + $this->query( "SET SHOWPLAN_ALL OFF" ); + unset( $options['EXPLAIN'] ); + $ret = $this->select( + $table, + 'COUNT(*) AS EstimateRows', + $conds, + $fname, + $options, + $join_conds + ); + } else { + // someone actually wanted the query plan instead of an est row count + // let them know of the error + $this->mScrollableCursor = true; + $this->mPrepareStatements = true; + throw $dqe; + } + } + $this->mScrollableCursor = true; + $this->mPrepareStatements = true; return $ret; } return $this->query( $sql, $fname ); @@ -312,21 +423,70 @@ class DatabaseMssql extends DatabaseBase { /** * SELECT wrapper * - * @param $table Mixed: Array or string, table name(s) (prefix auto-added) - * @param $vars Mixed: Array or string, field name(s) to be retrieved - * @param $conds Mixed: Array or string, condition(s) for WHERE - * @param $fname String: Calling function name (use __METHOD__) for logs/profiling - * @param array $options Associative array of options (e.g. array('GROUP BY' => 'page_title')), - * see Database::makeSelectOptions code for list of supported stuff - * @param $join_conds Array: Associative array of table join conditions (optional) - * (e.g. array( 'page' => array('LEFT JOIN','page_latest=rev_id') ) - * @return string, the SQL text + * @param mixed $table Array or string, table name(s) (prefix auto-added) + * @param mixed $vars Array or string, field name(s) to be retrieved + * @param mixed $conds Array or string, condition(s) for WHERE + * @param string $fname Calling function name (use __METHOD__) for logs/profiling + * @param array $options Associative array of options (e.g. array('GROUP BY' => 'page_title')), + * see Database::makeSelectOptions code for list of supported stuff + * @param array $join_conds Associative array of table join conditions (optional) + * (e.g. array( 'page' => array('LEFT JOIN','page_latest=rev_id') ) + * @return string The SQL text */ - function selectSQLText( $table, $vars, $conds = '', $fname = __METHOD__, $options = array(), $join_conds = array() ) { + public function selectSQLText( $table, $vars, $conds = '', $fname = __METHOD__, + $options = array(), $join_conds = array() + ) { if ( isset( $options['EXPLAIN'] ) ) { unset( $options['EXPLAIN'] ); } - return parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds ); + + $sql = parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds ); + + // try to rewrite aggregations of bit columns (currently MAX and MIN) + if ( strpos( $sql, 'MAX(' ) !== false || strpos( $sql, 'MIN(' ) !== false ) { + $bitColumns = array(); + if ( is_array( $table ) ) { + foreach ( $table as $t ) { + $bitColumns += $this->getBitColumns( $this->tableName( $t ) ); + } + } else { + $bitColumns = $this->getBitColumns( $this->tableName( $table ) ); + } + + foreach ( $bitColumns as $col => $info ) { + $replace = array( + "MAX({$col})" => "MAX(CAST({$col} AS tinyint))", + "MIN({$col})" => "MIN(CAST({$col} AS tinyint))", + ); + $sql = str_replace( array_keys( $replace ), array_values( $replace ), $sql ); + } + } + + return $sql; + } + + public function deleteJoin( $delTable, $joinTable, $delVar, $joinVar, $conds, + $fname = __METHOD__ + ) { + $this->mScrollableCursor = false; + try { + parent::deleteJoin( $delTable, $joinTable, $delVar, $joinVar, $conds, $fname ); + } catch ( Exception $e ) { + $this->mScrollableCursor = true; + throw $e; + } + $this->mScrollableCursor = true; + } + + public function delete( $table, $conds, $fname = __METHOD__ ) { + $this->mScrollableCursor = false; + try { + parent::delete( $table, $conds, $fname ); + } catch ( Exception $e ) { + $this->mScrollableCursor = true; + throw $e; + } + $this->mScrollableCursor = true; } /** @@ -335,30 +495,45 @@ class DatabaseMssql extends DatabaseBase { * This is not necessarily an accurate estimate, so use sparingly * Returns -1 if count cannot be found * Takes same arguments as Database::select() + * @param string $table + * @param string $vars + * @param string $conds + * @param string $fname + * @param array $options * @return int */ - function estimateRowCount( $table, $vars = '*', $conds = '', $fname = __METHOD__, $options = array() ) { - $options['EXPLAIN'] = true;// http://msdn2.microsoft.com/en-us/library/aa259203.aspx + public function estimateRowCount( $table, $vars = '*', $conds = '', + $fname = __METHOD__, $options = array() + ) { + // http://msdn2.microsoft.com/en-us/library/aa259203.aspx + $options['EXPLAIN'] = true; + $options['FOR COUNT'] = true; $res = $this->select( $table, $vars, $conds, $fname, $options ); $rows = -1; if ( $res ) { $row = $this->fetchRow( $res ); + if ( isset( $row['EstimateRows'] ) ) { $rows = $row['EstimateRows']; } } + return $rows; } /** * Returns information about an index * If errors are explicitly ignored, returns NULL on failure + * @param string $table + * @param string $index + * @param string $fname * @return array|bool|null */ - function indexInfo( $table, $index, $fname = __METHOD__ ) { - # This does not return the same info as MYSQL would, but that's OK because MediaWiki never uses the - # returned value except to check for the existance of indexes. + public function indexInfo( $table, $index, $fname = __METHOD__ ) { + # This does not return the same info as MYSQL would, but that's OK + # because MediaWiki never uses the returned value except to check for + # the existance of indexes. $sql = "sp_helpindex '" . $table . "'"; $res = $this->query( $sql, $fname ); if ( !$res ) { @@ -383,6 +558,7 @@ class DatabaseMssql extends DatabaseBase { } } } + return empty( $result ) ? false : $result; } @@ -401,7 +577,7 @@ class DatabaseMssql extends DatabaseBase { * @throws DBQueryError * @return bool */ - function insert( $table, $arrToInsert, $fname = __METHOD__, $options = array() ) { + public function insert( $table, $arrToInsert, $fname = __METHOD__, $options = array() ) { # No rows to insert, easy just return now if ( !count( $arrToInsert ) ) { return true; @@ -413,24 +589,39 @@ class DatabaseMssql extends DatabaseBase { $table = $this->tableName( $table ); - if ( !( isset( $arrToInsert[0] ) && is_array( $arrToInsert[0] ) ) ) {// Not multi row - $arrToInsert = array( 0 => $arrToInsert );// make everything multi row compatible + if ( !( isset( $arrToInsert[0] ) && is_array( $arrToInsert[0] ) ) ) { // Not multi row + $arrToInsert = array( 0 => $arrToInsert ); // make everything multi row compatible } - $allOk = true; - // We know the table we're inserting into, get its identity column $identity = null; - $tableRaw = preg_replace( '#\[([^\]]*)\]#', '$1', $table ); // strip matching square brackets from table name - $res = $this->doQuery( "SELECT NAME AS idColumn FROM SYS.IDENTITY_COLUMNS WHERE OBJECT_NAME(OBJECT_ID)='{$tableRaw}'" ); - if ( $res && $res->numrows() ) { + // strip matching square brackets and the db/schema from table name + $tableRawArr = explode( '.', preg_replace( '#\[([^\]]*)\]#', '$1', $table ) ); + $tableRaw = array_pop( $tableRawArr ); + $res = $this->doQuery( + "SELECT NAME AS idColumn FROM SYS.IDENTITY_COLUMNS " . + "WHERE OBJECT_NAME(OBJECT_ID)='{$tableRaw}'" + ); + if ( $res && sqlsrv_has_rows( $res ) ) { // There is an identity for this table. - $identity = array_pop( $res->fetch( SQLSRV_FETCH_ASSOC ) ); + $identityArr = sqlsrv_fetch_array( $res, SQLSRV_FETCH_ASSOC ); + $identity = array_pop( $identityArr ); + } + sqlsrv_free_stmt( $res ); + + // Determine binary/varbinary fields so we can encode data as a hex string like 0xABCDEF + $binaryColumns = $this->getBinaryColumns( $table ); + + // INSERT IGNORE is not supported by SQL Server + // remove IGNORE from options list and set ignore flag to true + if ( in_array( 'IGNORE', $options ) ) { + $options = array_diff( $options, array( 'IGNORE' ) ); + $this->mIgnoreDupKeyErrors = true; } - unset( $res ); foreach ( $arrToInsert as $a ) { - // start out with empty identity column, this is so we can return it as a result of the insert logic + // start out with empty identity column, this is so we can return + // it as a result of the insert logic $sqlPre = ''; $sqlPost = ''; $identityClause = ''; @@ -441,152 +632,246 @@ class DatabaseMssql extends DatabaseBase { foreach ( $a as $k => $v ) { if ( $k == $identity ) { if ( !is_null( $v ) ) { - // there is a value being passed to us, we need to turn on and off inserted identity + // there is a value being passed to us, + // we need to turn on and off inserted identity $sqlPre = "SET IDENTITY_INSERT $table ON;"; $sqlPost = ";SET IDENTITY_INSERT $table OFF;"; - } else { - // we can't insert NULL into an identity column, so remove the column from the insert. + // we can't insert NULL into an identity column, + // so remove the column from the insert. unset( $a[$k] ); } } } - $identityClause = "OUTPUT INSERTED.$identity "; // we want to output an identity column as result - } - - $keys = array_keys( $a ); - // INSERT IGNORE is not supported by SQL Server - // remove IGNORE from options list and set ignore flag to true - $ignoreClause = false; - foreach ( $options as $k => $v ) { - if ( strtoupper( $v ) == "IGNORE" ) { - unset( $options[$k] ); - $ignoreClause = true; - } + // we want to output an identity column as result + $identityClause = "OUTPUT INSERTED.$identity "; } - // translate MySQL INSERT IGNORE to something SQL Server can use - // example: - // MySQL: INSERT IGNORE INTO user_groups (ug_user,ug_group) VALUES ('1','sysop') - // MSSQL: IF NOT EXISTS (SELECT * FROM user_groups WHERE ug_user = '1') INSERT INTO user_groups (ug_user,ug_group) VALUES ('1','sysop') - if ( $ignoreClause ) { - $prival = $a[$keys[0]]; - $sqlPre .= "IF NOT EXISTS (SELECT * FROM $table WHERE $keys[0] = '$prival')"; - } + $keys = array_keys( $a ); // Build the actual query $sql = $sqlPre . 'INSERT ' . implode( ' ', $options ) . " INTO $table (" . implode( ',', $keys ) . ") $identityClause VALUES ("; $first = true; - foreach ( $a as $value ) { + foreach ( $a as $key => $value ) { + if ( isset( $binaryColumns[$key] ) ) { + $value = new MssqlBlob( $value ); + } if ( $first ) { $first = false; } else { $sql .= ','; } - if ( is_string( $value ) ) { - $sql .= $this->addQuotes( $value ); - } elseif ( is_null( $value ) ) { + if ( is_null( $value ) ) { $sql .= 'null'; } elseif ( is_array( $value ) || is_object( $value ) ) { - if ( is_object( $value ) && strtolower( get_class( $value ) ) == 'blob' ) { + if ( is_object( $value ) && $value instanceof Blob ) { $sql .= $this->addQuotes( $value ); } else { $sql .= $this->addQuotes( serialize( $value ) ); } } else { - $sql .= $value; + $sql .= $this->addQuotes( $value ); } } $sql .= ')' . $sqlPost; // Run the query - $ret = sqlsrv_query( $this->mConn, $sql ); - - if ( $ret === false ) { - throw new DBQueryError( $this, $this->getErrors(), $this->lastErrno(), $sql, $fname ); - } elseif ( $ret != null ) { - // remember number of rows affected - $this->mAffectedRows = sqlsrv_rows_affected( $ret ); - if ( !is_null( $identity ) ) { - // then we want to get the identity column value we were assigned and save it off - $row = sqlsrv_fetch_object( $ret ); + $this->mScrollableCursor = false; + try { + $ret = $this->query( $sql ); + } catch ( Exception $e ) { + $this->mScrollableCursor = true; + $this->mIgnoreDupKeyErrors = false; + throw $e; + } + $this->mScrollableCursor = true; + + if ( !is_null( $identity ) ) { + // then we want to get the identity column value we were assigned and save it off + $row = $ret->fetchObject(); + if( is_object( $row ) ){ $this->mInsertId = $row->$identity; } - sqlsrv_free_stmt( $ret ); - continue; } - $allOk = false; } - return $allOk; + $this->mIgnoreDupKeyErrors = false; + return $ret; } /** * INSERT SELECT wrapper * $varMap must be an associative array of the form array( 'dest1' => 'source1', ...) - * Source items may be literals rather than field names, but strings should be quoted with Database::addQuotes() - * $conds may be "*" to copy the whole table - * srcTable may be an array of tables. + * Source items may be literals rather than field names, but strings should + * be quoted with Database::addQuotes(). * @param string $destTable - * @param array|string $srcTable + * @param array|string $srcTable May be an array of tables. * @param array $varMap - * @param array $conds + * @param array $conds May be "*" to copy the whole table. * @param string $fname * @param array $insertOptions * @param array $selectOptions * @throws DBQueryError * @return null|ResultWrapper */ - function insertSelect( $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__, - $insertOptions = array(), $selectOptions = array() ) { - $ret = parent::insertSelect( $destTable, $srcTable, $varMap, $conds, $fname, $insertOptions, $selectOptions ); - - if ( $ret === false ) { - throw new DBQueryError( $this, $this->getErrors(), $this->lastErrno(), /*$sql*/ '', $fname ); - } elseif ( $ret != null ) { - // remember number of rows affected - $this->mAffectedRows = sqlsrv_rows_affected( $ret ); - return $ret; + public function insertSelect( $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__, + $insertOptions = array(), $selectOptions = array() + ) { + $this->mScrollableCursor = false; + try { + $ret = parent::insertSelect( + $destTable, + $srcTable, + $varMap, + $conds, + $fname, + $insertOptions, + $selectOptions + ); + } catch ( Exception $e ) { + $this->mScrollableCursor = true; + throw $e; } - return null; + $this->mScrollableCursor = true; + + return $ret; } /** - * Return the next in a sequence, save the value for retrieval via insertId() - * @return + * UPDATE wrapper. Takes a condition array and a SET array. + * + * @param string $table Name of the table to UPDATE. This will be passed through + * DatabaseBase::tableName(). + * + * @param array $values An array of values to SET. For each array element, + * the key gives the field name, and the value gives the data + * to set that field to. The data will be quoted by + * DatabaseBase::addQuotes(). + * + * @param array $conds An array of conditions (WHERE). See + * DatabaseBase::select() for the details of the format of + * condition arrays. Use '*' to update all rows. + * + * @param string $fname The function name of the caller (from __METHOD__), + * for logging and profiling. + * + * @param array $options An array of UPDATE options, can be: + * - IGNORE: Ignore unique key conflicts + * - LOW_PRIORITY: MySQL-specific, see MySQL manual. + * @return bool */ - function nextSequenceValue( $seqName ) { - if ( !$this->tableExists( 'sequence_' . $seqName ) ) { - sqlsrv_query( $this->mConn, "CREATE TABLE [sequence_$seqName] (id INT NOT NULL IDENTITY PRIMARY KEY, junk varchar(10) NULL)" ); + function update( $table, $values, $conds, $fname = __METHOD__, $options = array() ) { + $table = $this->tableName( $table ); + $binaryColumns = $this->getBinaryColumns( $table ); + + $opts = $this->makeUpdateOptions( $options ); + $sql = "UPDATE $opts $table SET " . $this->makeList( $values, LIST_SET, $binaryColumns ); + + if ( $conds !== array() && $conds !== '*' ) { + $sql .= " WHERE " . $this->makeList( $conds, LIST_AND, $binaryColumns ); } - sqlsrv_query( $this->mConn, "INSERT INTO [sequence_$seqName] (junk) VALUES ('')" ); - $ret = sqlsrv_query( $this->mConn, "SELECT TOP 1 id FROM [sequence_$seqName] ORDER BY id DESC" ); - $row = sqlsrv_fetch_array( $ret, SQLSRV_FETCH_ASSOC );// KEEP ASSOC THERE, weird weird bug dealing with the return value if you don't - sqlsrv_free_stmt( $ret ); - $this->mInsertId = $row['id']; - return $row['id']; + $this->mScrollableCursor = false; + try { + $ret = $this->query( $sql ); + } catch ( Exception $e ) { + $this->mScrollableCursor = true; + throw $e; + } + $this->mScrollableCursor = true; + return true; } /** - * Return the current value of a sequence. Assumes it has ben nextval'ed in this session. - * @return + * Makes an encoded list of strings from an array + * @param array $a Containing the data + * @param int $mode Constant + * - LIST_COMMA: comma separated, no field names + * - LIST_AND: ANDed WHERE clause (without the WHERE). See + * the documentation for $conds in DatabaseBase::select(). + * - LIST_OR: ORed WHERE clause (without the WHERE) + * - LIST_SET: comma separated with field names, like a SET clause + * - LIST_NAMES: comma separated field names + * @param array $binaryColumns Contains a list of column names that are binary types + * This is a custom parameter only present for MS SQL. + * + * @throws MWException|DBUnexpectedError + * @return string */ - function currentSequenceValue( $seqName ) { - $ret = sqlsrv_query( $this->mConn, "SELECT TOP 1 id FROM [sequence_$seqName] ORDER BY id DESC" ); - if ( $ret !== false ) { - $row = sqlsrv_fetch_array( $ret ); - sqlsrv_free_stmt( $ret ); - return $row['id']; - } else { - return $this->nextSequenceValue( $seqName ); + public function makeList( $a, $mode = LIST_COMMA, $binaryColumns = array() ) { + if ( !is_array( $a ) ) { + throw new DBUnexpectedError( $this, + 'DatabaseBase::makeList called with incorrect parameters' ); } + + $first = true; + $list = ''; + + foreach ( $a as $field => $value ) { + if ( $mode != LIST_NAMES && isset( $binaryColumns[$field] ) ) { + if ( is_array( $value ) ) { + foreach ( $value as &$v ) { + $v = new MssqlBlob( $v ); + } + } else { + $value = new MssqlBlob( $value ); + } + } + + if ( !$first ) { + if ( $mode == LIST_AND ) { + $list .= ' AND '; + } elseif ( $mode == LIST_OR ) { + $list .= ' OR '; + } else { + $list .= ','; + } + } else { + $first = false; + } + + if ( ( $mode == LIST_AND || $mode == LIST_OR ) && is_numeric( $field ) ) { + $list .= "($value)"; + } elseif ( ( $mode == LIST_SET ) && is_numeric( $field ) ) { + $list .= "$value"; + } elseif ( ( $mode == LIST_AND || $mode == LIST_OR ) && is_array( $value ) ) { + if ( count( $value ) == 0 ) { + throw new MWException( __METHOD__ . ": empty input for field $field" ); + } elseif ( count( $value ) == 1 ) { + // Special-case single values, as IN isn't terribly efficient + // Don't necessarily assume the single key is 0; we don't + // enforce linear numeric ordering on other arrays here. + $value = array_values( $value ); + $list .= $field . " = " . $this->addQuotes( $value[0] ); + } else { + $list .= $field . " IN (" . $this->makeList( $value ) . ") "; + } + } elseif ( $value === null ) { + if ( $mode == LIST_AND || $mode == LIST_OR ) { + $list .= "$field IS "; + } elseif ( $mode == LIST_SET ) { + $list .= "$field = "; + } + $list .= 'NULL'; + } else { + if ( $mode == LIST_AND || $mode == LIST_OR || $mode == LIST_SET ) { + $list .= "$field = "; + } + $list .= $mode == LIST_NAMES ? $value : $this->addQuotes( $value ); + } + } + + return $list; } - # Returns the size of a text field, or -1 for "unlimited" - function textFieldSize( $table, $field ) { + /** + * @param string $table + * @param string $field + * @return int Returns the size of a text field, or -1 for "unlimited" + */ + public function textFieldSize( $table, $field ) { $table = $this->tableName( $table ); $sql = "SELECT CHARACTER_MAXIMUM_LENGTH,DATA_TYPE FROM INFORMATION_SCHEMA.Columns WHERE TABLE_NAME = '$table' AND COLUMN_NAME = '$field'"; @@ -596,40 +881,75 @@ class DatabaseMssql extends DatabaseBase { if ( strtolower( $row['DATA_TYPE'] ) != 'text' ) { $size = $row['CHARACTER_MAXIMUM_LENGTH']; } + return $size; } /** * Construct a LIMIT query with optional offset * This is used for query pages - * $sql string SQL query we will append the limit too - * $limit integer the SQL limit - * $offset integer the SQL offset (default false) - * @return mixed|string + * + * @param string $sql SQL query we will append the limit too + * @param int $limit The SQL limit + * @param bool|int $offset The SQL offset (default false) + * @return array|string */ - function limitResult( $sql, $limit, $offset = false ) { + public function limitResult( $sql, $limit, $offset = false ) { if ( $offset === false || $offset == 0 ) { if ( strpos( $sql, "SELECT" ) === false ) { return "TOP {$limit} " . $sql; } else { - return preg_replace( '/\bSELECT(\s*DISTINCT)?\b/Dsi', 'SELECT$1 TOP ' . $limit, $sql, 1 ); + return preg_replace( '/\bSELECT(\s+DISTINCT)?\b/Dsi', + 'SELECT$1 TOP ' . $limit, $sql, 1 ); } } else { - $sql = ' - SELECT * FROM ( - SELECT sub2.*, ROW_NUMBER() OVER(ORDER BY sub2.line2) AS line3 FROM ( - SELECT 1 AS line2, sub1.* FROM (' . $sql . ') AS sub1 - ) as sub2 - ) AS sub3 - WHERE line3 BETWEEN ' . ( $offset + 1 ) . ' AND ' . ( $offset + $limit ); + // This one is fun, we need to pull out the select list as well as any ORDER BY clause + $select = $orderby = array(); + $s1 = preg_match( '#SELECT\s+(.+?)\s+FROM#Dis', $sql, $select ); + $s2 = preg_match( '#(ORDER BY\s+.+?)(\s*FOR XML .*)?$#Dis', $sql, $orderby ); + $overOrder = $postOrder = ''; + $first = $offset + 1; + $last = $offset + $limit; + $sub1 = 'sub_' . $this->mSubqueryId; + $sub2 = 'sub_' . ( $this->mSubqueryId + 1 ); + $this->mSubqueryId += 2; + if ( !$s1 ) { + // wat + throw new DBUnexpectedError( $this, "Attempting to LIMIT a non-SELECT query\n" ); + } + if ( !$s2 ) { + // no ORDER BY + $overOrder = 'ORDER BY (SELECT 1)'; + } else { + if ( !isset( $orderby[2] ) || !$orderby[2] ) { + // don't need to strip it out if we're using a FOR XML clause + $sql = str_replace( $orderby[1], '', $sql ); + } + $overOrder = $orderby[1]; + $postOrder = ' ' . $overOrder; + } + $sql = "SELECT {$select[1]} + FROM ( + SELECT ROW_NUMBER() OVER({$overOrder}) AS rowNumber, * + FROM ({$sql}) {$sub1} + ) {$sub2} + WHERE rowNumber BETWEEN {$first} AND {$last}{$postOrder}"; + return $sql; } } - // If there is a limit clause, parse it, strip it, and pass the remaining sql through limitResult() - // with the appropriate parameters. Not the prettiest solution, but better than building a whole new parser. - // This exists becase there are still too many extensions that don't use dynamic sql generation. - function LimitToTopN( $sql ) { + /** + * If there is a limit clause, parse it, strip it, and pass the remaining + * SQL through limitResult() with the appropriate parameters. Not the + * prettiest solution, but better than building a whole new parser. This + * exists becase there are still too many extensions that don't use dynamic + * sql generation. + * + * @param string $sql + * @return array|mixed|string + */ + public function LimitToTopN( $sql ) { // Matches: LIMIT {[offset,] row_count | row_count OFFSET offset} $pattern = '/\bLIMIT\s+((([0-9]+)\s*,\s*)?([0-9]+)(\s+OFFSET\s+([0-9]+))?)/i'; if ( preg_match( $pattern, $sql, $matches ) ) { @@ -637,22 +957,20 @@ class DatabaseMssql extends DatabaseBase { $row_count = $matches[4]; // offset = $matches[3] OR $matches[6] $offset = $matches[3] or - $offset = $matches[6] or - $offset = false; + $offset = $matches[6] or + $offset = false; // strip the matching LIMIT clause out $sql = str_replace( $matches[0], '', $sql ); + return $this->limitResult( $sql, $row_count, $offset ); } - return $sql; - } - function timestamp( $ts = 0 ) { - return wfTimestamp( TS_ISO_8601, $ts ); + return $sql; } /** - * @return string wikitext of a link to the server software's web site + * @return string Wikitext of a link to the server software's web site */ public function getSoftwareLink() { return "[{{int:version-db-mssql-url}} MS SQL Server]"; @@ -661,23 +979,40 @@ class DatabaseMssql extends DatabaseBase { /** * @return string Version information from the database */ - function getServerVersion() { + public function getServerVersion() { $server_info = sqlsrv_server_info( $this->mConn ); $version = 'Error'; if ( isset( $server_info['SQLServerVersion'] ) ) { $version = $server_info['SQLServerVersion']; } + return $version; } - function tableExists( $table, $fname = __METHOD__, $schema = false ) { - $res = sqlsrv_query( $this->mConn, "SELECT * FROM information_schema.tables - WHERE table_type='BASE TABLE' AND table_name = '$table'" ); - if ( $res === false ) { - print "Error in tableExists query: " . $this->getErrors(); + /** + * @param string $table + * @param string $fname + * @return bool + */ + public function tableExists( $table, $fname = __METHOD__ ) { + list( $db, $schema, $table ) = $this->tableName( $table, 'split' ); + + if ( $db !== false ) { + // remote database + wfDebug( "Attempting to call tableExists on a remote table" ); return false; } - if ( sqlsrv_fetch( $res ) ) { + + if ( $schema === false ) { + global $wgDBmwschema; + $schema = $wgDBmwschema; + } + + $res = $this->query( "SELECT 1 FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_TYPE = 'BASE TABLE' + AND TABLE_SCHEMA = '$schema' AND TABLE_NAME = '$table'" ); + + if ( $res->numRows() ) { return true; } else { return false; @@ -686,40 +1021,53 @@ class DatabaseMssql extends DatabaseBase { /** * Query whether a given column exists in the mediawiki schema + * @param string $table + * @param string $field + * @param string $fname * @return bool */ - function fieldExists( $table, $field, $fname = __METHOD__ ) { - $table = $this->tableName( $table ); - $res = sqlsrv_query( $this->mConn, "SELECT DATA_TYPE FROM INFORMATION_SCHEMA.Columns - WHERE TABLE_NAME = '$table' AND COLUMN_NAME = '$field'" ); - if ( $res === false ) { - print "Error in fieldExists query: " . $this->getErrors(); + public function fieldExists( $table, $field, $fname = __METHOD__ ) { + list( $db, $schema, $table ) = $this->tableName( $table, 'split' ); + + if ( $db !== false ) { + // remote database + wfDebug( "Attempting to call fieldExists on a remote table" ); return false; } - if ( sqlsrv_fetch( $res ) ) { + + $res = $this->query( "SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = '$schema' AND TABLE_NAME = '$table' AND COLUMN_NAME = '$field'" ); + + if ( $res->numRows() ) { return true; } else { return false; } } - function fieldInfo( $table, $field ) { - $table = $this->tableName( $table ); - $res = sqlsrv_query( $this->mConn, "SELECT * FROM INFORMATION_SCHEMA.Columns - WHERE TABLE_NAME = '$table' AND COLUMN_NAME = '$field'" ); - if ( $res === false ) { - print "Error in fieldInfo query: " . $this->getErrors(); + public function fieldInfo( $table, $field ) { + list( $db, $schema, $table ) = $this->tableName( $table, 'split' ); + + if ( $db !== false ) { + // remote database + wfDebug( "Attempting to call fieldInfo on a remote table" ); return false; } - $meta = $this->fetchRow( $res ); + + $res = $this->query( "SELECT * FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = '$schema' AND TABLE_NAME = '$table' AND COLUMN_NAME = '$field'" ); + + $meta = $res->fetchRow(); if ( $meta ) { return new MssqlField( $meta ); } + return false; } /** * Begin a transaction, committing any previously open transaction + * @param string $fname */ protected function doBegin( $fname = __METHOD__ ) { sqlsrv_begin_transaction( $this->mConn ); @@ -728,6 +1076,7 @@ class DatabaseMssql extends DatabaseBase { /** * End a transaction + * @param string $fname */ protected function doCommit( $fname = __METHOD__ ) { sqlsrv_commit( $this->mConn ); @@ -737,6 +1086,7 @@ class DatabaseMssql extends DatabaseBase { /** * Rollback a transaction. * No-op on non-transactional databases. + * @param string $fname */ protected function doRollback( $fname = __METHOD__ ) { sqlsrv_rollback( $this->mConn ); @@ -747,7 +1097,7 @@ class DatabaseMssql extends DatabaseBase { * Escapes a identifier for use inm SQL. * Throws an exception if it is invalid. * Reference: http://msdn.microsoft.com/en-us/library/aa224033%28v=SQL.80%29.aspx - * @param $identifier + * @param string $identifier * @throws MWException * @return string */ @@ -758,153 +1108,82 @@ class DatabaseMssql extends DatabaseBase { if ( strlen( $identifier ) > 128 ) { throw new MWException( "The identifier '$identifier' is too long (max. 128)" ); } - if ( ( strpos( $identifier, '[' ) !== false ) || ( strpos( $identifier, ']' ) !== false ) ) { - // It may be allowed if you quoted with double quotation marks, but that would break if QUOTED_IDENTIFIER is OFF - throw new MWException( "You can't use square brackers in the identifier '$identifier'" ); + if ( ( strpos( $identifier, '[' ) !== false ) + || ( strpos( $identifier, ']' ) !== false ) + ) { + // It may be allowed if you quoted with double quotation marks, but + // that would break if QUOTED_IDENTIFIER is OFF + throw new MWException( "Square brackets are not allowed in '$identifier'" ); } + return "[$identifier]"; } /** - * Initial setup. - * Precondition: This object is connected as the superuser. - * Creates the database, schema, user and login. + * @param string $s + * @return string */ - function initial_setup( $dbName, $newUser, $loginPassword ) { - $dbName = $this->escapeIdentifier( $dbName ); - - // It is not clear what can be used as a login, - // From http://msdn.microsoft.com/en-us/library/ms173463.aspx - // a sysname may be the same as an identifier. - $newUser = $this->escapeIdentifier( $newUser ); - $loginPassword = $this->addQuotes( $loginPassword ); - - $this->doQuery( "CREATE DATABASE $dbName;" ); - $this->doQuery( "USE $dbName;" ); - $this->doQuery( "CREATE SCHEMA $dbName;" ); - $this->doQuery( " - CREATE - LOGIN $newUser - WITH - PASSWORD=$loginPassword - ; - " ); - $this->doQuery( " - CREATE - USER $newUser - FOR - LOGIN $newUser - WITH - DEFAULT_SCHEMA=$dbName - ; - " ); - $this->doQuery( " - GRANT - BACKUP DATABASE, - BACKUP LOG, - CREATE DEFAULT, - CREATE FUNCTION, - CREATE PROCEDURE, - CREATE RULE, - CREATE TABLE, - CREATE VIEW, - CREATE FULLTEXT CATALOG - ON - DATABASE::$dbName - TO $newUser - ; - " ); - $this->doQuery( " - GRANT - CONTROL - ON - SCHEMA::$dbName - TO $newUser - ; - " ); - } - - function encodeBlob( $b ) { - // we can't have zero's and such, this is a simple encoding to make sure we don't barf - return base64_encode( $b ); - } - - function decodeBlob( $b ) { - // we can't have zero's and such, this is a simple encoding to make sure we don't barf - return base64_decode( $b ); + public function strencode( $s ) { # Should not be called by us + return str_replace( "'", "''", $s ); } /** - * @private + * @param string $s * @return string */ - function tableNamesWithUseIndexOrJOIN( $tables, $use_index = array(), $join_conds = array() ) { - $ret = array(); - $retJOIN = array(); - $use_index_safe = is_array( $use_index ) ? $use_index : array(); - $join_conds_safe = is_array( $join_conds ) ? $join_conds : array(); - foreach ( $tables as $table ) { - // Is there a JOIN and INDEX clause for this table? - if ( isset( $join_conds_safe[$table] ) && isset( $use_index_safe[$table] ) ) { - $tableClause = $join_conds_safe[$table][0] . ' ' . $this->tableName( $table ); - $tableClause .= ' ' . $this->useIndexClause( implode( ',', (array)$use_index_safe[$table] ) ); - $tableClause .= ' ON (' . $this->makeList( (array)$join_conds_safe[$table][1], LIST_AND ) . ')'; - $retJOIN[] = $tableClause; - // Is there an INDEX clause? - } elseif ( isset( $use_index_safe[$table] ) ) { - $tableClause = $this->tableName( $table ); - $tableClause .= ' ' . $this->useIndexClause( implode( ',', (array)$use_index_safe[$table] ) ); - $ret[] = $tableClause; - // Is there a JOIN clause? - } elseif ( isset( $join_conds_safe[$table] ) ) { - $tableClause = $join_conds_safe[$table][0] . ' ' . $this->tableName( $table ); - $tableClause .= ' ON (' . $this->makeList( (array)$join_conds_safe[$table][1], LIST_AND ) . ')'; - $retJOIN[] = $tableClause; - } else { - $tableClause = $this->tableName( $table ); - $ret[] = $tableClause; - } - } - // We can't separate explicit JOIN clauses with ',', use ' ' for those - $straightJoins = !empty( $ret ) ? implode( ',', $ret ) : ""; - $otherJoins = !empty( $retJOIN ) ? implode( ' ', $retJOIN ) : ""; - // Compile our final table clause - return implode( ' ', array( $straightJoins, $otherJoins ) ); - } - - function strencode( $s ) { # Should not be called by us - return str_replace( "'", "''", $s ); - } - - function addQuotes( $s ) { - if ( $s instanceof Blob ) { - return "'" . $s->fetch( $s ) . "'"; + public function addQuotes( $s ) { + if ( $s instanceof MssqlBlob ) { + return $s->fetch(); + } elseif ( $s instanceof Blob ) { + // this shouldn't really ever be called, but it's here if needed + // (and will quite possibly make the SQL error out) + $blob = new MssqlBlob( $s->fetch() ); + return $blob->fetch(); } else { + if ( is_bool( $s ) ) { + $s = $s ? 1 : 0; + } return parent::addQuotes( $s ); } } + /** + * @param string $s + * @return string + */ public function addIdentifierQuotes( $s ) { // http://msdn.microsoft.com/en-us/library/aa223962.aspx return '[' . $s . ']'; } + /** + * @param string $name + * @return bool + */ public function isQuotedIdentifier( $name ) { - return $name[0] == '[' && substr( $name, -1, 1 ) == ']'; + return strlen( $name ) && $name[0] == '[' && substr( $name, -1, 1 ) == ']'; } - function selectDB( $db ) { - return ( $this->query( "SET DATABASE $db" ) !== false ); + /** + * @param string $db + * @return bool + */ + public function selectDB( $db ) { + try { + $this->mDBname = $db; + $this->query( "USE $db" ); + return true; + } catch ( Exception $e ) { + return false; + } } /** - * @private - * - * @param array $options an associative array of options to be turned into - * an SQL query, valid keys are listed in the function. - * @return Array + * @param array $options An associative array of options to be turned into + * an SQL query, valid keys are listed in the function. + * @return array */ - function makeSelectOptions( $options ) { + public function makeSelectOptions( $options ) { $tailOpts = ''; $startOpts = ''; @@ -919,10 +1198,15 @@ class DatabaseMssql extends DatabaseBase { $tailOpts .= $this->makeOrderBy( $options ); - if ( isset( $noKeyOptions['DISTINCT'] ) && isset( $noKeyOptions['DISTINCTROW'] ) ) { + if ( isset( $noKeyOptions['DISTINCT'] ) || isset( $noKeyOptions['DISTINCTROW'] ) ) { $startOpts .= 'DISTINCT'; } + if ( isset( $noKeyOptions['FOR XML'] ) ) { + // used in group concat field emulation + $tailOpts .= " FOR XML PATH('')"; + } + // we want this to be compatible with the output of parent::makeSelectOptions() return array( $startOpts, '', $tailOpts, '' ); } @@ -931,27 +1215,165 @@ class DatabaseMssql extends DatabaseBase { * Get the type of the DBMS, as it appears in $wgDBtype. * @return string */ - function getType() { + public function getType() { return 'mssql'; } - function buildConcat( $stringList ) { + /** + * @param array $stringList + * @return string + */ + public function buildConcat( $stringList ) { return implode( ' + ', $stringList ); } + /** + * Build a GROUP_CONCAT or equivalent statement for a query. + * MS SQL doesn't have GROUP_CONCAT so we emulate it with other stuff (and boy is it nasty) + * + * This is useful for combining a field for several rows into a single string. + * NULL values will not appear in the output, duplicated values will appear, + * and the resulting delimiter-separated values have no defined sort order. + * Code using the results may need to use the PHP unique() or sort() methods. + * + * @param string $delim Glue to bind the results together + * @param string|array $table Table name + * @param string $field Field name + * @param string|array $conds Conditions + * @param string|array $join_conds Join conditions + * @return string SQL text + * @since 1.23 + */ + public function buildGroupConcatField( $delim, $table, $field, $conds = '', + $join_conds = array() + ) { + $gcsq = 'gcsq_' . $this->mSubqueryId; + $this->mSubqueryId++; + + $delimLen = strlen( $delim ); + $fld = "{$field} + {$this->addQuotes( $delim )}"; + $sql = "(SELECT LEFT({$field}, LEN({$field}) - {$delimLen}) FROM (" + . $this->selectSQLText( $table, $fld, $conds, null, array( 'FOR XML' ), $join_conds ) + . ") {$gcsq} ({$field}))"; + + return $sql; + } + + /** + * @return string + */ public function getSearchEngine() { return "SearchMssql"; } /** - * Since MSSQL doesn't recognize the infinity keyword, set date manually. - * @todo Remove magic date + * Returns an associative array for fields that are of type varbinary, binary, or image + * $table can be either a raw table name or passed through tableName() first + * @param string $table + * @return array + */ + private function getBinaryColumns( $table ) { + $tableRawArr = explode( '.', preg_replace( '#\[([^\]]*)\]#', '$1', $table ) ); + $tableRaw = array_pop( $tableRawArr ); + + if ( $this->mBinaryColumnCache === null ) { + $this->populateColumnCaches(); + } + + return isset( $this->mBinaryColumnCache[$tableRaw] ) + ? $this->mBinaryColumnCache[$tableRaw] + : array(); + } + + /** + * @param string $table + * @return array + */ + private function getBitColumns( $table ) { + $tableRawArr = explode( '.', preg_replace( '#\[([^\]]*)\]#', '$1', $table ) ); + $tableRaw = array_pop( $tableRawArr ); + + if ( $this->mBitColumnCache === null ) { + $this->populateColumnCaches(); + } + + return isset( $this->mBitColumnCache[$tableRaw] ) + ? $this->mBitColumnCache[$tableRaw] + : array(); + } + + private function populateColumnCaches() { + $res = $this->select( 'INFORMATION_SCHEMA.COLUMNS', '*', + array( + 'TABLE_CATALOG' => $this->mDBname, + 'TABLE_SCHEMA' => $this->mSchema, + 'DATA_TYPE' => array( 'varbinary', 'binary', 'image', 'bit' ) + ) ); + + $this->mBinaryColumnCache = array(); + $this->mBitColumnCache = array(); + foreach ( $res as $row ) { + if ( $row->DATA_TYPE == 'bit' ) { + $this->mBitColumnCache[$row->TABLE_NAME][$row->COLUMN_NAME] = $row; + } else { + $this->mBinaryColumnCache[$row->TABLE_NAME][$row->COLUMN_NAME] = $row; + } + } + } + + /** + * @param string $name + * @param string $format + * @return string + */ + function tableName( $name, $format = 'quoted' ) { + # Replace reserved words with better ones + switch ( $name ) { + case 'user': + return $this->realTableName( 'mwuser', $format ); + default: + return $this->realTableName( $name, $format ); + } + } + + /** + * call this instead of tableName() in the updater when renaming tables + * @param string $name + * @param string $format One of quoted, raw, or split * @return string */ - public function getInfinity() { - return '3000-01-31 00:00:00.000'; + function realTableName( $name, $format = 'quoted' ) { + $table = parent::tableName( $name, $format ); + if ( $format == 'split' ) { + // Used internally, we want the schema split off from the table name and returned + // as a list with 3 elements (database, schema, table) + $table = explode( '.', $table ); + while ( count( $table ) < 3 ) { + array_unshift( $table, false ); + } + } + return $table; } + /** + * Called in the installer and updater. + * Probably doesn't need to be called anywhere else in the codebase. + * @param bool|null $value + * @return bool|null + */ + public function prepareStatements( $value = null ) { + return wfSetVar( $this->mPrepareStatements, $value ); + } + + /** + * Called in the installer and updater. + * Probably doesn't need to be called anywhere else in the codebase. + * @param bool|null $value + * @return bool|null + */ + public function scrollableCursor( $value = null ) { + return wfSetVar( $this->mScrollableCursor, $value ); + } } // end DatabaseMssql class /** @@ -960,10 +1382,11 @@ class DatabaseMssql extends DatabaseBase { * @ingroup Database */ class MssqlField implements Field { - private $name, $tablename, $default, $max_length, $nullable, $type; + private $name, $tableName, $default, $max_length, $nullable, $type; + function __construct( $info ) { $this->name = $info['COLUMN_NAME']; - $this->tablename = $info['TABLE_NAME']; + $this->tableName = $info['TABLE_NAME']; $this->default = $info['COLUMN_DEFAULT']; $this->max_length = $info['CHARACTER_MAXIMUM_LENGTH']; $this->nullable = !( strtolower( $info['IS_NULLABLE'] ) == 'no' ); @@ -995,208 +1418,105 @@ class MssqlField implements Field { } } -/** - * The MSSQL PHP driver doesn't support sqlsrv_num_rows, so we recall all rows into an array and maintain our - * own cursor index into that array...This is similar to the way the Oracle driver handles this same issue - * - * @ingroup Database - */ -class MssqlResult { - - public function __construct( $queryresult = false ) { - $this->mCursor = 0; - $this->mRows = array(); - $this->mNumFields = sqlsrv_num_fields( $queryresult ); - $this->mFieldMeta = sqlsrv_field_metadata( $queryresult ); +class MssqlBlob extends Blob { + public function __construct( $data ) { + if ( $data instanceof MssqlBlob ) { + return $data; + } elseif ( $data instanceof Blob ) { + $this->mData = $data->fetch(); + } elseif ( is_array( $data ) && is_object( $data ) ) { + $this->mData = serialize( $data ); + } else { + $this->mData = $data; + } + } - $rows = sqlsrv_fetch_array( $queryresult, SQLSRV_FETCH_ASSOC ); + /** + * Returns an unquoted hex representation of a binary string + * for insertion into varbinary-type fields + * @return string + */ + public function fetch() { + if ( $this->mData === null ) { + return 'null'; + } - foreach ( $rows as $row ) { - if ( $row !== null ) { - foreach ( $row as $k => $v ) { - if ( is_object( $v ) && method_exists( $v, 'format' ) ) {// DateTime Object - $row[$k] = $v->format( "Y-m-d\TH:i:s\Z" ); - } - } - $this->mRows[] = $row;// read results into memory, cursors are not supported - } + $ret = '0x'; + $dataLength = strlen( $this->mData ); + for ( $i = 0; $i < $dataLength; $i++ ) { + $ret .= bin2hex( pack( 'C', ord( $this->mData[$i] ) ) ); } - $this->mRowCount = count( $this->mRows ); - sqlsrv_free_stmt( $queryresult ); + + return $ret; } +} - private function array_to_obj( $array, &$obj ) { - foreach ( $array as $key => $value ) { - if ( is_array( $value ) ) { - $obj->$key = new stdClass(); - $this->array_to_obj( $value, $obj->$key ); - } else { - if ( !empty( $key ) ) { - $obj->$key = $value; - } - } +class MssqlResultWrapper extends ResultWrapper { + private $mSeekTo = null; + + /** + * @return stdClass|bool + */ + public function fetchObject() { + $res = $this->result; + + if ( $this->mSeekTo !== null ) { + $result = sqlsrv_fetch_object( $res, 'stdClass', array(), + SQLSRV_SCROLL_ABSOLUTE, $this->mSeekTo ); + $this->mSeekTo = null; + } else { + $result = sqlsrv_fetch_object( $res ); } - return $obj; - } - public function fetch( $mode = SQLSRV_FETCH_BOTH, $object_class = 'stdClass' ) { - if ( $this->mCursor >= $this->mRowCount || $this->mRowCount == 0 ) { + // MediaWiki expects us to return boolean false when there are no more rows instead of null + if ( $result === null ) { return false; } - $arrNum = array(); - if ( $mode == SQLSRV_FETCH_NUMERIC || $mode == SQLSRV_FETCH_BOTH ) { - foreach ( $this->mRows[$this->mCursor] as $value ) { - $arrNum[] = $value; - } - } - switch ( $mode ) { - case SQLSRV_FETCH_ASSOC: - $ret = $this->mRows[$this->mCursor]; - break; - case SQLSRV_FETCH_NUMERIC: - $ret = $arrNum; - break; - case 'OBJECT': - $o = new $object_class; - $ret = $this->array_to_obj( $this->mRows[$this->mCursor], $o ); - break; - case SQLSRV_FETCH_BOTH: - default: - $ret = $this->mRows[$this->mCursor] + $arrNum; - break; - } - $this->mCursor++; - return $ret; + return $result; } - public function get( $pos, $fld ) { - return $this->mRows[$pos][$fld]; - } + /** + * @return array|bool + */ + public function fetchRow() { + $res = $this->result; - public function numrows() { - return $this->mRowCount; - } + if ( $this->mSeekTo !== null ) { + $result = sqlsrv_fetch_array( $res, SQLSRV_FETCH_BOTH, + SQLSRV_SCROLL_ABSOLUTE, $this->mSeekTo ); + $this->mSeekTo = null; + } else { + $result = sqlsrv_fetch_array( $res ); + } - public function seek( $iRow ) { - $this->mCursor = min( $iRow, $this->mRowCount ); - } + // MediaWiki expects us to return boolean false when there are no more rows instead of null + if ( $result === null ) { + return false; + } - public function numfields() { - return $this->mNumFields; + return $result; } - public function fieldname( $nr ) { - $arrKeys = array_keys( $this->mRows[0] ); - return $arrKeys[$nr]; - } + /** + * @param int $row + * @return bool + */ + public function seek( $row ) { + $res = $this->result; - public function fieldtype( $nr ) { - $i = 0; - $intType = -1; - foreach ( $this->mFieldMeta as $meta ) { - if ( $nr == $i ) { - $intType = $meta['Type']; - break; - } - $i++; - } - // http://msdn.microsoft.com/en-us/library/cc296183.aspx contains type table - switch ( $intType ) { - case SQLSRV_SQLTYPE_BIGINT: - $strType = 'bigint'; - break; - case SQLSRV_SQLTYPE_BINARY: - $strType = 'binary'; - break; - case SQLSRV_SQLTYPE_BIT: - $strType = 'bit'; - break; - case SQLSRV_SQLTYPE_CHAR: - $strType = 'char'; - break; - case SQLSRV_SQLTYPE_DATETIME: - $strType = 'datetime'; - break; - case SQLSRV_SQLTYPE_DECIMAL: // ($precision, $scale) - $strType = 'decimal'; - break; - case SQLSRV_SQLTYPE_FLOAT: - $strType = 'float'; - break; - case SQLSRV_SQLTYPE_IMAGE: - $strType = 'image'; - break; - case SQLSRV_SQLTYPE_INT: - $strType = 'int'; - break; - case SQLSRV_SQLTYPE_MONEY: - $strType = 'money'; - break; - case SQLSRV_SQLTYPE_NCHAR: // ($charCount): - $strType = 'nchar'; - break; - case SQLSRV_SQLTYPE_NUMERIC: // ($precision, $scale): - $strType = 'numeric'; - break; - case SQLSRV_SQLTYPE_NVARCHAR: // ($charCount) - $strType = 'nvarchar'; - break; - // case SQLSRV_SQLTYPE_NVARCHAR('max'): - // $strType = 'nvarchar(MAX)'; - // break; - case SQLSRV_SQLTYPE_NTEXT: - $strType = 'ntext'; - break; - case SQLSRV_SQLTYPE_REAL: - $strType = 'real'; - break; - case SQLSRV_SQLTYPE_SMALLDATETIME: - $strType = 'smalldatetime'; - break; - case SQLSRV_SQLTYPE_SMALLINT: - $strType = 'smallint'; - break; - case SQLSRV_SQLTYPE_SMALLMONEY: - $strType = 'smallmoney'; - break; - case SQLSRV_SQLTYPE_TEXT: - $strType = 'text'; - break; - case SQLSRV_SQLTYPE_TIMESTAMP: - $strType = 'timestamp'; - break; - case SQLSRV_SQLTYPE_TINYINT: - $strType = 'tinyint'; - break; - case SQLSRV_SQLTYPE_UNIQUEIDENTIFIER: - $strType = 'uniqueidentifier'; - break; - case SQLSRV_SQLTYPE_UDT: - $strType = 'UDT'; - break; - case SQLSRV_SQLTYPE_VARBINARY: // ($byteCount) - $strType = 'varbinary'; - break; - // case SQLSRV_SQLTYPE_VARBINARY('max'): - // $strType = 'varbinary(MAX)'; - // break; - case SQLSRV_SQLTYPE_VARCHAR: // ($charCount) - $strType = 'varchar'; - break; - // case SQLSRV_SQLTYPE_VARCHAR('max'): - // $strType = 'varchar(MAX)'; - // break; - case SQLSRV_SQLTYPE_XML: - $strType = 'xml'; - break; - default: - $strType = $intType; + // check bounds + $numRows = $this->db->numRows( $res ); + $row = intval( $row ); + + if ( $numRows === 0 ) { + return false; + } elseif ( $row < 0 || $row > $numRows - 1 ) { + return false; } - return $strType; - } - public function free() { - unset( $this->mRows ); + // Unlike MySQL, the seek actually happens on the next access + $this->mSeekTo = $row; + return true; } } |