httpAdapter = $httpAdapter;
    }

    /**
     * Makes calls to the elasticsearch server.
     *
     * All calls that are made to the server are done through this function.
     * The request is converted to an HTTP adapter request, sent, timed, and
     * the adapter response is converted back into an Elastica response.
     *
     * @param \Elastica\Request $elasticaRequest
     * @param array             $params          Host, Port, ... (not read by this implementation)
     *
     * @throws \Elastica\Exception\ConnectionException
     * @throws \Elastica\Exception\ResponseException
     * @throws \Elastica\Exception\Connection\HttpException
     * @throws PartialShardFailureException when the response reports failed shards
     *
     * @return \Elastica\Response Response object
     */
    public function exec(ElasticaRequest $elasticaRequest, array $params)
    {
        $connection = $this->getConnection();

        // Only override the adapter's timeout when the connection defines a truthy one.
        if ($timeout = $connection->getTimeout()) {
            $this->httpAdapter->getConfiguration()->setTimeout($timeout);
        }

        $httpAdapterRequest = $this->_createHttpAdapterRequest($elasticaRequest, $connection);

        // Measure the wall-clock time spent in the adapter call only.
        $start = microtime(true);
        $httpAdapterResponse = $this->httpAdapter->sendRequest($httpAdapterRequest);
        $end = microtime(true);

        // NOTE(review): $connection is passed as a second argument although the
        // _createElasticaResponse() signature in this class only declares one; kept as is
        // in case a subclass override relies on it.
        $elasticaResponse = $this->_createElasticaResponse($httpAdapterResponse, $connection);
        $elasticaResponse->setQueryTime($end - $start);

        $elasticaResponse->setTransferInfo(
            array(
                'request_header' => $httpAdapterRequest->getMethod(),
                'http_code' => $httpAdapterResponse->getStatusCode(),
            )
        );

        if ($elasticaResponse->hasError()) {
            throw new ResponseException($elasticaRequest, $elasticaResponse);
        }

        if ($elasticaResponse->hasFailedShards()) {
            throw new PartialShardFailureException($elasticaRequest, $elasticaResponse);
        }

        return $elasticaResponse;
    }

    /**
     * Builds an Elastica response from the adapter response's body (cast to
     * string) and its status code.
     *
     * @param HttpAdapterResponse $httpAdapterResponse
     *
     * @return ElasticaResponse
     */
    protected function _createElasticaResponse(HttpAdapterResponse $httpAdapterResponse)
    {
        return new ElasticaResponse((string) $httpAdapterResponse->getBody(), $httpAdapterResponse->getStatusCode());
    }

    /**
     * Converts an Elastica request into an HTTP adapter request.
     *
     * When the request carries data, a GET is sent as POST so the body is
     * transmitted; array data is JSON-encoded, any other data is used as the
     * body unchanged.
     *
     * @param ElasticaRequest $elasticaRequest
     * @param Connection      $connection
     *
     * @return HttpAdapterRequest
     */
    protected function _createHttpAdapterRequest(ElasticaRequest $elasticaRequest, Connection $connection)
    {
        $data = $elasticaRequest->getData();
        $body = null;
        $method = $elasticaRequest->getMethod();

        // hasConfig() only reports whether the key exists; the header values themselves
        // must be read with getConfig(). (Previously the boolean result was used as the
        // headers value.)
        $headers = $connection->hasConfig('headers') ? $connection->getConfig('headers') : array();

        // The string '0' is empty() in PHP but is still treated here as data to send.
        if (!empty($data) || '0' === $data) {
            if ($method == ElasticaRequest::GET) {
                $method = ElasticaRequest::POST;
            }

            if ($this->hasParam('postWithRequestBody') && $this->getParam('postWithRequestBody') == true) {
                // Also record the switch to POST on the Elastica request itself.
                $elasticaRequest->setMethod(ElasticaRequest::POST);
                $method = ElasticaRequest::POST;
            }

            if (is_array($data)) {
                $body = JSON::stringify($data, 'JSON_ELASTICSEARCH');
            } else {
                $body = $data;
            }
        }

        $url = $this->_getUri($elasticaRequest, $connection);
        $streamBody = new StringStream($body);

        return new HttpAdapterRequest($url, $method, HttpAdapterRequest::PROTOCOL_VERSION_1_1, $headers, $streamBody);
    }

    /**
     * Builds the full request URI.
     *
     * A non-empty 'url' connection config is used as the base; otherwise the
     * base is assembled from scheme, host, port and path. The request path and,
     * if present, the URL-encoded query parameters are appended.
     *
     * @param ElasticaRequest      $request
     * @param \Elastica\Connection $connection
     *
     * @return string
     */
    protected function _getUri(ElasticaRequest $request, Connection $connection)
    {
        $url = $connection->hasConfig('url') ? $connection->getConfig('url') : '';

        if (!empty($url)) {
            $baseUri = $url;
        } else {
            $baseUri = $this->_scheme.'://'.$connection->getHost().':'.$connection->getPort().'/'.$connection->getPath();
        }

        $baseUri .= $request->getPath();

        $query = $request->getQuery();

        if (!empty($query)) {
            $baseUri .= '?'.http_build_query($query);
        }

        return $baseUri;
    }
}