From 8b124c6004907f1055d98b8e1ef801e9544d63fe Mon Sep 17 00:00:00 2001 From: dblock Date: Sun, 4 Aug 2024 17:19:32 -0400 Subject: [PATCH] WIP: Upgrading guzzle. Signed-off-by: dblock --- composer.json | 7 +- samples/composer.json | 18 ++ samples/index.php | 25 ++ src/OpenSearch/ClientBuilder.php | 85 +++--- src/OpenSearch/Connections/Connection.php | 246 +++++++++--------- src/OpenSearch/Handlers/SigV4Handler.php | 2 +- .../Namespaces/BooleanRequestWrapper.php | 1 - src/OpenSearch/Transport.php | 12 +- tests/ClientTest.php | 2 - tests/Handlers/SigV4HandlerTest.php | 1 - tests/TransportTest.php | 2 - 11 files changed, 227 insertions(+), 174 deletions(-) create mode 100644 samples/composer.json create mode 100644 samples/index.php diff --git a/composer.json b/composer.json index 6b5af53bb..bc9c73c92 100644 --- a/composer.json +++ b/composer.json @@ -14,9 +14,12 @@ ], "require": { "php": "^7.3 || ^8.0", - "ext-json": ">=1.3.7", "ext-curl": "*", - "ezimuel/ringphp": "^1.1.2", + "ext-json": ">=1.3.7", + "guzzlehttp/guzzle": "^7.9", + "guzzlehttp/promises": "^2.0", + "psr/http-client": "^1.0", + "psr/http-message": "^1.1 || ^2.0", "psr/log": "^1|^2|^3", "symfony/yaml": "*" }, diff --git a/samples/composer.json b/samples/composer.json new file mode 100644 index 000000000..90aa91375 --- /dev/null +++ b/samples/composer.json @@ -0,0 +1,18 @@ +{ + "name": "opensearch-php/samples", + "description": "OpenSearch PHP client samples.", + "type": "project", + "authors": [], + "scripts": { + "index": ["php index.php"] + }, + "repositories": [ + { + "type": "path", + "url": "../" + } + ], + "require": { + "opensearch-project/opensearch-php": "dev-main" + } + } \ No newline at end of file diff --git a/samples/index.php b/samples/index.php new file mode 100644 index 000000000..813cee5a6 --- /dev/null +++ b/samples/index.php @@ -0,0 +1,25 @@ + [ + 'https://localhost:9200' + ], + 'BasicAuthentication' => ['admin', getenv('OPENSEARCH_PASSWORD')], + 'Retries' => 2, + 'SSLVerification' => false +]); + +$info = $client->info(); + +print($info->getBody()); + +echo "{$info['version']['distribution']}: {$info['version']['number']}\n"; + +?> diff --git a/src/OpenSearch/ClientBuilder.php b/src/OpenSearch/ClientBuilder.php index 20c6b7af5..b0e28f131 100644 --- a/src/OpenSearch/ClientBuilder.php +++ b/src/OpenSearch/ClientBuilder.php @@ -38,12 +38,14 @@ use OpenSearch\Namespaces\NamespaceBuilderInterface; use OpenSearch\Serializers\SerializerInterface; use OpenSearch\Serializers\SmartSerializer; -use GuzzleHttp\Ring\Client\CurlHandler; -use GuzzleHttp\Ring\Client\CurlMultiHandler; -use GuzzleHttp\Ring\Client\Middleware; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use ReflectionClass; +use GuzzleHttp\Handler\CurlHandler; +use GuzzleHttp\Handler\CurlMultiHandler; +use GuzzleHttp\Handler\Proxy; +use Psr\Http\Message\RequestInterface; +use GuzzleHttp\Utils; class ClientBuilder { @@ -239,30 +241,30 @@ public static function fromConfig(array $config, bool $quiet = false): Client return $builder->build(); } - /** - * Get the default handler - * - * @param array $multiParams - * @param array $singleParams - * @throws \RuntimeException - */ - public static function defaultHandler(array $multiParams = [], array $singleParams = []): callable - { - $future = null; - if (extension_loaded('curl')) { - $config = array_merge([ 'mh' => curl_multi_init() ], $multiParams); - if (function_exists('curl_reset')) { - $default = new CurlHandler($singleParams); - $future = new CurlMultiHandler($config); - } else { - $default = new CurlMultiHandler($config); - } - } else { - throw new \RuntimeException('OpenSearch-PHP requires cURL, or a custom HTTP handler.'); - } - - return $future ? Middleware::wrapFuture($default, $future) : $default; - } + // /** + // * Get the default handler + // * + // * @param array $multiParams + // * @param array $singleParams + // * @throws \RuntimeException + // */ + // public static function defaultHandler(array $multiParams = [], array $singleParams = []): callable + // { + // $future = null; + // if (extension_loaded('curl')) { + // $config = array_merge([ 'mh' => curl_multi_init() ], $multiParams); + // if (function_exists('curl_reset')) { + // $default = new CurlHandler($singleParams); + // $future = new CurlMultiHandler($config); + // } else { + // $default = new CurlMultiHandler($config); + // } + // } else { + // throw new \RuntimeException('OpenSearch-PHP requires cURL, or a custom HTTP handler.'); + // } + + // return $future ? Proxy::wrapSync($default, $future) : $default; + // } /** * Get the multi handler for async (CurlMultiHandler) @@ -583,7 +585,7 @@ public function build(): Client $this->buildLoggers(); if (is_null($this->handler)) { - $this->handler = ClientBuilder::defaultHandler(); + $this->handler = Utils::chooseHandler(); // ClientBuilder::defaultHandler(); } if (!is_null($this->sigV4CredentialProvider)) { @@ -610,18 +612,21 @@ public function build(): Client } if (!is_null($sslOptions)) { - $sslHandler = function (callable $handler, array $sslOptions) { - return function (array $request) use ($handler, $sslOptions) { - // Add our custom headers - foreach ($sslOptions as $key => $value) { - $request['client'][$key] = $value; - } - - // Send the request using the handler and return the response. - return $handler($request); - }; - }; - $this->handler = $sslHandler($this->handler, $sslOptions); + // $sslHandler = function (callable $handler, array $sslOptions) { + // return function (RequestInterface $request, array $options) use ($handler, $sslOptions) { + // // Add our custom headers + // foreach ($sslOptions as $key => $value) { + // $request['client'][$key] = $value; + // } + + // // Send the request using the handler and return the response. + // return $handler($request); + // }; + // }; + // $this->handler = $sslHandler($this->handler, $sslOptions); + foreach ($sslOptions as $key => $value) { + $this->connectionParams['client'][$key] = $value; + } } if (is_null($this->serializer)) { diff --git a/src/OpenSearch/Connections/Connection.php b/src/OpenSearch/Connections/Connection.php index 3a67e9309..24d3e7cf7 100644 --- a/src/OpenSearch/Connections/Connection.php +++ b/src/OpenSearch/Connections/Connection.php @@ -42,10 +42,9 @@ use OpenSearch\Serializers\SerializerInterface; use OpenSearch\Transport; use Exception; -use GuzzleHttp\Ring\Core; -use GuzzleHttp\Ring\Exception\ConnectException; -use GuzzleHttp\Ring\Exception\RingException; use Psr\Log\LoggerInterface; +use GuzzleHttp\Psr7\Request; +use Psr\Http\Message\RequestInterface; class Connection implements ConnectionInterface { @@ -209,33 +208,40 @@ public function performRequest(string $method, string $uri, ?array $params = [], $headers = array_merge($this->headers, $options['client']['headers']); } - $host = $this->host; - if (isset($this->connectionParams['client']['port_in_header']) && $this->connectionParams['client']['port_in_header']) { - $host .= ':' . $this->port; - } - - $request = [ - 'http_method' => $method, - 'scheme' => $this->transportSchema, - 'uri' => $this->getURI($uri, $params), - 'body' => $body, - 'headers' => array_merge( - [ - 'Host' => [$host] - ], - $headers - ) - ]; - - $request = array_replace_recursive($request, $this->connectionParams, $options); + // $host = $this->host; + // if (isset($this->connectionParams['client']['port_in_header']) && $this->connectionParams['client']['port_in_header']) { + // $host .= ':' . $this->port; + // } + + // $request = [ + // 'http_method' => $method, + // 'scheme' => $this->transportSchema, + // 'uri' => $this->getURI($uri, $params), + // 'body' => $body, + // 'headers' => array_merge( + // [ + // 'Host' => [$host] + // ], + // $headers + // ) + // ]; + + // $request = array_replace_recursive($request, $this->connectionParams, $options); + + $request = new Request( + $method, + $this->transportSchema . '://' . $this->host . ':' . $this->port . $this->getURI($uri, $params), + $headers, + $body + ); - // RingPHP does not like if client is empty - if (empty($request['client'])) { - unset($request['client']); - } + // // RingPHP does not like if client is empty + // if (empty($request['client'])) { + // unset($request['client']); + // } $handler = $this->handler; - $future = $handler($request, $this, $transport, $options); + $future = $handler($request, $this, $transport, ['verify' => false, 'auth' => ['admin', 'myStrongPassword123!']] /*$options*/); return $future; } @@ -252,93 +258,95 @@ public function getLastRequestInfo(): array private function wrapHandler(callable $handler): callable { - return function (array $request, Connection $connection, Transport $transport = null, $options) use ($handler) { + return function (RequestInterface $request, Connection $connection, Transport $transport = null, $options) use ($handler) { $this->lastRequest = []; $this->lastRequest['request'] = $request; - // Send the request using the wrapped handler. - $response = Core::proxy( - $handler($request), - function ($response) use ($connection, $transport, $request, $options) { - $this->lastRequest['response'] = $response; - - if (isset($response['error']) === true) { - if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) { - $this->log->warning("Curl exception encountered."); - - $exception = $this->getCurlRetryException($request, $response); - - $this->logRequestFail($request, $response, $exception); - - $node = $connection->getHost(); - $this->log->warning("Marking node $node dead."); - $connection->markDead(); - - // If the transport has not been set, we are inside a Ping or Sniff, - // so we don't want to retrigger retries anyway. - // - // TODO this could be handled better, but we are limited because connectionpools do not - // have access to Transport. Architecturally, all of this needs to be refactored - if (isset($transport) === true) { - $transport->connectionPool->scheduleCheck(); - - $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false; - $shouldRetry = $transport->shouldRetry($request); - $shouldRetryText = ($shouldRetry) ? 'true' : 'false'; - - $this->log->warning("Retries left? $shouldRetryText"); - if ($shouldRetry && !$neverRetry) { - return $transport->performRequest( - $request['http_method'], - $request['uri'], - [], - $request['body'], - $options - ); - } - } - - $this->log->warning("Out of retries, throwing exception from $node"); - // Only throw if we run out of retries - throw $exception; - } else { - // Something went seriously wrong, bail - $exception = new TransportException($response['error']->getMessage()); - $this->logRequestFail($request, $response, $exception); - throw $exception; - } - } else { - $connection->markAlive(); - - if (isset($response['headers']['Warning'])) { - $this->logWarning($request, $response); - } - if (isset($response['body']) === true) { - $response['body'] = stream_get_contents($response['body']); - $this->lastRequest['response']['body'] = $response['body']; - } - - if ($response['status'] >= 400 && $response['status'] < 500) { - $ignore = $request['client']['ignore'] ?? []; - // Skip 404 if succeeded true in the body (e.g. clear_scroll) - $body = $response['body'] ?? ''; - if (strpos($body, '"succeeded":true') !== false) { - $ignore[] = 404; - } - $this->process4xxError($request, $response, $ignore); - } elseif ($response['status'] >= 500) { - $ignore = $request['client']['ignore'] ?? []; - $this->process5xxError($request, $response, $ignore); - } - - // No error, deserialize - $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']); - } - $this->logRequestSuccess($request, $response); - - return isset($request['client']['verbose']) && $request['client']['verbose'] === true ? $response : $response['body']; - } - ); + $response = $handler($request, $options); + + // // Send the request using the wrapped handler. + // $response = Core::proxy( + // $handler($request), + // function ($response) use ($connection, $transport, $request, $options) { + // $this->lastRequest['response'] = $response; + + // if (isset($response['error']) === true) { + // if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) { + // $this->log->warning("Curl exception encountered."); + + // $exception = $this->getCurlRetryException($request, $response); + + // $this->logRequestFail($request, $response, $exception); + + // $node = $connection->getHost(); + // $this->log->warning("Marking node $node dead."); + // $connection->markDead(); + + // // If the transport has not been set, we are inside a Ping or Sniff, + // // so we don't want to retrigger retries anyway. + // // + // // TODO this could be handled better, but we are limited because connectionpools do not + // // have access to Transport. Architecturally, all of this needs to be refactored + // if (isset($transport) === true) { + // $transport->connectionPool->scheduleCheck(); + + // $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false; + // $shouldRetry = $transport->shouldRetry($request); + // $shouldRetryText = ($shouldRetry) ? 'true' : 'false'; + + // $this->log->warning("Retries left? $shouldRetryText"); + // if ($shouldRetry && !$neverRetry) { + // return $transport->performRequest( + // $request['http_method'], + // $request['uri'], + // [], + // $request['body'], + // $options + // ); + // } + // } + + // $this->log->warning("Out of retries, throwing exception from $node"); + // // Only throw if we run out of retries + // throw $exception; + // } else { + // // Something went seriously wrong, bail + // $exception = new TransportException($response['error']->getMessage()); + // $this->logRequestFail($request, $response, $exception); + // throw $exception; + // } + // } else { + // $connection->markAlive(); + + // if (isset($response['headers']['Warning'])) { + // $this->logWarning($request, $response); + // } + // if (isset($response['body']) === true) { + // $response['body'] = stream_get_contents($response['body']); + // $this->lastRequest['response']['body'] = $response['body']; + // } + + // if ($response['status'] >= 400 && $response['status'] < 500) { + // $ignore = $request['client']['ignore'] ?? []; + // // Skip 404 if succeeded true in the body (e.g. clear_scroll) + // $body = $response['body'] ?? ''; + // if (strpos($body, '"succeeded":true') !== false) { + // $ignore[] = 404; + // } + // $this->process4xxError($request, $response, $ignore); + // } elseif ($response['status'] >= 500) { + // $ignore = $request['client']['ignore'] ?? []; + // $this->process5xxError($request, $response, $ignore); + // } + + // // No error, deserialize + // $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']); + // } + // $this->logRequestSuccess($request, $response); + + // return isset($request['client']['verbose']) && $request['client']['verbose'] === true ? $response : $response['body']; + // } + // ); return $response; }; @@ -375,7 +383,7 @@ public function getHeaders(): array return $this->headers; } - public function logWarning(array $request, array $response): void + public function logWarning(RequestInterface $request, ResponseInterface $response): void { $this->log->warning('Deprecation', $response['headers']['Warning']); } @@ -383,11 +391,11 @@ public function logWarning(array $request, array $response): void /** * Log a successful request * - * @param array $request - * @param array $response + * @param RequestInterface $request + * @param ResponseInterface $response * @return void */ - public function logRequestSuccess(array $request, array $response): void + public function logRequestSuccess(RequestInterface $request, ResponseInterface $response): void { $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? ''; $uri = $this->addPortInUrl($response['effective_url'], (int) $port); @@ -431,7 +439,7 @@ public function logRequestSuccess(array $request, array $response): void * * @return void */ - public function logRequestFail(array $request, array $response, \Throwable $exception): void + public function logRequestFail(RequestInterface $request, ResponseInterface $response, \Throwable $exception): void { $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? ''; $uri = $this->addPortInUrl($response['effective_url'], (int) $port); @@ -563,7 +571,7 @@ public function getPort() return $this->port; } - protected function getCurlRetryException(array $request, array $response): OpenSearchException + protected function getCurlRetryException(RequestInterface $request, ResponseInterface $response): OpenSearchException { $exception = null; $message = $response['error']->getMessage(); @@ -634,7 +642,7 @@ private function buildCurlCommand(string $method, string $url, ?string $body): s /** * @throws OpenSearchException */ - private function process4xxError(array $request, array $response, array $ignore): void + private function process4xxError(RequestInterface $request, ResponseInterface $response, array $ignore): void { $statusCode = $response['status']; @@ -672,7 +680,7 @@ private function process4xxError(array $request, array $response, array $ignore) /** * @throws OpenSearchException */ - private function process5xxError(array $request, array $response, array $ignore): void + private function process5xxError(RequestInterface $request, ResponseInterface $response, array $ignore): void { $statusCode = (int) $response['status']; $responseBody = $response['body']; diff --git a/src/OpenSearch/Handlers/SigV4Handler.php b/src/OpenSearch/Handlers/SigV4Handler.php index 41bc8b8cf..330aab242 100644 --- a/src/OpenSearch/Handlers/SigV4Handler.php +++ b/src/OpenSearch/Handlers/SigV4Handler.php @@ -46,7 +46,7 @@ public function __construct( ?: CredentialProvider::defaultProvider(); } - public function __invoke(array $request) + public function __invoke(RequestInterface $request) { $creds = call_user_func($this->credentialProvider)->wait(); diff --git a/src/OpenSearch/Namespaces/BooleanRequestWrapper.php b/src/OpenSearch/Namespaces/BooleanRequestWrapper.php index 9c29f656b..245777bac 100644 --- a/src/OpenSearch/Namespaces/BooleanRequestWrapper.php +++ b/src/OpenSearch/Namespaces/BooleanRequestWrapper.php @@ -25,7 +25,6 @@ use OpenSearch\Common\Exceptions\RoutingMissingException; use OpenSearch\Endpoints\AbstractEndpoint; use OpenSearch\Transport; -use GuzzleHttp\Ring\Future\FutureArrayInterface; abstract class BooleanRequestWrapper { diff --git a/src/OpenSearch/Transport.php b/src/OpenSearch/Transport.php index 8655ce5ea..bb7a87496 100644 --- a/src/OpenSearch/Transport.php +++ b/src/OpenSearch/Transport.php @@ -25,8 +25,8 @@ use OpenSearch\ConnectionPool\AbstractConnectionPool; use OpenSearch\Connections\Connection; use OpenSearch\Connections\ConnectionInterface; -use GuzzleHttp\Ring\Future\FutureArrayInterface; use Psr\Log\LoggerInterface; +use GuzzleHttp\Promise\Promise; class Transport { @@ -96,7 +96,7 @@ public function getConnection(): ConnectionInterface * * @throws Common\Exceptions\NoNodesAvailableException|\Exception */ - public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface + public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): Promise { try { $connection = $this->getConnection(); @@ -118,7 +118,7 @@ public function performRequest(string $method, string $uri, array $params = [], $this ); - $future->promise()->then( + $future->then( //onSuccess function ($response) { $this->retryAttempts = 0; @@ -144,19 +144,19 @@ function ($response) { * * @return callable|array */ - public function resultOrFuture(FutureArrayInterface $result, array $options = []) + public function resultOrFuture(Promise $result, array $options = []) { $response = null; $async = isset($options['client']['future']) ? $options['client']['future'] : null; if (is_null($async) || $async === false) { do { $result = $result->wait(); - } while ($result instanceof FutureArrayInterface); + } while ($result instanceof Promise); } return $result; } - public function shouldRetry(array $request): bool + public function shouldRetry(RequestInterface $request): bool { if ($this->retryAttempts < $this->retries) { $this->retryAttempts += 1; diff --git a/tests/ClientTest.php b/tests/ClientTest.php index 434706078..845e0f61e 100644 --- a/tests/ClientTest.php +++ b/tests/ClientTest.php @@ -21,8 +21,6 @@ namespace OpenSearch\Tests; -use GuzzleHttp\Ring\Client\MockHandler; -use GuzzleHttp\Ring\Future\FutureArray; use Mockery as m; use OpenSearch; use OpenSearch\Client; diff --git a/tests/Handlers/SigV4HandlerTest.php b/tests/Handlers/SigV4HandlerTest.php index 862043b21..4cc821b82 100644 --- a/tests/Handlers/SigV4HandlerTest.php +++ b/tests/Handlers/SigV4HandlerTest.php @@ -6,7 +6,6 @@ use Aws\Credentials\CredentialProvider; use Aws\Credentials\Credentials; -use GuzzleHttp\Ring\Future\CompletedFutureArray; use OpenSearch\ClientBuilder; use OpenSearch\Handlers\SigV4Handler; use PHPUnit\Framework\TestCase; diff --git a/tests/TransportTest.php b/tests/TransportTest.php index 96469261b..613fbac10 100644 --- a/tests/TransportTest.php +++ b/tests/TransportTest.php @@ -26,8 +26,6 @@ use OpenSearch\Connections\Connection; use OpenSearch\Serializers\SerializerInterface; use OpenSearch\Transport; -use GuzzleHttp\Ring\Future\FutureArray; -use GuzzleHttp\Ring\Future\FutureArrayInterface; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface;