diff options
Diffstat (limited to 'vendor/ruflin/elastica/lib/Elastica/Client.php')
-rw-r--r-- | vendor/ruflin/elastica/lib/Elastica/Client.php | 312 |
1 files changed, 170 insertions, 142 deletions
diff --git a/vendor/ruflin/elastica/lib/Elastica/Client.php b/vendor/ruflin/elastica/lib/Elastica/Client.php index a5821180..b30bdb43 100644 --- a/vendor/ruflin/elastica/lib/Elastica/Client.php +++ b/vendor/ruflin/elastica/lib/Elastica/Client.php @@ -1,27 +1,21 @@ <?php - namespace Elastica; -use Elastica\Bulk; use Elastica\Bulk\Action; -use Elastica\Exception\ResponseException; -use Elastica\Exception\ClientException; use Elastica\Exception\ConnectionException; use Elastica\Exception\InvalidException; use Elastica\Exception\RuntimeException; use Psr\Log\LoggerInterface; /** - * Client to connect the the elasticsearch server + * Client to connect the the elasticsearch server. * - * @category Xodoa - * @package Elastica * @author Nicolas Ruflin <spam@ruflin.com> */ class Client { /** - * Config with defaults + * Config with defaults. * * log: Set to true, to enable logging, set a string to log to a specific file * retryOnConflict: Use in \Elastica\Client::updateDocument @@ -29,26 +23,21 @@ class Client * @var array */ protected $_config = array( - 'host' => null, - 'port' => null, - 'path' => null, - 'url' => null, - 'proxy' => null, - 'transport' => null, - 'persistent' => true, - 'timeout' => null, - 'connections' => array(), // host, port, path, timeout, transport, persistent, timeout, config -> (curl, headers, url) - 'roundRobin' => false, - 'log' => false, + 'host' => null, + 'port' => null, + 'path' => null, + 'url' => null, + 'proxy' => null, + 'transport' => null, + 'persistent' => true, + 'timeout' => null, + 'connections' => array(), // host, port, path, timeout, transport, persistent, timeout, config -> (curl, headers, url) + 'roundRobin' => false, + 'log' => false, 'retryOnConflict' => 0, ); /** - * @var \Elastica\Connection[] List of connections - */ - protected $_connections = array(); - - /** * @var callback */ protected $_callback = null; @@ -67,9 +56,13 @@ class Client * @var LoggerInterface */ protected $_logger = null; + /** + * @var Connection\ConnectionPool + */ + protected $_connectionPool = null; /** - * Creates a new Elastica client + * Creates a new Elastica client. * * @param array $config OPTIONAL Additional config options * @param callback $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down) @@ -82,32 +75,45 @@ class Client } /** - * Inits the client connections + * Inits the client connections. */ protected function _initConnections() { - $connections = $this->getConfig('connections'); + $connections = array(); - foreach ($connections as $connection) { - $this->_connections[] = Connection::create($this->_prepareConnectionParams($connection)); + foreach ($this->getConfig('connections') as $connection) { + $connections[] = Connection::create($this->_prepareConnectionParams($connection)); } if (isset($this->_config['servers'])) { foreach ($this->getConfig('servers') as $server) { - $this->_connections[] = Connection::create($this->_prepareConnectionParams($server)); + $connections[] = Connection::create($this->_prepareConnectionParams($server)); } } // If no connections set, create default connection - if (empty($this->_connections)) { - $this->_connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig())); + if (empty($connections)) { + $connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig())); } + + if (!isset($this->_config['connectionStrategy'])) { + if ($this->getConfig('roundRobin') === true) { + $this->setConfigValue('connectionStrategy', 'RoundRobin'); + } else { + $this->setConfigValue('connectionStrategy', 'Simple'); + } + } + + $strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy')); + + $this->_connectionPool = new Connection\ConnectionPool($connections, $strategy, $this->_callback); } /** * Creates a Connection params array from a Client or server config array. * * @param array $config + * * @return array */ protected function _prepareConnectionParams(array $config) @@ -126,10 +132,11 @@ class Client } /** - * Sets specific config values (updates and keeps default values) + * Sets specific config values (updates and keeps default values). + * + * @param array $config Params * - * @param array $config Params - * @return \Elastica\Client + * @return $this */ public function setConfig(array $config) { @@ -142,11 +149,13 @@ class Client /** * Returns a specific config key or the whole - * config array if not set + * config array if not set. + * + * @param string $key Config key * - * @param string $key Config key * @throws \Elastica\Exception\InvalidException - * @return array|string Config value + * + * @return array|string Config value */ public function getConfig($key = '') { @@ -155,18 +164,19 @@ class Client } if (!array_key_exists($key, $this->_config)) { - throw new InvalidException('Config key is not set: ' . $key); + throw new InvalidException('Config key is not set: '.$key); } return $this->_config[$key]; } /** - * Sets / overwrites a specific config value + * Sets / overwrites a specific config value. + * + * @param string $key Key to set + * @param mixed $value Value * - * @param string $key Key to set - * @param mixed $value Value - * @return \Elastica\Client Client object + * @return $this */ public function setConfigValue($key, $value) { @@ -174,8 +184,9 @@ class Client } /** - * @param array|string $keys config key or path of config keys - * @param mixed $default default value will be returned if key was not found + * @param array|string $keys config key or path of config keys + * @param mixed $default default value will be returned if key was not found + * * @return mixed */ public function getConfigValue($keys, $default = null) @@ -188,13 +199,15 @@ class Client return $default; } } + return $value; } /** - * Returns the index for the given connection + * Returns the index for the given connection. + * + * @param string $name Index name to create connection to * - * @param string $name Index name to create connection to * @return \Elastica\Index Index for the given name */ public function getIndex($name) @@ -203,11 +216,14 @@ class Client } /** - * Adds a HTTP Header + * Adds a HTTP Header. + * + * @param string $header The HTTP Header + * @param string $headerValue The HTTP Header Value * - * @param string $header The HTTP Header - * @param string $headerValue The HTTP Header Value * @throws \Elastica\Exception\InvalidException If $header or $headerValue is not a string + * + * @return $this */ public function addHeader($header, $headerValue) { @@ -216,13 +232,18 @@ class Client } else { throw new InvalidException('Header must be a string'); } + + return $this; } /** - * Remove a HTTP Header + * Remove a HTTP Header. * - * @param string $header The HTTP Header to remove - * @throws \Elastica\Exception\InvalidException IF $header is not a string + * @param string $header The HTTP Header to remove + * + * @throws \Elastica\Exception\InvalidException If $header is not a string + * + * @return $this */ public function removeHeader($header) { @@ -233,21 +254,27 @@ class Client } else { throw new InvalidException('Header must be a string'); } + + return $this; } /** - * Uses _bulk to send documents to the server + * Uses _bulk to send documents to the server. * * Array of \Elastica\Document as input. Index and type has to be * set inside the document, because for bulk settings documents, * documents can belong to any type and index * - * @param array|\Elastica\Document[] $docs Array of Elastica\Document - * @return \Elastica\Bulk\ResponseSet Response object + * @link http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html + * + * @param array|\Elastica\Document[] $docs Array of Elastica\Document + * * @throws \Elastica\Exception\InvalidException If docs is empty - * @link http://www.elasticsearch.org/guide/reference/api/bulk.html + * + * @return \Elastica\Bulk\ResponseSet Response object */ - public function updateDocuments(array $docs) { + public function updateDocuments(array $docs) + { if (empty($docs)) { throw new InvalidException('Array has to consist of at least one element'); } @@ -260,16 +287,19 @@ class Client } /** - * Uses _bulk to send documents to the server + * Uses _bulk to send documents to the server. * * Array of \Elastica\Document as input. Index and type has to be * set inside the document, because for bulk settings documents, * documents can belong to any type and index * - * @param array|\Elastica\Document[] $docs Array of Elastica\Document - * @return \Elastica\Bulk\ResponseSet Response object + * @link http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html + * + * @param array|\Elastica\Document[] $docs Array of Elastica\Document + * * @throws \Elastica\Exception\InvalidException If docs is empty - * @link http://www.elasticsearch.org/guide/reference/api/bulk.html + * + * @return \Elastica\Bulk\ResponseSet Response object */ public function addDocuments(array $docs) { @@ -285,25 +315,25 @@ class Client } /** - * Update document, using update script. Requires elasticsearch >= 0.19.0 + * Update document, using update script. Requires elasticsearch >= 0.19.0. + * + * @param int $id document id + * @param array|\Elastica\Script|\Elastica\Document $data raw data for request body + * @param string $index index to update + * @param string $type type of index to update + * @param array $options array of query params to use for query. For possible options check es api * - * @param int $id document id - * @param array|\Elastica\Script|\Elastica\Document $data raw data for request body - * @param string $index index to update - * @param string $type type of index to update - * @param array $options array of query params to use for query. For possible options check es api * @return \Elastica\Response - * @link http://www.elasticsearch.org/guide/reference/api/update.html + * + * @link http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html */ public function updateDocument($id, $data, $index, $type, array $options = array()) { - $path = $index . '/' . $type . '/' . $id . '/_update'; + $path = $index.'/'.$type.'/'.$id.'/_update'; if ($data instanceof Script) { $requestData = $data->toArray(); - } elseif ($data instanceof Document) { - $requestData = array('doc' => $data->getData()); if ($data->getDocAsUpsert()) { @@ -339,14 +369,13 @@ class Client //If an upsert document exists if ($data instanceof Script || $data instanceof Document) { - if ($data->hasUpsert()) { $requestData['upsert'] = $data->getUpsert()->getData(); } } if (!isset($options['retry_on_conflict'])) { - $retryOnConflict = $this->getConfig("retryOnConflict"); + $retryOnConflict = $this->getConfig('retryOnConflict'); $options['retry_on_conflict'] = $retryOnConflict; } @@ -371,7 +400,7 @@ class Client /** * @param \Elastica\Response $response * @param \Elastica\Document $document - * @param string $fields Array of field names to be populated or '_source' if whole document data should be updated + * @param string $fields Array of field names to be populated or '_source' if whole document data should be updated */ protected function _populateDocumentFieldsFromResponse(Response $response, Document $document, $fields) { @@ -395,11 +424,13 @@ class Client } /** - * Bulk deletes documents + * Bulk deletes documents. * * @param array|\Elastica\Document[] $docs - * @return \Elastica\Bulk\ResponseSet + * * @throws \Elastica\Exception\InvalidException + * + * @return \Elastica\Bulk\ResponseSet */ public function deleteDocuments(array $docs) { @@ -414,7 +445,7 @@ class Client } /** - * Returns the status object for all indices + * Returns the status object for all indices. * * @return \Elastica\Status Status object */ @@ -424,7 +455,7 @@ class Client } /** - * Returns the current cluster + * Returns the current cluster. * * @return \Elastica\Cluster Cluster object */ @@ -434,54 +465,35 @@ class Client } /** - * @param \Elastica\Connection $connection - * @return \Elastica\Client + * @param \Elastica\Connection $connection + * + * @return $this */ public function addConnection(Connection $connection) { - $this->_connections[] = $connection; + $this->_connectionPool->addConnection($connection); return $this; } /** * Determines whether a valid connection is available for use. - * + * * @return bool */ public function hasConnection() { - foreach ($this->_connections as $connection) - { - if ($connection->isEnabled()) - { - return true; - } - } - - return false; + return $this->_connectionPool->hasConnection(); } /** * @throws \Elastica\Exception\ClientException + * * @return \Elastica\Connection */ public function getConnection() { - $enabledConnection = null; - - foreach ($this->_connections as $connection) { - if ($connection->isEnabled()) { - $enabledConnection = $connection; - break; - } - } - - if (empty($enabledConnection)) { - throw new ClientException('No enabled connection'); - } - - return $enabledConnection; + return $this->_connectionPool->getConnection(); } /** @@ -489,30 +501,42 @@ class Client */ public function getConnections() { - return $this->_connections; + return $this->_connectionPool->getConnections(); + } + + /** + * @return \Elastica\Connection\Strategy\StrategyInterface + */ + public function getConnectionStrategy() + { + return $this->_connectionPool->getStrategy(); } /** - * @param \Elastica\Connection[] $connections - * @return \Elastica\Client + * @param array|\Elastica\Connection[] $connections + * + * @return $this */ public function setConnections(array $connections) { - $this->_connections = $connections; + $this->_connectionPool->setConnections($connections); return $this; } /** - * Deletes documents with the given ids, index, type from the index + * Deletes documents with the given ids, index, type from the index. + * + * @link http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html + * + * @param array $ids Document ids + * @param string|\Elastica\Index $index Index name + * @param string|\Elastica\Type $type Type of documents + * @param string|false $routing Optional routing key for all ids * - * @param array $ids Document ids - * @param string|\Elastica\Index $index Index name - * @param string|\Elastica\Type $type Type of documents - * @param string|false $routing Optional routing key for all ids * @throws \Elastica\Exception\InvalidException - * @return \Elastica\Bulk\ResponseSet Response object - * @link http://www.elasticsearch.org/guide/reference/api/bulk.html + * + * @return \Elastica\Bulk\ResponseSet Response object */ public function deleteIds(array $ids, $index, $type, $routing = false) { @@ -539,7 +563,7 @@ class Client } /** - * Bulk operation + * Bulk operation. * * Every entry in the params array has to exactly on array * of the bulk operation. An example param array would be: @@ -550,11 +574,14 @@ class Client * array('delete' => array('_index' => 'test', '_type' => 'user', '_id' => '2')) * ); * - * @param array $params Parameter array + * @link http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html + * + * @param array $params Parameter array + * * @throws \Elastica\Exception\ResponseException * @throws \Elastica\Exception\InvalidException - * @return \Elastica\Bulk\ResponseSet Response object - * @link http://www.elasticsearch.org/guide/reference/api/bulk.html + * + * @return \Elastica\Bulk\ResponseSet Response object */ public function bulk(array $params) { @@ -570,15 +597,17 @@ class Client } /** - * Makes calls to the elasticsearch server based on this index + * Makes calls to the elasticsearch server based on this index. * * It's possible to make any REST query directly over this method * - * @param string $path Path to call - * @param string $method Rest method to use (GET, POST, DELETE, PUT) - * @param array $data OPTIONAL Arguments as array - * @param array $query OPTIONAL Query params + * @param string $path Path to call + * @param string $method Rest method to use (GET, POST, DELETE, PUT) + * @param array $data OPTIONAL Arguments as array + * @param array $query OPTIONAL Query params + * * @throws Exception\ConnectionException|\Exception + * * @return \Elastica\Response Response object */ public function request($path, $method = Request::GET, $data = array(), array $query = array()) @@ -595,30 +624,26 @@ class Client $this->_lastResponse = $response; return $response; - } catch (ConnectionException $e) { - $connection->setEnabled(false); - - // Calls callback with connection as param to make it possible to persist invalid connections - if ($this->_callback) { - call_user_func($this->_callback, $connection, $e, $this); - } + $this->_connectionPool->onFail($connection, $e, $this); // In case there is no valid connection left, throw exception which caused the disabling of the connection. - if (!$this->hasConnection()) - { + if (!$this->hasConnection()) { throw $e; } + return $this->request($path, $method, $data, $query); } } /** - * Optimizes all search indices + * Optimizes all search indices. + * + * @param array $args OPTIONAL Optional arguments * - * @param array $args OPTIONAL Optional arguments * @return \Elastica\Response Response object - * @link http://www.elasticsearch.org/guide/reference/api/admin-indices-optimize.html + * + * @link http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-optimize.html */ public function optimizeAll($args = array()) { @@ -626,10 +651,11 @@ class Client } /** - * Refreshes all search indices + * Refreshes all search indices. * * @return \Elastica\Response Response object - * @link http://www.elasticsearch.org/guide/reference/api/admin-indices-refresh.html + * + * @link http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html */ public function refreshAll() { @@ -637,9 +663,10 @@ class Client } /** - * logging + * logging. * * @param string|\Elastica\Request $context + * * @throws Exception\RuntimeException */ protected function _log($context) @@ -677,9 +704,10 @@ class Client } /** - * set Logger + * set Logger. * * @param LoggerInterface $logger + * * @return $this */ public function setLogger(LoggerInterface $logger) |