From ce60d4224b89ee5f512e3c583f36b17f35eb6ea8 Mon Sep 17 00:00:00 2001 From: Kim Pepper Date: Mon, 21 Oct 2024 14:14:47 +1100 Subject: [PATCH] Switch to PSR interfaces Signed-off-by: Kim Pepper --- composer.json | 31 +- samples/index.php | 45 +- src/OpenSearch/ClientBuilder.php | 672 ++------------- .../Exceptions/NoAsyncClientException.php} | 17 +- .../ConnectionPool/AbstractConnectionPool.php | 5 + .../ConnectionPoolInterface.php | 5 + .../Selectors/RoundRobinSelector.php | 46 - .../Selectors/SelectorInterface.php | 5 + .../Selectors/StickyRoundRobinSelector.php | 57 -- .../ConnectionPool/SimpleConnectionPool.php | 48 -- .../ConnectionPool/SniffingConnectionPool.php | 181 ---- .../ConnectionPool/StaticConnectionPool.php | 105 --- .../StaticNoPingConnectionPool.php | 88 -- src/OpenSearch/Connections/Connection.php | 789 ------------------ .../Connections/ConnectionFactory.php | 81 -- .../ConnectionFactoryInterface.php | 5 + .../Connections/ConnectionInterface.php | 7 +- src/OpenSearch/Endpoints/AbstractEndpoint.php | 35 +- src/OpenSearch/Handlers/SigV4Handler.php | 151 ---- .../Namespaces/AbstractNamespace.php | 32 +- src/OpenSearch/RequestFactory.php | 137 +++ src/OpenSearch/RequestFactoryInterface.php | 19 + src/OpenSearch/Transport.php | 169 ++-- src/OpenSearch/TransportInterface.php | 21 + tests/ClientBuilderTest.php | 30 +- tests/ClientTest.php | 457 ++-------- tests/Endpoints/AbstractEndpointTest.php | 3 +- tests/RegisteredNamespaceTest.php | 2 +- tests/TransportTest.php | 119 +-- util/Endpoint.php | 7 +- util/template/client-class | 59 +- util/template/namespace-class | 2 - util/template/namespace-property | 6 +- 33 files changed, 596 insertions(+), 2840 deletions(-) rename src/OpenSearch/{ConnectionPool/Selectors/RandomSelector.php => Common/Exceptions/NoAsyncClientException.php} (59%) delete mode 100644 src/OpenSearch/ConnectionPool/Selectors/RoundRobinSelector.php delete mode 100644 src/OpenSearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php delete mode 100644 src/OpenSearch/ConnectionPool/SimpleConnectionPool.php delete mode 100644 src/OpenSearch/ConnectionPool/SniffingConnectionPool.php delete mode 100644 src/OpenSearch/ConnectionPool/StaticConnectionPool.php delete mode 100644 src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php delete mode 100644 src/OpenSearch/Connections/Connection.php delete mode 100644 src/OpenSearch/Connections/ConnectionFactory.php delete mode 100644 src/OpenSearch/Handlers/SigV4Handler.php create mode 100644 src/OpenSearch/RequestFactory.php create mode 100644 src/OpenSearch/RequestFactoryInterface.php create mode 100644 src/OpenSearch/TransportInterface.php diff --git a/composer.json b/composer.json index dee24ad86..1d06fc9de 100644 --- a/composer.json +++ b/composer.json @@ -23,20 +23,30 @@ "require": { "php": "^8.0", "ext-json": ">=1.3.7", - "ext-curl": "*", - "ezimuel/ringphp": "^1.2.2", + "php-http/async-client-implementation": "^1.0", + "php-http/discovery": "^1.20", + "php-http/guzzle7-adapter": "^1.0", + "psr/http-client": "^1.0", + "psr/http-client-implementation": "^1.0", + "psr/http-factory": "^1.1", + "psr/http-factory-implementation": "^2.4", + "psr/http-message": "^2.0", + "psr/http-message-implementation": "^1.0", "psr/log": "^1|^2|^3", "symfony/yaml": "*" }, "require-dev": { "ext-zip": "*", "aws/aws-sdk-php": "^3.0", - "friendsofphp/php-cs-fixer": "^3.0", - "mockery/mockery": "^1.2", - "phpstan/phpstan": "^1.7.15", - "phpstan/phpstan-mockery": "^1.1.0", - "phpunit/phpunit": "^9.3", - "symfony/finder": "~4.0 || ~5.0" + "friendsofphp/php-cs-fixer": "^v3.64", + "guzzlehttp/psr7": "^2.7", + "mockery/mockery": "^1.6", + "phpstan/phpstan": "^1.12", + "phpstan/phpstan-mockery": "^1.1", + "phpunit/phpunit": "^9.6", + "symfony/finder": "^6.4|^7.0", + "symfony/http-client": "^7.1", + "symfony/http-client-contracts": "^3.5" }, "suggest": { "monolog/monolog": "Allows for client-level logging and tracing", @@ -54,7 +64,10 @@ } }, "config": { - "sort-packages": true + "sort-packages": true, + "allow-plugins": { + "php-http/discovery": true + } }, "scripts": { "php-cs": [ diff --git a/samples/index.php b/samples/index.php index b7bb97f8c..0d9d59ef4 100644 --- a/samples/index.php +++ b/samples/index.php @@ -5,17 +5,46 @@ * SPDX-License-Identifier: Apache-2.0 */ +use OpenSearch\Client; + require_once __DIR__ . '/vendor/autoload.php'; -$client = OpenSearch\ClientBuilder::fromConfig([ - 'Hosts' => [ - 'https://localhost:9200' - ], - 'BasicAuthentication' => ['admin', getenv('OPENSEARCH_PASSWORD')], - 'Retries' => 2, - 'SSLVerification' => false +// Guzzle example + +$guzzleClient = new \GuzzleHttp\Client([ + 'base_uri' => 'https://localhost:9200', + 'auth' => ['admin', getenv('OPENSEARCH_PASSWORD')], + 'verify' => false, + 'retries' => 2, + 'headers' => [ + 'Accept' => 'application/json', + 'Content-Type' => 'application/json', + 'User-Agent' => sprintf('opensearch-php/%s (%s; PHP %s)', Client::VERSION, PHP_OS, PHP_VERSION), + ] ]); +$requestFactory = new \OpenSearch\RequestFactory(); +$transport = new OpenSearch\Transport($guzzleClient, $requestFactory); + +$client = (new \OpenSearch\ClientBuilder($transport))->build(); $info = $client->info(); -echo "{$info['version']['distribution']}: {$info['version']['number']}\n"; + +// Symfony example + +$symfonyPsr18Client = (new \Symfony\Component\HttpClient\Psr18Client())->withOptions([ + 'base_uri' => 'https://localhost:9200', + 'auth_basic' => ['admin', getenv('OPENSEARCH_PASSWORD')], + 'verify_peer' => false, + 'max_retries' => 2, + 'headers' => [ + 'Accept' => 'application/json', + 'Content-Type' => 'application/json', + ], +]); + +$transport = new OpenSearch\Transport($symfonyPsr18Client, $requestFactory); + +$client = (new \OpenSearch\ClientBuilder($transport))->build(); + +$info = $client->info(); diff --git a/src/OpenSearch/ClientBuilder.php b/src/OpenSearch/ClientBuilder.php index a075e36f9..c1c9b9b75 100644 --- a/src/OpenSearch/ClientBuilder.php +++ b/src/OpenSearch/ClientBuilder.php @@ -21,162 +21,57 @@ namespace OpenSearch; -use Aws\Credentials\CredentialProvider; -use Aws\Credentials\Credentials; -use Aws\Credentials\CredentialsInterface; -use GuzzleHttp\Ring\Client\CurlHandler; -use GuzzleHttp\Ring\Client\CurlMultiHandler; -use GuzzleHttp\Ring\Client\Middleware; -use OpenSearch\Common\Exceptions\AuthenticationConfigException; -use OpenSearch\Common\Exceptions\InvalidArgumentException; -use OpenSearch\Common\Exceptions\RuntimeException; use OpenSearch\ConnectionPool\AbstractConnectionPool; -use OpenSearch\ConnectionPool\Selectors\RoundRobinSelector; -use OpenSearch\ConnectionPool\Selectors\SelectorInterface; -use OpenSearch\ConnectionPool\StaticNoPingConnectionPool; -use OpenSearch\Connections\ConnectionFactory; use OpenSearch\Connections\ConnectionFactoryInterface; -use OpenSearch\Connections\ConnectionInterface; -use OpenSearch\Handlers\SigV4Handler; use OpenSearch\Namespaces\NamespaceBuilderInterface; use OpenSearch\Serializers\SerializerInterface; use OpenSearch\Serializers\SmartSerializer; use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; -use ReflectionClass; class ClientBuilder { - public const ALLOWED_METHODS_FROM_CONFIG = ['includePortInHostHeader']; - - /** - * @var Transport|null - */ - private $transport; - - private ?EndpointFactoryInterface $endpointFactory = null; - - /** - * @var NamespaceBuilderInterface[] - */ - private $registeredNamespacesBuilders = []; - - /** - * @var ConnectionFactoryInterface|null - */ - private $connectionFactory; - - /** - * @var callable|null - */ - private $handler; - - /** - * @var LoggerInterface|null - */ - private $logger; - - /** - * @var LoggerInterface|null - */ - private $tracer; - - /** - * @var string|AbstractConnectionPool - */ - private $connectionPool = StaticNoPingConnectionPool::class; - - /** - * @var string|SerializerInterface|null - */ - private $serializer = SmartSerializer::class; - - /** - * @var string|SelectorInterface|null - */ - private $selector = RoundRobinSelector::class; - - /** - * @var array - */ - private $connectionPoolArgs = [ - 'randomizeHosts' => true - ]; - - /** - * @var array|null - */ - private $hosts; - - /** - * @var array - */ - private $connectionParams; - - /** - * @var int|null - */ - private $retries; - - /** - * @var null|callable - */ - private $sigV4CredentialProvider; - - /** - * @var null|string - */ - private $sigV4Region; - - /** - * @var null|string - */ - private $sigV4Service; - - /** - * @var bool - */ - private $sniffOnStart = false; - /** - * @var null|array + * The serializer. */ - private $sslCert; + private ?SerializerInterface $serializer = null; /** - * @var null|array + * The endpoint factory. */ - private $sslKey; + private ?EndpointFactoryInterface $endpointFactory = null; /** - * @var null|bool|string + * The transport. */ - private $sslVerification; + private ?TransportInterface $transport = null; /** - * @var bool + * @var NamespaceBuilderInterface[] */ - private $includePortInHostHeader = false; + private array $registeredNamespacesBuilders = []; - /** - * @var string|null - */ - private $basicAuthentication = null; + public function __construct(TransportInterface $transport) + { + $this->transport = $transport; + } /** * Create an instance of ClientBuilder + * + * @deprecated in 2.3.1 and will be removed in 3.0.0. */ - public static function create(): ClientBuilder + public static function create(): ?ClientBuilder { - return new self(); + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); + return null; } /** - * Can supply first param to Client::__construct() when invoking manually or with dependency injection + * Gets the serializer instance. If not set, it will create a new instance of SmartSerializer. */ - public function getTransport(): Transport + private function getSerializer(): SerializerInterface { - return $this->transport; + return $this->serializer ?? $this->serializer = new SmartSerializer(); } /** @@ -214,28 +109,13 @@ public function getRegisteredNamespacesBuilders(): array * @param bool $quiet False if unknown settings throw exception, true to silently * ignore unknown settings * @throws Common\Exceptions\RuntimeException + * + * @deprecated in 2.3.1 and will be removed in 3.0.0. */ public static function fromConfig(array $config, bool $quiet = false): Client { + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); $builder = new self(); - foreach ($config as $key => $value) { - $method = in_array($key, self::ALLOWED_METHODS_FROM_CONFIG, true) ? $key : "set$key"; - $reflection = new ReflectionClass($builder); - if ($reflection->hasMethod($method)) { - $func = $reflection->getMethod($method); - if ($func->getNumberOfParameters() > 1) { - $builder->$method(...$value); - } else { - $builder->$method($value); - } - unset($config[$key]); - } - } - - if ($quiet === false && count($config) > 0) { - $unknown = implode(array_keys($config)); - throw new RuntimeException("Unknown parameters provided: $unknown"); - } return $builder->build(); } @@ -244,24 +124,13 @@ public static function fromConfig(array $config, bool $quiet = false): Client * * @param array $multiParams * @param array $singleParams - * @throws \RuntimeException + * + * @deprecated in 2.3.1 and will be removed in 3.0.0. */ public static function defaultHandler(array $multiParams = [], array $singleParams = []): callable { - $future = null; - if (extension_loaded('curl')) { - $config = array_merge([ 'mh' => curl_multi_init() ], $multiParams); - if (function_exists('curl_reset')) { - $default = new CurlHandler($singleParams); - $future = new CurlMultiHandler($config); - } else { - $default = new CurlMultiHandler($config); - } - } else { - throw new \RuntimeException('OpenSearch-PHP requires cURL, or a custom HTTP handler.'); - } - - return $future ? Middleware::wrapFuture($default, $future) : $default; + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); + return fn() => @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); } /** @@ -269,13 +138,10 @@ public static function defaultHandler(array $multiParams = [], array $singlePara * * @throws \RuntimeException */ - public static function multiHandler(array $params = []): CurlMultiHandler + public static function multiHandler(array $params = []): ?CurlMultiHandler { - if (function_exists('curl_multi_init')) { - return new CurlMultiHandler(array_merge([ 'mh' => curl_multi_init() ], $params)); - } - - throw new \RuntimeException('CurlMulti handler requires cURL.'); + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); + return null; } /** @@ -283,13 +149,10 @@ public static function multiHandler(array $params = []): CurlMultiHandler * * @throws \RuntimeException */ - public static function singleHandler(): CurlHandler + public static function singleHandler(): ?CurlHandler { - if (function_exists('curl_reset')) { - return new CurlHandler(); - } - - throw new \RuntimeException('CurlSingle handler requires cURL.'); + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); + return null; } /** @@ -299,8 +162,7 @@ public static function singleHandler(): CurlHandler */ public function setConnectionFactory(ConnectionFactoryInterface $connectionFactory): ClientBuilder { - $this->connectionFactory = $connectionFactory; - + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); return $this; } @@ -313,15 +175,7 @@ public function setConnectionFactory(ConnectionFactoryInterface $connectionFacto */ public function setConnectionPool($connectionPool, array $args = []): ClientBuilder { - if (is_string($connectionPool)) { - $this->connectionPool = $connectionPool; - $this->connectionPoolArgs = $args; - } elseif (is_object($connectionPool)) { - $this->connectionPool = $connectionPool; - } else { - throw new InvalidArgumentException("Serializer must be a class path or instantiated object extending AbstractConnectionPool"); - } - + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); return $this; } @@ -362,22 +216,32 @@ public function registerNamespace(NamespaceBuilderInterface $namespaceBuilder): * Set the transport * * @param Transport $transport + * + * @deprecated in 2.3.1 and will be removed in 3.0.0. */ - public function setTransport(Transport $transport): ClientBuilder + public function setTransport(TransportInterface $transport): ClientBuilder { + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); $this->transport = $transport; return $this; } + public function getTransport(): TransportInterface + { + return $this->transport; + } + /** * Set the HTTP handler (cURL is default) * * @param mixed $handler + * + * @deprecated in 2.3.1 and will be removed in 3.0.0. */ public function setHandler($handler): ClientBuilder { - $this->handler = $handler; + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); return $this; } @@ -386,10 +250,12 @@ public function setHandler($handler): ClientBuilder * Set the PSR-3 Logger * * @param LoggerInterface $logger + * + * @deprecated in 2.3.1 and will be removed in 3.0.0. */ public function setLogger(LoggerInterface $logger): ClientBuilder { - $this->logger = $logger; + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); return $this; } @@ -398,190 +264,32 @@ public function setLogger(LoggerInterface $logger): ClientBuilder * Set the PSR-3 tracer * * @param LoggerInterface $tracer + * + * @deprecated in 2.3.1 and will be removed in 3.0.0. */ public function setTracer(LoggerInterface $tracer): ClientBuilder { - $this->tracer = $tracer; + @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); return $this; } /** * Set the serializer - * - * @param \OpenSearch\Serializers\SerializerInterface|string $serializer - */ - public function setSerializer($serializer): ClientBuilder - { - $this->parseStringOrObject($serializer, $this->serializer, 'SerializerInterface'); - - return $this; - } - - /** - * Set the hosts (nodes) - * - * @param array $hosts - */ - public function setHosts(array $hosts): ClientBuilder - { - $this->hosts = $hosts; - - return $this; - } - - /** - * Set Basic access authentication - * - * @see https://en.wikipedia.org/wiki/Basic_access_authentication - * @param string $username - * @param string $password - * - * @throws AuthenticationConfigException - */ - public function setBasicAuthentication(string $username, string $password): ClientBuilder - { - $this->basicAuthentication = $username.':'.$password; - - return $this; - } - - /** - * Set connection parameters - * - * @param array $params - */ - public function setConnectionParams(array $params): ClientBuilder - { - $this->connectionParams = $params; - - return $this; - } - - /** - * Set number or retries (default is equal to number of nodes) - * - * @param int $retries - */ - public function setRetries(int $retries): ClientBuilder - { - $this->retries = $retries; - - return $this; - } - - /** - * Set the selector algorithm - * - * @param \OpenSearch\ConnectionPool\Selectors\SelectorInterface|string $selector */ - public function setSelector($selector): ClientBuilder + public function setSerializer(SerializerInterface $serializer): static { - $this->parseStringOrObject($selector, $this->selector, 'SelectorInterface'); + $this->serializer = $serializer; return $this; } - /** - * Set the credential provider for SigV4 request signing. The value provider should be a - * callable object that will return - * - * @param callable|bool|array|CredentialsInterface|null $credentialProvider - */ - public function setSigV4CredentialProvider($credentialProvider): ClientBuilder + private function getEndpointFactory(): EndpointFactoryInterface { - if ($credentialProvider !== null && $credentialProvider !== false) { - $this->sigV4CredentialProvider = $this->normalizeCredentialProvider($credentialProvider); + if ($this->endpointFactory) { + return $this->endpointFactory; } - - return $this; - } - - /** - * Set the region for SigV4 signing. - * - * @param string|null $region - */ - public function setSigV4Region($region): ClientBuilder - { - $this->sigV4Region = $region; - - return $this; - } - - /** - * Set the service for SigV4 signing. - * - * @param string|null $service - */ - public function setSigV4Service($service): ClientBuilder - { - $this->sigV4Service = $service; - - return $this; - } - - /** - * Set sniff on start - * - * @param bool $sniffOnStart enable or disable sniff on start - */ - - public function setSniffOnStart(bool $sniffOnStart): ClientBuilder - { - $this->sniffOnStart = $sniffOnStart; - - return $this; - } - - /** - * Set SSL certificate - * - * @param string $cert The name of a file containing a PEM formatted certificate. - * @param string $password if the certificate requires a password - */ - public function setSSLCert(string $cert, ?string $password = null): ClientBuilder - { - $this->sslCert = [$cert, $password]; - - return $this; - } - - /** - * Set SSL key - * - * @param string $key The name of a file containing a private SSL key - * @param string $password if the private key requires a password - */ - public function setSSLKey(string $key, ?string $password = null): ClientBuilder - { - $this->sslKey = [$key, $password]; - - return $this; - } - - /** - * Set SSL verification - * - * @param bool|string $value - */ - public function setSSLVerification($value = true): ClientBuilder - { - $this->sslVerification = $value; - - return $this; - } - - /** - * Include the port in Host header - * - * @see https://github.com/elastic/elasticsearch-php/issues/993 - */ - public function includePortInHostHeader(bool $enable): ClientBuilder - { - $this->includePortInHostHeader = $enable; - - return $this; + return $this->endpointFactory = new EndpointFactory($this->getSerializer()); } /** @@ -589,267 +297,7 @@ public function includePortInHostHeader(bool $enable): ClientBuilder */ public function build(): Client { - $this->buildLoggers(); - - if (is_null($this->handler)) { - $this->handler = ClientBuilder::defaultHandler(); - } - - if (!is_null($this->sigV4CredentialProvider)) { - if (is_null($this->sigV4Region)) { - throw new RuntimeException("A region must be supplied for SigV4 request signing."); - } - - if (is_null($this->sigV4Service)) { - $this->setSigV4Service("es"); - } - - $this->handler = new SigV4Handler($this->sigV4Region, $this->sigV4Service, $this->sigV4CredentialProvider, $this->handler); - } - - $sslOptions = null; - if (isset($this->sslKey)) { - $sslOptions['ssl_key'] = $this->sslKey; - } - if (isset($this->sslCert)) { - $sslOptions['cert'] = $this->sslCert; - } - if (isset($this->sslVerification)) { - $sslOptions['verify'] = $this->sslVerification; - } - - if (!is_null($sslOptions)) { - $sslHandler = function (callable $handler, array $sslOptions) { - return function (array $request) use ($handler, $sslOptions) { - // Add our custom headers - foreach ($sslOptions as $key => $value) { - $request['client'][$key] = $value; - } - - // Send the request using the handler and return the response. - return $handler($request); - }; - }; - $this->handler = $sslHandler($this->handler, $sslOptions); - } - - if (is_null($this->serializer)) { - $this->serializer = new SmartSerializer(); - } elseif (is_string($this->serializer)) { - $this->serializer = new $this->serializer(); - } - - $this->connectionParams['client']['port_in_header'] = $this->includePortInHostHeader; - - if (! is_null($this->basicAuthentication)) { - if (isset($this->connectionParams['client']['curl']) === false) { - $this->connectionParams['client']['curl'] = []; - } - - $this->connectionParams['client']['curl'] += [ - CURLOPT_HTTPAUTH => CURLAUTH_BASIC, - CURLOPT_USERPWD => $this->basicAuthentication - ]; - } - - if (is_null($this->connectionFactory)) { - // Make sure we are setting Content-Type and Accept (unless the user has explicitly - // overridden it - if (! isset($this->connectionParams['client']['headers'])) { - $this->connectionParams['client']['headers'] = []; - } - if (! isset($this->connectionParams['client']['headers']['Content-Type'])) { - $this->connectionParams['client']['headers']['Content-Type'] = ['application/json']; - } - if (! isset($this->connectionParams['client']['headers']['Accept'])) { - $this->connectionParams['client']['headers']['Accept'] = ['application/json']; - } - - $this->connectionFactory = new ConnectionFactory($this->handler, $this->connectionParams, $this->serializer, $this->logger, $this->tracer); - } - - if (is_null($this->hosts)) { - $this->hosts = $this->getDefaultHost(); - } - - if (is_null($this->selector)) { - $this->selector = new RoundRobinSelector(); - } elseif (is_string($this->selector)) { - $this->selector = new $this->selector(); - } - - $this->buildTransport(); - - if (is_null($this->endpointFactory)) { - $this->endpointFactory = new EndpointFactory($this->serializer); - } - - $registeredNamespaces = []; - foreach ($this->registeredNamespacesBuilders as $builder) { - /** - * @var NamespaceBuilderInterface $builder - */ - $registeredNamespaces[$builder->getName()] = $builder->getObject($this->transport, $this->serializer); - } - - return $this->instantiate($this->transport, $this->endpointFactory, $registeredNamespaces); - } - - protected function instantiate(Transport $transport, EndpointFactoryInterface $endpointFactory, array $registeredNamespaces): Client - { - return new Client($transport, $endpointFactory, $registeredNamespaces); - } - - private function buildLoggers(): void - { - if (is_null($this->logger)) { - $this->logger = new NullLogger(); - } - - if (is_null($this->tracer)) { - $this->tracer = new NullLogger(); - } - } - - private function buildTransport(): void - { - $connections = $this->buildConnectionsFromHosts($this->hosts); - - if (is_string($this->connectionPool)) { - $this->connectionPool = new $this->connectionPool( - $connections, - $this->selector, - $this->connectionFactory, - $this->connectionPoolArgs - ); - } - - if (is_null($this->retries)) { - $this->retries = count($connections); - } - - if (is_null($this->transport)) { - $this->transport = new Transport($this->retries, $this->connectionPool, $this->logger, $this->sniffOnStart); - } - } - - private function parseStringOrObject($arg, &$destination, $interface): void - { - if (is_string($arg)) { - $destination = new $arg(); - } elseif (is_object($arg)) { - $destination = $arg; - } else { - throw new InvalidArgumentException("Serializer must be a class path or instantiated object implementing $interface"); - } - } - - private function getDefaultHost(): array - { - return ['localhost:9200']; - } - - /** - * @return ConnectionInterface[] - * @throws RuntimeException - */ - private function buildConnectionsFromHosts(array $hosts): array - { - $connections = []; - foreach ($hosts as $host) { - if (is_string($host)) { - $host = $this->prependMissingScheme($host); - $host = $this->extractURIParts($host); - } elseif (is_array($host)) { - $host = $this->normalizeExtendedHost($host); - } else { - $this->logger->error("Could not parse host: ".print_r($host, true)); - throw new RuntimeException("Could not parse host: ".print_r($host, true)); - } - - $connections[] = $this->connectionFactory->create($host); - } - - return $connections; + return new Client($this->getTransport(), $this->getEndpointFactory(), $this->getRegisteredNamespacesBuilders()); } - /** - * @throws RuntimeException - */ - private function normalizeExtendedHost(array $host): array - { - if (isset($host['host']) === false) { - $this->logger->error("Required 'host' was not defined in extended format: ".print_r($host, true)); - throw new RuntimeException("Required 'host' was not defined in extended format: ".print_r($host, true)); - } - - if (isset($host['scheme']) === false) { - $host['scheme'] = 'http'; - } - if (isset($host['port']) === false) { - $host['port'] = 9200; - } - return $host; - } - - /** - * @throws InvalidArgumentException - */ - private function extractURIParts(string $host): array - { - $parts = parse_url($host); - - if ($parts === false) { - throw new InvalidArgumentException(sprintf('Could not parse URI: "%s"', $host)); - } - - if (isset($parts['port']) !== true) { - $parts['port'] = 9200; - } - - return $parts; - } - - private function prependMissingScheme(string $host): string - { - if (!preg_match("/^https?:\/\//", $host)) { - $host = 'http://' . $host; - } - - return $host; - } - - private function normalizeCredentialProvider($provider): ?callable - { - if ($provider === null || $provider === false) { - return null; - } - - if (is_callable($provider)) { - return $provider; - } - - SigV4Handler::assertDependenciesInstalled(); - - if ($provider === true) { - return CredentialProvider::defaultProvider(); - } - - if ($provider instanceof CredentialsInterface) { - return CredentialProvider::fromCredentials($provider); - } elseif (is_array($provider) && isset($provider['key']) && isset($provider['secret'])) { - return CredentialProvider::fromCredentials( - new Credentials( - $provider['key'], - $provider['secret'], - isset($provider['token']) ? $provider['token'] : null, - isset($provider['expires']) ? $provider['expires'] : null - ) - ); - } - - throw new InvalidArgumentException('Credentials must be an instance of Aws\Credentials\CredentialsInterface, an' - . ' associative array that contains "key", "secret", and an optional "token" key-value pairs, a credentials' - . ' provider function, or true.'); - } } diff --git a/src/OpenSearch/ConnectionPool/Selectors/RandomSelector.php b/src/OpenSearch/Common/Exceptions/NoAsyncClientException.php similarity index 59% rename from src/OpenSearch/ConnectionPool/Selectors/RandomSelector.php rename to src/OpenSearch/Common/Exceptions/NoAsyncClientException.php index ecb7b380a..b95ce0532 100644 --- a/src/OpenSearch/ConnectionPool/Selectors/RandomSelector.php +++ b/src/OpenSearch/Common/Exceptions/NoAsyncClientException.php @@ -1,7 +1,5 @@ current % count($connections)]; - - $this->current += 1; - - return $returnConnection; - } -} diff --git a/src/OpenSearch/ConnectionPool/Selectors/SelectorInterface.php b/src/OpenSearch/ConnectionPool/Selectors/SelectorInterface.php index eeb290863..1eff5268c 100644 --- a/src/OpenSearch/ConnectionPool/Selectors/SelectorInterface.php +++ b/src/OpenSearch/ConnectionPool/Selectors/SelectorInterface.php @@ -23,6 +23,11 @@ use OpenSearch\Connections\ConnectionInterface; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ interface SelectorInterface { /** diff --git a/src/OpenSearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php b/src/OpenSearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php deleted file mode 100644 index 7da041a06..000000000 --- a/src/OpenSearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php +++ /dev/null @@ -1,57 +0,0 @@ -current]->isAlive()) { - return $connections[$this->current]; - } - - $this->currentCounter += 1; - $this->current = $this->currentCounter % count($connections); - - return $connections[$this->current]; - } -} diff --git a/src/OpenSearch/ConnectionPool/SimpleConnectionPool.php b/src/OpenSearch/ConnectionPool/SimpleConnectionPool.php deleted file mode 100644 index 29c14d267..000000000 --- a/src/OpenSearch/ConnectionPool/SimpleConnectionPool.php +++ /dev/null @@ -1,48 +0,0 @@ - $connectionPoolParams - */ - public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams) - { - parent::__construct($connections, $selector, $factory, $connectionPoolParams); - } - - public function nextConnection(bool $force = false): ConnectionInterface - { - return $this->selector->select($this->connections); - } - - public function scheduleCheck(): void - { - } -} diff --git a/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php b/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php deleted file mode 100644 index 2c6d6b833..000000000 --- a/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php +++ /dev/null @@ -1,181 +0,0 @@ - $connectionPoolParams - */ - public function __construct( - $connections, - SelectorInterface $selector, - ConnectionFactoryInterface $factory, - $connectionPoolParams - ) { - parent::__construct($connections, $selector, $factory, $connectionPoolParams); - - $this->setConnectionPoolParams($connectionPoolParams); - $this->nextSniff = time() + $this->sniffingInterval; - } - - public function nextConnection(bool $force = false): ConnectionInterface - { - $this->sniff($force); - - $size = count($this->connections); - while ($size--) { - /** - * @var Connection $connection - */ - $connection = $this->selector->select($this->connections); - if ($connection->isAlive() === true || $connection->ping() === true) { - return $connection; - } - } - - if ($force === true) { - throw new NoNodesAvailableException("No alive nodes found in your cluster"); - } - - return $this->nextConnection(true); - } - - public function scheduleCheck(): void - { - $this->nextSniff = -1; - } - - private function sniff(bool $force = false): void - { - if ($force === false && $this->nextSniff > time()) { - return; - } - - $total = count($this->connections); - - while ($total--) { - /** - * @var Connection $connection - */ - $connection = $this->selector->select($this->connections); - - if ($connection->isAlive() xor $force) { - continue; - } - - if ($this->sniffConnection($connection) === true) { - return; - } - } - - if ($force === true) { - return; - } - - foreach ($this->seedConnections as $connection) { - /** - * @var Connection $connection - */ - if ($this->sniffConnection($connection) === true) { - return; - } - } - } - - private function sniffConnection(Connection $connection): bool - { - try { - $response = $connection->sniff(); - } catch (OperationTimeoutException $exception) { - return false; - } - - $nodes = $this->parseClusterState($response); - - if (count($nodes) === 0) { - return false; - } - - $this->connections = []; - - foreach ($nodes as $node) { - $nodeDetails = [ - 'host' => $node['host'], - 'port' => $node['port'], - ]; - $this->connections[] = $this->connectionFactory->create($nodeDetails); - } - - $this->nextSniff = time() + $this->sniffingInterval; - - return true; - } - - /** - * @return list - */ - private function parseClusterState($nodeInfo): array - { - $pattern = '/([^:]*):(\d+)/'; - $hosts = []; - - foreach ($nodeInfo['nodes'] as $node) { - if (isset($node['http']) === true && isset($node['http']['publish_address']) === true) { - if (preg_match($pattern, $node['http']['publish_address'], $match) === 1) { - $hosts[] = [ - 'host' => $match[1], - 'port' => (int)$match[2], - ]; - } - } - } - - return $hosts; - } - - /** - * @param array $connectionPoolParams - */ - private function setConnectionPoolParams(array $connectionPoolParams): void - { - $this->sniffingInterval = (int)($connectionPoolParams['sniffingInterval'] ?? 300); - } -} diff --git a/src/OpenSearch/ConnectionPool/StaticConnectionPool.php b/src/OpenSearch/ConnectionPool/StaticConnectionPool.php deleted file mode 100644 index c074e8c8c..000000000 --- a/src/OpenSearch/ConnectionPool/StaticConnectionPool.php +++ /dev/null @@ -1,105 +0,0 @@ - $connectionPoolParams - */ - public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams) - { - parent::__construct($connections, $selector, $factory, $connectionPoolParams); - $this->scheduleCheck(); - } - - public function nextConnection(bool $force = false): ConnectionInterface - { - $skipped = []; - - $total = count($this->connections); - while ($total--) { - /** - * @var Connection $connection - */ - $connection = $this->selector->select($this->connections); - if ($connection->isAlive() === true) { - return $connection; - } - - if ($this->readyToRevive($connection) === true) { - if ($connection->ping() === true) { - return $connection; - } - } else { - $skipped[] = $connection; - } - } - - // All "alive" nodes failed, force pings on "dead" nodes - foreach ($skipped as $connection) { - if ($connection->ping() === true) { - return $connection; - } - } - - throw new NoNodesAvailableException("No alive nodes found in your cluster"); - } - - public function scheduleCheck(): void - { - foreach ($this->connections as $connection) { - $connection->markDead(); - } - } - - private function readyToRevive(Connection $connection): bool - { - $timeout = min( - $this->pingTimeout * pow(2, $connection->getPingFailures()), - $this->maxPingTimeout - ); - - if ($connection->getLastPing() + $timeout < time()) { - return true; - } else { - return false; - } - } -} diff --git a/src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php b/src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php deleted file mode 100644 index a4a85e325..000000000 --- a/src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php +++ /dev/null @@ -1,88 +0,0 @@ - $connectionPoolParams - */ - public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams) - { - parent::__construct($connections, $selector, $factory, $connectionPoolParams); - } - - public function nextConnection(bool $force = false): ConnectionInterface - { - $total = count($this->connections); - while ($total--) { - /** - * @var Connection $connection -*/ - $connection = $this->selector->select($this->connections); - if ($connection->isAlive() === true) { - return $connection; - } - - if ($this->readyToRevive($connection) === true) { - return $connection; - } - } - - throw new NoNodesAvailableException("No alive nodes found in your cluster"); - } - - public function scheduleCheck(): void - { - } - - private function readyToRevive(Connection $connection): bool - { - $timeout = min( - $this->pingTimeout * pow(2, $connection->getPingFailures()), - $this->maxPingTimeout - ); - - if ($connection->getLastPing() + $timeout < time()) { - return true; - } else { - return false; - } - } -} diff --git a/src/OpenSearch/Connections/Connection.php b/src/OpenSearch/Connections/Connection.php deleted file mode 100644 index 7279f56cf..000000000 --- a/src/OpenSearch/Connections/Connection.php +++ /dev/null @@ -1,789 +0,0 @@ -> - */ - protected $headers = []; - - /** - * @var bool - */ - protected $isAlive = false; - - /** - * @var float - */ - private $pingTimeout = 1; //TODO expose this - - /** - * @var int - */ - private $lastPing = 0; - - /** - * @var int - */ - private $failedPings = 0; - - /** - * @var mixed[] - */ - private $lastRequest = array(); - - /** - * @var string - */ - private $OSVersion = null; - - /** - * @param array{host: string, port?: int, scheme?: string, user?: string, pass?: string, path?: string} $hostDetails - * @param array{client?: array{headers?: array>, curl?: array}} $connectionParams - */ - public function __construct( - callable $handler, - array $hostDetails, - array $connectionParams, - SerializerInterface $serializer, - LoggerInterface $log, - LoggerInterface $trace - ) { - if (isset($hostDetails['port']) !== true) { - $hostDetails['port'] = 9200; - } - - if (isset($hostDetails['scheme'])) { - $this->transportSchema = $hostDetails['scheme']; - } - - // Only Set the Basic if API Key is not set and setBasicAuthentication was not called prior - if (isset($connectionParams['client']['headers']['Authorization']) === false - && isset($connectionParams['client']['curl'][CURLOPT_HTTPAUTH]) === false - && isset($hostDetails['user']) - && isset($hostDetails['pass']) - ) { - $connectionParams['client']['curl'][CURLOPT_HTTPAUTH] = CURLAUTH_BASIC; - $connectionParams['client']['curl'][CURLOPT_USERPWD] = $hostDetails['user'].':'.$hostDetails['pass']; - } - - $connectionParams['client']['curl'][CURLOPT_PORT] = $hostDetails['port']; - - if (isset($connectionParams['client']['headers'])) { - $this->headers = $connectionParams['client']['headers']; - unset($connectionParams['client']['headers']); - } - - // Add the User-Agent using the format: / (metadata-values) - $this->headers['User-Agent'] = [sprintf( - 'opensearch-php/%s (%s %s; PHP %s)', - Client::VERSION, - PHP_OS, - $this->getOSVersion(), - PHP_VERSION - )]; - - $host = $hostDetails['host']; - $path = null; - if (isset($hostDetails['path']) === true) { - $path = $hostDetails['path']; - } - $port = $hostDetails['port']; - - $this->host = $host; - $this->path = $path; - $this->port = $port; - $this->log = $log; - $this->trace = $trace; - $this->connectionParams = $connectionParams; - $this->serializer = $serializer; - - $this->handler = $this->wrapHandler($handler); - } - - /** - * @param string $method - * @param string $uri - * @param null|array $params - * @param mixed $body - * @param array $options - * @param Transport|null $transport - * @return mixed - */ - public function performRequest(string $method, string $uri, ?array $params = [], $body = null, array $options = [], ?Transport $transport = null) - { - if ($body !== null) { - $body = $this->serializer->serialize($body); - } - - $headers = $this->headers; - if (isset($options['client']['headers']) && is_array($options['client']['headers'])) { - $headers = array_merge($this->headers, $options['client']['headers']); - } - - $host = $this->host; - if (isset($this->connectionParams['client']['port_in_header']) && $this->connectionParams['client']['port_in_header']) { - $host .= ':' . $this->port; - } - - $request = [ - 'http_method' => $method, - 'scheme' => $this->transportSchema, - 'uri' => $this->getURI($uri, $params), - 'body' => $body, - 'headers' => array_merge( - [ - 'Host' => [$host] - ], - $headers - ) - ]; - - $request = array_replace_recursive($request, $this->connectionParams, $options); - - // RingPHP does not like if client is empty - if (empty($request['client'])) { - unset($request['client']); - } - - $handler = $this->handler; - $future = $handler($request, $this, $transport, $options); - - return $future; - } - - public function getTransportSchema(): string - { - return $this->transportSchema; - } - - public function getLastRequestInfo(): array - { - return $this->lastRequest; - } - - private function wrapHandler(callable $handler): callable - { - return function (array $request, Connection $connection, ?Transport $transport, $options) use ($handler) { - $this->lastRequest = []; - $this->lastRequest['request'] = $request; - - // Send the request using the wrapped handler. - $response = Core::proxy( - $handler($request), - function ($response) use ($connection, $transport, $request, $options) { - $this->lastRequest['response'] = $response; - - if (isset($response['error']) === true) { - if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) { - $this->log->warning("Curl exception encountered."); - - $exception = $this->getCurlRetryException($request, $response); - - $this->logRequestFail($request, $response, $exception); - - $node = $connection->getHost(); - $this->log->warning("Marking node $node dead."); - $connection->markDead(); - - // If the transport has not been set, we are inside a Ping or Sniff, - // so we don't want to retrigger retries anyway. - // - // TODO this could be handled better, but we are limited because connectionpools do not - // have access to Transport. Architecturally, all of this needs to be refactored - if (isset($transport) === true) { - $transport->connectionPool->scheduleCheck(); - - $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false; - $shouldRetry = $transport->shouldRetry($request); - $shouldRetryText = ($shouldRetry) ? 'true' : 'false'; - - $this->log->warning("Retries left? $shouldRetryText"); - if ($shouldRetry && !$neverRetry) { - return $transport->performRequest( - $request['http_method'], - $request['uri'], - [], - $request['body'], - $options - ); - } - } - - $this->log->warning("Out of retries, throwing exception from $node"); - // Only throw if we run out of retries - throw $exception; - } else { - // Something went seriously wrong, bail - $exception = new TransportException($response['error']->getMessage()); - $this->logRequestFail($request, $response, $exception); - throw $exception; - } - } else { - $connection->markAlive(); - - if (isset($response['headers']['Warning'])) { - $this->logWarning($request, $response); - } - if (isset($response['body']) === true) { - $response['body'] = stream_get_contents($response['body']); - $this->lastRequest['response']['body'] = $response['body']; - } - - if ($response['status'] >= 400 && $response['status'] < 500) { - $ignore = $request['client']['ignore'] ?? []; - // Skip 404 if succeeded true in the body (e.g. clear_scroll) - $body = $response['body'] ?? ''; - if (strpos($body, '"succeeded":true') !== false) { - $ignore[] = 404; - } - $this->process4xxError($request, $response, $ignore); - } elseif ($response['status'] >= 500) { - $ignore = $request['client']['ignore'] ?? []; - $this->process5xxError($request, $response, $ignore); - } - - // No error, deserialize - $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']); - } - $this->logRequestSuccess($request, $response); - - return isset($request['client']['verbose']) && $request['client']['verbose'] === true ? $response : $response['body']; - } - ); - - return $response; - }; - } - - /** - * @param array|null $params - */ - private function getURI(string $uri, ?array $params): string - { - if (isset($params) === true && !empty($params)) { - $params = array_map( - function ($value) { - if ($value === true) { - return 'true'; - } elseif ($value === false) { - return 'false'; - } - - return $value; - }, - $params - ); - - $uri .= '?' . http_build_query($params); - } - - if ($this->path !== null) { - $uri = $this->path . $uri; - } - - return $uri; - } - - /** - * @return array> - */ - public function getHeaders(): array - { - return $this->headers; - } - - public function logWarning(array $request, array $response): void - { - $this->log->warning('Deprecation', $response['headers']['Warning']); - } - - /** - * Log a successful request - * - * @param array $request - * @param array $response - * @return void - */ - public function logRequestSuccess(array $request, array $response): void - { - $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? ''; - $uri = $this->addPortInUrl($response['effective_url'], (int) $port); - - $this->log->debug('Request Body', array($request['body'])); - $this->log->info( - 'Request Success:', - array( - 'method' => $request['http_method'], - 'uri' => $uri, - 'port' => $port, - 'headers' => $request['headers'], - 'HTTP code' => $response['status'], - 'duration' => $response['transfer_stats']['total_time'], - ) - ); - $this->log->debug('Response', array($response['body'])); - - // Build the curl command for Trace. - $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']); - $this->trace->info($curlCommand); - $this->trace->debug( - 'Response:', - array( - 'response' => $response['body'], - 'method' => $request['http_method'], - 'uri' => $uri, - 'port' => $port, - 'HTTP code' => $response['status'], - 'duration' => $response['transfer_stats']['total_time'], - ) - ); - } - - /** - * Log a failed request - * - * @param array $request - * @param array $response - * @param \Throwable $exception - * - * @return void - */ - public function logRequestFail(array $request, array $response, \Throwable $exception): void - { - $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? ''; - $uri = $this->addPortInUrl($response['effective_url'], (int) $port); - - $this->log->debug('Request Body', array($request['body'])); - $this->log->warning( - 'Request Failure:', - array( - 'method' => $request['http_method'], - 'uri' => $uri, - 'port' => $port, - 'headers' => $request['headers'], - 'HTTP code' => $response['status'], - 'duration' => $response['transfer_stats']['total_time'], - 'error' => $exception->getMessage(), - ) - ); - $this->log->warning('Response', array($response['body'])); - - // Build the curl command for Trace. - $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']); - $this->trace->info($curlCommand); - $this->trace->debug( - 'Response:', - array( - 'response' => $response, - 'method' => $request['http_method'], - 'uri' => $uri, - 'port' => $port, - 'HTTP code' => $response['status'], - 'duration' => $response['transfer_stats']['total_time'], - ) - ); - } - - public function ping(): bool - { - $options = [ - 'client' => [ - 'timeout' => $this->pingTimeout, - 'never_retry' => true, - 'verbose' => true - ] - ]; - try { - $response = $this->performRequest('HEAD', '/', null, null, $options); - $response = $response->wait(); - } catch (TransportException $exception) { - $this->markDead(); - - return false; - } - - if ($response['status'] === 200) { - $this->markAlive(); - - return true; - } else { - $this->markDead(); - - return false; - } - } - - /** - * @return array|\GuzzleHttp\Ring\Future\FutureArray - */ - public function sniff() - { - $options = [ - 'client' => [ - 'timeout' => $this->pingTimeout, - 'never_retry' => true - ] - ]; - - return $this->performRequest('GET', '/_nodes/', null, null, $options); - } - - public function isAlive(): bool - { - return $this->isAlive; - } - - public function markAlive(): void - { - $this->failedPings = 0; - $this->isAlive = true; - $this->lastPing = time(); - } - - public function markDead(): void - { - $this->isAlive = false; - $this->failedPings += 1; - $this->lastPing = time(); - } - - public function getLastPing(): int - { - return $this->lastPing; - } - - public function getPingFailures(): int - { - return $this->failedPings; - } - - public function getHost(): string - { - return $this->host; - } - - public function getUserPass(): ?string - { - return $this->connectionParams['client']['curl'][CURLOPT_USERPWD] ?? null; - } - - public function getPath(): ?string - { - return $this->path; - } - - /** - * @return int - */ - public function getPort() - { - return $this->port; - } - - protected function getCurlRetryException(array $request, array $response): OpenSearchException - { - $exception = null; - $message = $response['error']->getMessage(); - $exception = new MaxRetriesException($message); - switch ($response['curl']['errno']) { - case 6: - $exception = new CouldNotResolveHostException($message, 0, $exception); - break; - case 7: - $exception = new CouldNotConnectToHost($message, 0, $exception); - break; - case 28: - $exception = new OperationTimeoutException($message, 0, $exception); - break; - } - - return $exception; - } - - /** - * Get the OS version using php_uname if available - * otherwise it returns an empty string - * - * @see https://github.com/elastic/elasticsearch-php/issues/922 - */ - private function getOSVersion(): string - { - if ($this->OSVersion === null) { - $this->OSVersion = strpos(strtolower(ini_get('disable_functions')), 'php_uname') !== false - ? '' - : php_uname("r"); - } - return $this->OSVersion; - } - - /** - * Add the port value in the URL if not present - */ - private function addPortInUrl(string $uri, int $port): string - { - if (strpos($uri, ':', 7) !== false) { - return $uri; - } - return preg_replace('#([^/])/([^/])#', sprintf("$1:%s/$2", $port), $uri, 1); - } - - /** - * Construct a string cURL command - */ - private function buildCurlCommand(string $method, string $url, ?string $body): string - { - if (strpos($url, '?') === false) { - $url .= '?pretty=true'; - } else { - str_replace('?', '?pretty=true', $url); - } - - $curlCommand = 'curl -X' . strtoupper($method); - $curlCommand .= " '" . $url . "'"; - - if (isset($body) === true && $body !== '') { - $curlCommand .= " -d '" . $body . "'"; - } - - return $curlCommand; - } - - /** - * @throws OpenSearchException - */ - private function process4xxError(array $request, array $response, array $ignore): void - { - $statusCode = $response['status']; - - /** - * @var \Exception $exception - */ - $exception = $this->tryDeserialize400Error($response); - - if (array_search($response['status'], $ignore) !== false) { - return; - } - - $responseBody = $this->convertBodyToString($response['body'], $statusCode, $exception); - if ($statusCode === 401) { - $exception = new Unauthorized401Exception($responseBody, $statusCode); - } elseif ($statusCode === 403) { - $exception = new Forbidden403Exception($responseBody, $statusCode); - } elseif ($statusCode === 404) { - $exception = new Missing404Exception($responseBody, $statusCode); - } elseif ($statusCode === 409) { - $exception = new Conflict409Exception($responseBody, $statusCode); - } elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) { - $exception = new ScriptLangNotSupportedException($responseBody. $statusCode); - } elseif ($statusCode === 408) { - $exception = new RequestTimeout408Exception($responseBody, $statusCode); - } else { - $exception = new BadRequest400Exception($responseBody, $statusCode); - } - - $this->logRequestFail($request, $response, $exception); - - throw $exception; - } - - /** - * @throws OpenSearchException - */ - private function process5xxError(array $request, array $response, array $ignore): void - { - $statusCode = (int) $response['status']; - $responseBody = $response['body']; - - /** - * @var \Exception $exception - */ - $exception = $this->tryDeserialize500Error($response); - - $exceptionText = "[$statusCode Server Exception] ".$exception->getMessage(); - $this->log->error($exceptionText); - $this->log->error($exception->getTraceAsString()); - - if (array_search($statusCode, $ignore) !== false) { - return; - } - - if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) { - $exception = new RoutingMissingException($exception->getMessage(), $statusCode, $exception); - } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) { - $exception = new NoDocumentsToGetException($exception->getMessage(), $statusCode, $exception); - } elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) { - $exception = new NoShardAvailableException($exception->getMessage(), $statusCode, $exception); - } else { - $exception = new ServerErrorResponseException( - $this->convertBodyToString($responseBody, $statusCode, $exception), - $statusCode - ); - } - - $this->logRequestFail($request, $response, $exception); - - throw $exception; - } - - private function convertBodyToString($body, int $statusCode, Exception $exception): string - { - if (empty($body)) { - return sprintf( - "Unknown %d error from OpenSearch %s", - $statusCode, - $exception->getMessage() - ); - } - // if body is not string, we convert it so it can be used as Exception message - if (!is_string($body)) { - return json_encode($body); - } - return $body; - } - - private function tryDeserialize400Error(array $response): OpenSearchException - { - return $this->tryDeserializeError($response, BadRequest400Exception::class); - } - - private function tryDeserialize500Error(array $response): OpenSearchException - { - return $this->tryDeserializeError($response, ServerErrorResponseException::class); - } - - private function tryDeserializeError(array $response, string $errorClass): OpenSearchException - { - $error = $this->serializer->deserialize($response['body'], $response['transfer_stats']); - if (is_array($error) === true) { - if (isset($error['error']) === false) { - // <2.0 "i just blew up" nonstructured exception - // $error is an array but we don't know the format, reuse the response body instead - // added json_encode to convert into a string - return new $errorClass(json_encode($response['body']), (int) $response['status']); - } - - // 2.0 structured exceptions - if (is_array($error['error']) && array_key_exists('reason', $error['error']) === true) { - // Try to use root cause first (only grabs the first root cause) - $info = $error['error']['root_cause'][0] ?? $error['error']; - $cause = $info['reason']; - $type = $info['type']; - // added json_encode to convert into a string - $original = new $errorClass(json_encode($response['body']), $response['status']); - - return new $errorClass("$type: $cause", (int) $response['status'], $original); - } - // <2.0 semi-structured exceptions - // added json_encode to convert into a string - $original = new $errorClass(json_encode($response['body']), $response['status']); - - $errorEncoded = $error['error']; - if (is_array($errorEncoded)) { - $errorEncoded = json_encode($errorEncoded); - } - return new $errorClass($errorEncoded, (int) $response['status'], $original); - } - - // if responseBody is not string, we convert it so it can be used as Exception message - $responseBody = $response['body']; - if (!is_string($responseBody)) { - $responseBody = json_encode($responseBody); - } - - // <2.0 "i just blew up" nonstructured exception - return new $errorClass($responseBody); - } -} diff --git a/src/OpenSearch/Connections/ConnectionFactory.php b/src/OpenSearch/Connections/ConnectionFactory.php deleted file mode 100644 index f95c2f29f..000000000 --- a/src/OpenSearch/Connections/ConnectionFactory.php +++ /dev/null @@ -1,81 +0,0 @@ ->, curl?: array}} $connectionParams - */ - public function __construct(callable $handler, array $connectionParams, SerializerInterface $serializer, LoggerInterface $logger, LoggerInterface $tracer) - { - $this->handler = $handler; - $this->connectionParams = $connectionParams; - $this->logger = $logger; - $this->tracer = $tracer; - $this->serializer = $serializer; - } - - public function create(array $hostDetails): ConnectionInterface - { - if (isset($hostDetails['path'])) { - $hostDetails['path'] = rtrim($hostDetails['path'], '/'); - } - - return new Connection( - $this->handler, - $hostDetails, - $this->connectionParams, - $this->serializer, - $this->logger, - $this->tracer - ); - } -} diff --git a/src/OpenSearch/Connections/ConnectionFactoryInterface.php b/src/OpenSearch/Connections/ConnectionFactoryInterface.php index da209abd9..8929c9ea4 100644 --- a/src/OpenSearch/Connections/ConnectionFactoryInterface.php +++ b/src/OpenSearch/Connections/ConnectionFactoryInterface.php @@ -21,6 +21,11 @@ namespace OpenSearch\Connections; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ interface ConnectionFactoryInterface { /** diff --git a/src/OpenSearch/Connections/ConnectionInterface.php b/src/OpenSearch/Connections/ConnectionInterface.php index 0e6280aec..1d9d5a9bd 100644 --- a/src/OpenSearch/Connections/ConnectionInterface.php +++ b/src/OpenSearch/Connections/ConnectionInterface.php @@ -21,10 +21,13 @@ namespace OpenSearch\Connections; -use OpenSearch\Serializers\SerializerInterface; use OpenSearch\Transport; -use Psr\Log\LoggerInterface; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ interface ConnectionInterface { /** diff --git a/src/OpenSearch/Endpoints/AbstractEndpoint.php b/src/OpenSearch/Endpoints/AbstractEndpoint.php index 0b049daeb..6718ee6f3 100644 --- a/src/OpenSearch/Endpoints/AbstractEndpoint.php +++ b/src/OpenSearch/Endpoints/AbstractEndpoint.php @@ -22,26 +22,27 @@ namespace OpenSearch\Endpoints; use OpenSearch\Common\Exceptions\UnexpectedValueException; +use OpenSearch\EndpointInterface; use OpenSearch\Serializers\SerializerInterface; use function array_filter; -abstract class AbstractEndpoint +abstract class AbstractEndpoint implements EndpointInterface { /** * @var array */ - protected $params = []; + protected array $params = []; /** * @var string|null */ - protected $index = null; + protected ?string $index = null; /** * @var string|int|null */ - protected $id = null; + protected string|int|null $id = null; /** * @var string|null @@ -51,17 +52,17 @@ abstract class AbstractEndpoint /** * @var string|array|null */ - protected $body = null; + protected string|array|null $body = null; /** * @var array */ - private $options = []; + private array $options = []; /** * @var SerializerInterface */ - protected $serializer; + protected SerializerInterface $serializer; /** * @return string[] @@ -83,9 +84,8 @@ abstract public function getMethod(): string; * Set the parameters for this endpoint * * @param mixed[] $params Array of parameters - * @return $this */ - public function setParams(array $params) + public function setParams(array $params): static { $this->extractOptions($params); $this->checkUserParams($params); @@ -117,7 +117,7 @@ public function getIndex(): ?string * * @return $this */ - public function setIndex($index) + public function setIndex($index): static { if ($index === null) { return $this; @@ -134,12 +134,7 @@ public function setIndex($index) return $this; } - /** - * @param int|string|null $docID - * - * @return $this - */ - public function setId($docID) + public function setId(int|string|null $docID): static { if ($docID === null) { return $this; @@ -154,16 +149,12 @@ public function setId($docID) return $this; } - /** - * @return array|string - */ - public function getBody() + public function getBody(): array|string { return $this->body; } - - public function setBody(array $body) + public function setBody(array|string $body): static { $this->body = $body; diff --git a/src/OpenSearch/Handlers/SigV4Handler.php b/src/OpenSearch/Handlers/SigV4Handler.php deleted file mode 100644 index e45639c75..000000000 --- a/src/OpenSearch/Handlers/SigV4Handler.php +++ /dev/null @@ -1,151 +0,0 @@ ->, body: string|resource|null, client?: array} - */ -class SigV4Handler -{ - /** - * @var SignatureV4 - */ - private $signer; - /** - * @var callable - */ - private $credentialProvider; - /** - * @var callable - */ - private $wrappedHandler; - - /** - * A handler that applies an AWS V4 signature before dispatching requests. - * - * @param string $region The region of your Amazon - * OpenSearch Service domain - * @param string $service The Service of your Amazon - * OpenSearch Service domain - * @param callable|null $credentialProvider A callable that returns a - * promise that is fulfilled - * with an instance of - * Aws\Credentials\Credentials - * @param callable|null $wrappedHandler A RingPHP handler - */ - public function __construct( - string $region, - string $service, - ?callable $credentialProvider = null, - ?callable $wrappedHandler = null - ) { - self::assertDependenciesInstalled(); - $this->signer = new SignatureV4($service, $region); - $this->wrappedHandler = $wrappedHandler - ?: ClientBuilder::defaultHandler(); - $this->credentialProvider = $credentialProvider - ?: CredentialProvider::defaultProvider(); - } - - /** - * @phpstan-param RingPhpRequest $request - */ - public function __invoke(array $request) - { - $creds = call_user_func($this->credentialProvider)->wait(); - - $psr7Request = $this->createPsr7Request($request); - $psr7Request = $psr7Request->withHeader('x-amz-content-sha256', Utils::hash($psr7Request->getBody(), 'sha256')); - $signedRequest = $this->signer - ->signRequest($psr7Request, $creds); - return call_user_func($this->wrappedHandler, $this->createRingRequest($signedRequest, $request)); - } - - public static function assertDependenciesInstalled(): void - { - if (!class_exists(SignatureV4::class)) { - throw new RuntimeException( - 'The AWS SDK for PHP must be installed in order to use the SigV4 signing handler' - ); - } - } - - /** - * @phpstan-param RingPhpRequest $ringPhpRequest - */ - private function createPsr7Request(array $ringPhpRequest): Request - { - // fix for uppercase 'Host' array key in elasticsearch-php 5.3.1 and backward compatible - // https://github.com/aws/aws-sdk-php/issues/1225 - $hostKey = isset($ringPhpRequest['headers']['Host']) ? 'Host' : 'host'; - - // Amazon ES/OS listens on standard ports (443 for HTTPS, 80 for HTTP). - // Consequently, the port should be stripped from the host header. - $parsedUrl = parse_url($ringPhpRequest['headers'][$hostKey][0]); - if (isset($parsedUrl['host'])) { - $ringPhpRequest['headers'][$hostKey][0] = $parsedUrl['host']; - } - - // Create a PSR-7 URI from the array passed to the handler - $uri = (new Uri($ringPhpRequest['uri'])) - ->withScheme($ringPhpRequest['scheme']) - ->withHost($ringPhpRequest['headers'][$hostKey][0]); - if (isset($ringPhpRequest['query_string'])) { - $uri = $uri->withQuery($ringPhpRequest['query_string']); - } - - // Create a PSR-7 request from the array passed to the handler - return new Request( - $ringPhpRequest['http_method'], - $uri, - $ringPhpRequest['headers'], - $ringPhpRequest['body'] - ); - } - - /** - * @phpstan-param RingPhpRequest $originalRequest - * - * @phpstan-return RingPhpRequest - */ - private function createRingRequest(RequestInterface $request, array $originalRequest): array - { - $uri = $request->getUri(); - $body = (string) $request->getBody(); - - // RingPHP currently expects empty message bodies to be null: - // https://github.com/guzzle/RingPHP/blob/4c8fe4c48a0fb7cc5e41ef529e43fecd6da4d539/src/Client/CurlFactory.php#L202 - if (empty($body)) { - $body = null; - } - - // Reset the explicit port in the URL - $client = $originalRequest['client']; - unset($client['curl'][CURLOPT_PORT]); - - $ringRequest = [ - 'http_method' => $request->getMethod(), - 'scheme' => $uri->getScheme(), - 'uri' => $uri->getPath(), - 'body' => $body, - 'headers' => $request->getHeaders(), - 'client' => $client - ]; - if ($uri->getQuery()) { - $ringRequest['query_string'] = $uri->getQuery(); - } - - return $ringRequest; - } -} diff --git a/src/OpenSearch/Namespaces/AbstractNamespace.php b/src/OpenSearch/Namespaces/AbstractNamespace.php index ec908dca1..26b9a40e3 100644 --- a/src/OpenSearch/Namespaces/AbstractNamespace.php +++ b/src/OpenSearch/Namespaces/AbstractNamespace.php @@ -21,10 +21,14 @@ namespace OpenSearch\Namespaces; +use Http\Promise\Promise; use OpenSearch\EndpointFactoryInterface; +use OpenSearch\EndpointInterface; use OpenSearch\Endpoints\AbstractEndpoint; use OpenSearch\LegacyEndpointFactory; use OpenSearch\Transport; +use OpenSearch\TransportInterface; +use Psr\Http\Message\ResponseInterface; abstract class AbstractNamespace { @@ -35,6 +39,8 @@ abstract class AbstractNamespace protected EndpointFactoryInterface $endpointFactory; + protected bool $isAsync = false; + /** * @var callable * @@ -59,6 +65,20 @@ public function __construct(Transport $transport, callable|EndpointFactoryInterf $this->endpointFactory = $endpointFactory; } + public function isAsync(): bool + { + return $this->isAsync; + } + + /** + * Set the client to run in async mode. + */ + public function setAsync(bool $isAsync): static + { + $this->isAsync = $isAsync; + return $this; + } + /** * @return null|mixed */ @@ -73,16 +93,18 @@ public function extractArgument(array &$params, string $arg) } } - protected function performRequest(AbstractEndpoint $endpoint) + protected function performRequest(AbstractEndpoint $endpoint): Promise|ResponseInterface { - $response = $this->transport->performRequest( + $request = $this->transport->createRequest( $endpoint->getMethod(), $endpoint->getURI(), $endpoint->getParams(), $endpoint->getBody(), - $endpoint->getOptions() ); - - return $this->transport->resultOrFuture($response, $endpoint->getOptions()); + if ($this->isAsync()) { + return $this->transport->sendAsyncRequest($request); + } + return $this->transport->sendRequest($request); } + } diff --git a/src/OpenSearch/RequestFactory.php b/src/OpenSearch/RequestFactory.php new file mode 100644 index 000000000..2bcf00cae --- /dev/null +++ b/src/OpenSearch/RequestFactory.php @@ -0,0 +1,137 @@ +getUriFactory()->createUri($uri); + $uri = $uri->withQuery(http_build_query($params)); + $request = $this->getRequestFactory()->createRequest($method, $uri); + if ($body !== null) { + $bodyJson = $this->getSerializer()->serialize($body); + $bodyStream = $this->getStreamFactory()->createStream($bodyJson); + $request = $request->withBody($bodyStream); + } + foreach ($headers as $name => $value) { + $request = $request->withHeader($name, $value); + } + return $request; + } + + /** + * Get the serializer to use for serializing request and response bodies. + */ + public function getSerializer(): ?SerializerInterface + { + if ($this->serializer) { + return $this->serializer; + } + return new SmartSerializer(); + } + + /** + * Set the serializer to use for serializing request and response bodies. + */ + public function setSerializer(?SerializerInterface $serializer): self + { + $this->serializer = $serializer; + return $this; + } + + /** + * Get the request factory to use for creating requests. + * + * If no request factory is set, the discovery mechanism will be used to find + * a request factory. + * + * @throws \Http\Discovery\Exception\NotFoundException + */ + private function getRequestFactory(): PsrRequestFactoryInterface + { + if ($this->requestFactory) { + return $this->requestFactory; + } + + return $this->requestFactory = Psr17FactoryDiscovery::findRequestFactory(); + } + + /** + * Set the request factory to use for creating requests. + */ + public function setRequestFactory(PsrRequestFactoryInterface $requestFactory): self + { + $this->requestFactory = $requestFactory; + return $this; + } + + /** + * Get the stream factory to use for creating streams. + * + * If no stream factory is set, the discovery mechanism will be used to find + * a stream factory. + * + * @throws \Http\Discovery\Exception\NotFoundException + */ + private function getStreamFactory(): StreamFactoryInterface + { + if ($this->streamFactory) { + return $this->streamFactory; + } + return $this->streamFactory = Psr17FactoryDiscovery::findStreamFactory(); + } + + /** + * Set the stream factory to use for creating streams. + */ + public function setStreamFactory(?StreamFactoryInterface $streamFactory): self + { + $this->streamFactory = $streamFactory; + return $this; + } + + /** + * Get the URI factory to use for creating URIs. + * + * If no URI factory is set, the discovery mechanism will be used to find + * a URI factory. + * + * @throws \Http\Discovery\Exception\NotFoundException + */ + private function getUriFactory(): UriFactoryInterface + { + if ($this->uriFactory) { + return $this->uriFactory; + } + return $this->uriFactory = Psr17FactoryDiscovery::findUriFactory(); + } + + /** + * Set the URI factory to use for creating URIs. + */ + public function setUriFactory(?UriFactoryInterface $uriFactory): self + { + $this->uriFactory = $uriFactory; + return $this; + } + +} diff --git a/src/OpenSearch/RequestFactoryInterface.php b/src/OpenSearch/RequestFactoryInterface.php new file mode 100644 index 000000000..291132be2 --- /dev/null +++ b/src/OpenSearch/RequestFactoryInterface.php @@ -0,0 +1,19 @@ +requestFactory->createRequest($method, $uri, $params, $body); + } /** - * Transport class is responsible for dispatching requests to the - * underlying cluster connections - * - * @param int $retries - * @param bool $sniffOnStart - * @param ConnectionPool\AbstractConnectionPool $connectionPool - * @param \Psr\Log\LoggerInterface $log Monolog logger object + * {@inheritdoc} */ - public function __construct(int $retries, AbstractConnectionPool $connectionPool, LoggerInterface $log, bool $sniffOnStart = false) + public function sendAsyncRequest(RequestInterface $request): Promise { - $this->log = $log; - $this->connectionPool = $connectionPool; - $this->retries = $retries; - - if ($sniffOnStart === true) { - $this->log->notice('Sniff on Start.'); - $this->connectionPool->scheduleCheck(); - } + $httpAsyncClient = $this->getAsyncClient(); + return $httpAsyncClient->sendAsyncRequest($request); } /** - * Returns a single connection from the connection pool - * Potentially performs a sniffing step before returning + * {@inheritdoc} */ - public function getConnection(): ConnectionInterface + public function sendRequest(RequestInterface $request): ResponseInterface { - return $this->connectionPool->nextConnection(); + return $this->client->sendRequest($request); } /** - * Perform a request to the Cluster - * - * @param string $method HTTP method to use - * @param string $uri HTTP URI to send request to - * @param array $params Optional query parameters - * @param mixed|null $body Optional query body - * @param array $options - * - * @throws Common\Exceptions\NoNodesAvailableException|\Exception + * Set the async client to use for async requests. */ - public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface + public function setAsyncClient(HttpAsyncClient $asyncClient): self { - try { - $connection = $this->getConnection(); - } catch (Exceptions\NoNodesAvailableException $exception) { - $this->log->critical('No alive nodes found in cluster'); - throw $exception; - } - - $response = []; - $caughtException = null; - $this->lastConnection = $connection; - - $future = $connection->performRequest( - $method, - $uri, - $params, - $body, - $options, - $this - ); - - $future->promise()->then( - //onSuccess - function ($response) { - $this->retryAttempts = 0; - // Note, this could be a 4xx or 5xx error - }, - //onFailure - function ($response) { - $code = $response->getCode(); - // Ignore 400 level errors, as that means the server responded just fine - if ($code < 400 || $code >= 500) { - // Otherwise schedule a check - $this->connectionPool->scheduleCheck(); - } - } - ); - - return $future; + $this->asyncClient = $asyncClient; + return $this; } /** - * @param FutureArrayInterface $result Response of a request (promise) - * @param array $options Options for transport + * Get the async client to use for async requests. + * + * If no async client is set, the discovery mechanism will be used to find + * an async client. * - * @return callable|array + * @throws NoAsyncClientException */ - public function resultOrFuture(FutureArrayInterface $result, array $options = []) + private function getAsyncClient(): HttpAsyncClient { - $response = null; - $async = isset($options['client']['future']) ? $options['client']['future'] : null; - if (is_null($async) || $async === false) { - do { - $result = $result->wait(); - } while ($result instanceof FutureArrayInterface); + if ($this->asyncClient) { + return $this->asyncClient; } - return $result; - } - - public function shouldRetry(array $request): bool - { - if ($this->retryAttempts < $this->retries) { - $this->retryAttempts += 1; - return true; + if ($this->client instanceof HttpAsyncClient) { + return $this->asyncClient = $this->client; } - return false; + try { + return $this->asyncClient = HttpAsyncClientDiscovery::find(); + } catch (\Exception $e) { + throw new NoAsyncClientException('No async HTTP client found. Install a package providing "php-http/async-client-implementation"', 0, $e); + } } - /** - * Returns the last used connection so that it may be inspected. Mainly - * for debugging/testing purposes. - */ - public function getLastConnection(): ConnectionInterface - { - return $this->lastConnection; - } } diff --git a/src/OpenSearch/TransportInterface.php b/src/OpenSearch/TransportInterface.php new file mode 100644 index 000000000..d017ac0a3 --- /dev/null +++ b/src/OpenSearch/TransportInterface.php @@ -0,0 +1,21 @@ + [ - 'verbose' => true - ] - ]; - $client = ClientBuilder::create() - ->setConnectionParams($params) - ->setHosts([$url]) - ->includePortInHostHeader(true) - ->build(); + $transport = $this->createMock(TransportInterface::class); + $client = (new ClientBuilder($transport))->build(); $this->assertInstanceOf(Client::class, $client); - - try { - $result = $client->info(); - } catch (OpenSearchException $e) { - $request = $client->transport->getLastConnection()->getLastRequestInfo(); - $this->assertTrue(isset($request['request']['headers']['Host'][0])); - $this->assertEquals($url, $request['request']['headers']['Host'][0]); - } } /** diff --git a/tests/ClientTest.php b/tests/ClientTest.php index b580bae5c..ec02976a1 100644 --- a/tests/ClientTest.php +++ b/tests/ClientTest.php @@ -21,443 +21,110 @@ namespace OpenSearch\Tests; -use GuzzleHttp\Ring\Client\MockHandler; -use GuzzleHttp\Ring\Future\FutureArray; -use Mockery as m; -use OpenSearch; use OpenSearch\Client; -use OpenSearch\ClientBuilder; -use OpenSearch\Common\Exceptions\MaxRetriesException; +use OpenSearch\Common\Exceptions\RuntimeException; +use OpenSearch\EndpointFactoryInterface; +use OpenSearch\Endpoints\Delete; +use OpenSearch\TransportInterface; +use PHPUnit\Framework\MockObject\MockObject; +use PHPUnit\Framework\TestCase; +use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseInterface; /** * Class ClientTest * * @subpackage Tests + * @coversDefaultClass \OpenSearch\Client */ -class ClientTest extends \PHPUnit\Framework\TestCase +class ClientTest extends TestCase { - public function tearDown(): void - { - m::close(); - } + /** + * The client under test. + */ + private Client $client; - public function testConstructorIllegalPort() - { - $this->expectException(\OpenSearch\Common\Exceptions\InvalidArgumentException::class); - $this->expectExceptionMessage('Could not parse URI'); + private EndpointFactoryInterface|MockObject $endpointFactory; - $client = OpenSearch\ClientBuilder::create()->setHosts(['localhost:abc'])->build(); - } + private TransportInterface|MockObject $transport; - public function testFromConfig() + /** + * {@inheritdoc} + */ + public function setUp(): void { - $params = [ - 'hosts' => [ - 'localhost:9200' - ], - 'retries' => 2, - 'handler' => ClientBuilder::multiHandler() - ]; - $client = ClientBuilder::fromConfig($params); - - $this->assertInstanceOf(Client::class, $client); + parent::setUp(); + $this->transport = $this->createMock(TransportInterface::class); + $this->endpointFactory = $this->createMock(EndpointFactoryInterface::class); + $registeredNamespaces = []; + $this->client = new Client($this->transport, $this->endpointFactory, $registeredNamespaces); } - public function testFromConfigBadParam() + /** + * @covers ::__call + */ + public function testUnknownNamespace(): void { - $params = [ - 'hosts' => [ - 'localhost:9200' - ], - 'retries' => 2, - 'imNotReal' => 5 - ]; - - $this->expectException(\OpenSearch\Common\Exceptions\RuntimeException::class); - $this->expectExceptionMessage('Unknown parameters provided: imNotReal'); - - $client = ClientBuilder::fromConfig($params); + $this->expectException(\BadMethodCallException::class); + $this->client->foo(); } - public function testFromConfigBadParamQuiet() - { - $params = [ - 'hosts' => [ - 'localhost:9200' - ], - 'retries' => 2, - 'imNotReal' => 5 - ]; - $client = ClientBuilder::fromConfig($params, true); - - $this->assertInstanceOf(Client::class, $client); - } public function testIndexCannotBeNullForDelete() { - $client = ClientBuilder::create()->build(); + $this->endpointFactory->expects($this->once()) + ->method('getEndpoint') + ->with(Delete::class) + ->willReturn(new Delete()); - $this->expectException(OpenSearch\Common\Exceptions\RuntimeException::class); + $this->expectException(RuntimeException::class); $this->expectExceptionMessage('index is required for delete'); - $client->delete( + $this->client->delete( [ - 'index' => null, - 'id' => 'test' + 'index' => null, + 'id' => 'test' ] ); } public function testIdCannotBeNullForDelete() { - $client = ClientBuilder::create()->build(); + $this->endpointFactory->expects($this->once()) + ->method('getEndpoint') + ->with(Delete::class) + ->willReturn(new Delete()); - $this->expectException(OpenSearch\Common\Exceptions\RuntimeException::class); + $this->expectException(RuntimeException::class); $this->expectExceptionMessage('id is required for delete'); - $client->delete( + $this->client->delete( [ - 'index' => 'test', - 'id' => null + 'index' => 'test', + 'id' => null ] ); } - public function testMaxRetriesException() + /** + * @covers ::request + */ + public function testSendRawRequest(): void { - $client = OpenSearch\ClientBuilder::create() - ->setHosts(["localhost:1"]) - ->setRetries(0) - ->build(); - - $searchParams = [ - 'index' => 'test', - 'body' => [ - 'query' => [ - 'match_all' => [] - ] - ] - ]; - - $client = OpenSearch\ClientBuilder::create() - ->setHosts(["localhost:1"]) - ->setRetries(0) - ->build(); - - try { - $client->search($searchParams); - $this->fail("Should have thrown CouldNotConnectToHost"); - } catch (OpenSearch\Common\Exceptions\Curl\CouldNotConnectToHost $e) { - // All good - $previous = $e->getPrevious(); - $this->assertInstanceOf(MaxRetriesException::class, $previous); - } catch (\Exception $e) { - throw $e; - } - - - $client = OpenSearch\ClientBuilder::create() - ->setHosts(["localhost:1"]) - ->setRetries(0) - ->build(); - - try { - $client->search($searchParams); - $this->fail("Should have thrown TransportException"); - } catch (OpenSearch\Common\Exceptions\TransportException $e) { - // All good - $previous = $e->getPrevious(); - $this->assertInstanceOf(MaxRetriesException::class, $previous); - } catch (\Exception $e) { - throw $e; - } - } - - public function testInlineHosts() - { - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'localhost:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("localhost", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'http://localhost:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("localhost", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'http://foo.com:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'https://foo.com:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("https", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'https://user:pass@foo.com:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("https", $host->getTransportSchema()); - $this->assertSame("user:pass", $host->getUserPass()); - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'https://user:pass@the_foo.com:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("the_foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("https", $host->getTransportSchema()); - $this->assertSame("user:pass", $host->getUserPass()); - } - - public function testExtendedHosts() - { - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'localhost', - 'port' => 9200, - 'scheme' => 'http' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("localhost", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com', - 'port' => 9200, - 'scheme' => 'http' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com', - 'port' => 9200, - 'scheme' => 'https' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("https", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com', - 'scheme' => 'http' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); + $this->transport->expects($this->once()) + ->method('createRequest') + ->with('GET', '/', ['foo' => 'bar'], 'whizz') + ->willReturn($this->createMock(RequestInterface::class)); + $this->transport->expects($this->once()) + ->method('sendRequest') + ->with($this->isInstanceOf(RequestInterface::class)) + ->willReturn($this->createMock(ResponseInterface::class)); - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com', - 'port' => 9500, - 'scheme' => 'https' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9500, $host->getPort()); - $this->assertSame("https", $host->getTransportSchema()); - - - try { - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'port' => 9200, - 'scheme' => 'http' - ] - ] - )->build(); - $this->fail("Expected RuntimeException from missing host, none thrown"); - } catch (OpenSearch\Common\Exceptions\RuntimeException $e) { - // good - } - - // Underscore host, questionably legal - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'the_foo.com' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("the_foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - // Special characters in user/pass, would break inline - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com', - 'user' => 'user', - 'pass' => 'abc#$@?%!abc' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - $this->assertSame("user:abc#$@?%!abc", $host->getUserPass()); - } - - public function testClientLazy() - { - $handler = new MockHandler([ - 'status' => 200, - 'transfer_stats' => [ - 'total_time' => 100 - ], - 'body' => '{test}', - 'effective_url' => 'localhost' + $this->client->request('GET', '/', [ + 'params' => ['foo' => 'bar'], + 'body' => 'whizz', ]); - $builder = ClientBuilder::create(); - $builder->setHosts(['somehost']); - $builder->setHandler($handler); - $client = $builder->build(); - - $params = [ - 'client' => [ - 'future' => 'lazy', - ] - ]; - $result = $client->info($params); - $this->assertInstanceOf(FutureArray::class, $result); - } - - public function testExtractArgumentIterable() - { - $client = OpenSearch\ClientBuilder::create()->build(); - // array iterator can be casted to array back, so make more real with IteratorIterator - $body = new \IteratorIterator(new \ArrayIterator([1, 2, 3])); - $params = ['body' => $body]; - $argument = $client->extractArgument($params, 'body'); - $this->assertEquals($body, $argument); - $this->assertCount(0, $params); - $this->assertInstanceOf(\IteratorIterator::class, $argument); - } - - /** @test */ - public function sendRawRequest(): void - { - $transport = $this->createMock(OpenSearch\Transport::class); - $endpointFactory = $this->createMock(OpenSearch\EndpointFactoryInterface::class); - $client = new OpenSearch\Client($transport, $endpointFactory, []); - - $transport->expects($this->once())->method('performRequest')->with('GET', '/', [], null, []); - - $client->request('GET', '/'); } - /** @test */ - public function sendRawRequestWithBody(): void - { - $transport = $this->createMock(OpenSearch\Transport::class); - $endpointFactory = $this->createMock(OpenSearch\EndpointFactoryInterface::class); - $client = new OpenSearch\Client($transport, $endpointFactory, []); - $body = ['query' => ['match' => ['text_entry' => 'long live king']]]; - - $transport->expects($this->once())->method('performRequest')->with('GET', '/shakespeare/_search', [], $body, []); - - $client->request('GET', '/shakespeare/_search', compact('body')); - } - - /** @test */ - public function sendRawRequestWithParams(): void - { - $transport = $this->createMock(OpenSearch\Transport::class); - $endpointFactory = $this->createMock(OpenSearch\EndpointFactoryInterface::class); - $client = new OpenSearch\Client($transport, $endpointFactory, []); - $params = ['foo' => 'bar']; - - $transport->expects($this->once())->method('performRequest')->with('GET', '/_search', $params, null, []); - - $client->request('GET', '/_search', compact('params')); - } - - /** @test */ - public function sendRawRequestWithOptions(): void - { - $transport = $this->createMock(OpenSearch\Transport::class); - $endpointFactory = $this->createMock(OpenSearch\EndpointFactoryInterface::class); - $client = new OpenSearch\Client($transport, $endpointFactory, []); - $options = ['client' => ['future' => 'lazy']]; - - $transport->expects($this->once())->method('performRequest')->with('GET', '/', [], null, $options); - - $client->request('GET', '/', compact('options')); - } } diff --git a/tests/Endpoints/AbstractEndpointTest.php b/tests/Endpoints/AbstractEndpointTest.php index 7a3e7b872..fd5b7c207 100644 --- a/tests/Endpoints/AbstractEndpointTest.php +++ b/tests/Endpoints/AbstractEndpointTest.php @@ -22,6 +22,7 @@ namespace OpenSearch\Tests\Endpoints; use OpenSearch\Endpoints\AbstractEndpoint; +use OpenSearch\Endpoints\EndpointInterface; use PHPUnit\Framework\MockObject\MockObject; /** @@ -30,7 +31,7 @@ class AbstractEndpointTest extends \PHPUnit\Framework\TestCase { /** - * @var AbstractEndpoint&MockObject + * @var EndpointInterface&MockObject */ private $endpoint; diff --git a/tests/RegisteredNamespaceTest.php b/tests/RegisteredNamespaceTest.php index 117b550fb..daee0b756 100644 --- a/tests/RegisteredNamespaceTest.php +++ b/tests/RegisteredNamespaceTest.php @@ -68,7 +68,7 @@ public function getName(): string return "foo"; } - public function getObject(Transport $transport, SerializerInterface $serializer) + public function getObject(OpenSearch\TransportInterface $transport, SerializerInterface $serializer) { return new FooNamespace(); } diff --git a/tests/TransportTest.php b/tests/TransportTest.php index 96469261b..fb8e9ec8a 100644 --- a/tests/TransportTest.php +++ b/tests/TransportTest.php @@ -21,79 +21,86 @@ namespace OpenSearch\Tests; -use OpenSearch\Common\Exceptions\ServerErrorResponseException; -use OpenSearch\ConnectionPool\AbstractConnectionPool; -use OpenSearch\Connections\Connection; -use OpenSearch\Serializers\SerializerInterface; +use GuzzleHttp\Client as GuzzleClient; +use GuzzleHttp\Exception\RequestException; +use GuzzleHttp\Handler\MockHandler; +use GuzzleHttp\HandlerStack; +use GuzzleHttp\Psr7\Request; +use GuzzleHttp\Psr7\Response; +use Http\Adapter\Guzzle7\Client as GuzzleAdapter; +use Http\Promise\Promise; +use OpenSearch\RequestFactoryInterface; use OpenSearch\Transport; -use GuzzleHttp\Ring\Future\FutureArray; -use GuzzleHttp\Ring\Future\FutureArrayInterface; -use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; -use Psr\Log\LoggerInterface; -use React\Promise\Deferred; +use Psr\Http\Client\RequestExceptionInterface; +use Psr\Http\Message\RequestInterface; +/** + * Class TransportTest + * + * @coversDefaultClass \OpenSearch\Transport + */ class TransportTest extends TestCase { /** - * @var Connection|MockObject + * The transport instance under test. */ - private $connection; - /** - * @var AbstractConnectionPool|MockObject - */ - private $connectionPool; - /** - * @var MockObject|LoggerInterface - */ - private $logger; + private Transport $transport; public function setUp(): void { - $this->logger = $this->createMock(LoggerInterface::class); - $this->connectionPool = $this->createMock(AbstractConnectionPool::class); - $this->connection = $this->createMock(Connection::class); - } - - public function testPerformRequestWithServerErrorResponseException404Result() - { - $deferred = new Deferred(); - $deferred->reject(new ServerErrorResponseException('foo', 404)); - $future = new FutureArray($deferred->promise()); - - $this->connection->method('performRequest') - ->willReturn($future); + parent::setUp(); - $this->connectionPool->method('nextConnection') - ->willReturn($this->connection); + $mockHandler = new MockHandler([ + new Response(200, ["content-type" => "text/javascript; charset=utf-8"], '{"foo": "bar"}'), + new RequestException('Error Communicating with Server', $this->createMock(RequestInterface::class)), + ]); - $this->connectionPool->expects($this->never()) - ->method('scheduleCheck'); + $handlerStack = HandlerStack::create($mockHandler); + $httpClient = new GuzzleAdapter(new GuzzleClient(['handler' => $handlerStack])); - $transport = new Transport(1, $this->connectionPool, $this->logger); + $requestFactory = $this->createMock(RequestFactoryInterface::class); + $requestFactory->method('createRequest')->willReturn($this->createMock(RequestInterface::class)); - $result = $transport->performRequest('GET', '/'); - $this->assertInstanceOf(FutureArrayInterface::class, $result); + $this->transport = new Transport($httpClient, $requestFactory); } - public function testPerformRequestWithServerErrorResponseException500Result() + /** + * @covers ::sendRequest + */ + public function testSendRequest(): void { - $deferred = new Deferred(); - $deferred->reject(new ServerErrorResponseException('foo', 500)); - $future = new FutureArray($deferred->promise()); - - $this->connection->method('performRequest') - ->willReturn($future); - - $this->connectionPool->method('nextConnection') - ->willReturn($this->connection); - - $this->connectionPool->expects($this->once()) - ->method('scheduleCheck'); - - $transport = new Transport(1, $this->connectionPool, $this->logger); + $request = new Request('GET', 'http://localhost:9200'); + $response = $this->transport->sendRequest($request); + $this->assertInstanceOf(Response::class, $response); + $this->assertEquals(200, $response->getStatusCode()); + $this->assertEquals('text/javascript; charset=utf-8', $response->getHeaderLine('content-type')); + $this->assertEquals('{"foo": "bar"}', $response->getBody()->getContents()); + + $this->expectException(RequestExceptionInterface::class); + $this->expectExceptionMessage('Error Communicating with Server'); + $this->transport->sendRequest($request); + } - $result = $transport->performRequest('GET', '/'); - $this->assertInstanceOf(FutureArrayInterface::class, $result); + /** + * @covers ::sendAsyncRequest + */ + public function testSendAsyncRequest(): void + { + $request = new Request('GET', 'http://localhost:9200'); + $promise = $this->transport->sendAsyncRequest($request); + $this->assertInstanceOf(Promise::class, $promise); + $response = $promise->wait(); + $this->assertInstanceOf(Response::class, $response); + $this->assertEquals(200, $response->getStatusCode()); + $this->assertEquals('text/javascript; charset=utf-8', $response->getHeaderLine('content-type')); + $this->assertEquals('{"foo": "bar"}', $response->getBody()->getContents()); + + $promise = $this->transport->sendAsyncRequest($request); + $this->assertInstanceOf(Promise::class, $promise); + $this->expectException(RequestExceptionInterface::class); + $this->expectExceptionMessage('Error Communicating with Server'); + $promise->wait(); } + } diff --git a/util/Endpoint.php b/util/Endpoint.php index c91ab10f5..4b7fc1899 100644 --- a/util/Endpoint.php +++ b/util/Endpoint.php @@ -544,15 +544,18 @@ private function extractParamsDescription(int $space): string if (in_array($param, $this->addedPartInDoc)) { continue; } + $type = $values['type'] ?? 'any'; + // var_dump($type); + // var_dump($values); $result .= sprintf( " * \$params['%s']%s = (%s) %s%s%s%s\n", $param, str_repeat(' ', $space - strlen($param)), - $values['type'] ?? 'any', + $type, $values['description'] ?? '', isset($values['required']) && $values['required'] ? ' (Required)' : '', isset($values['options']) ? sprintf(" (Options = %s)", implode(',', $values['options'])) : '', - isset($values['default']) ? sprintf(" (Default = %s)", $values['type'] === 'boolean' ? ($values['default'] ? 'true' : 'false') : (is_array($values['default']) ? implode(',', $values['default']) : $values['default'])) : '' + isset($values['default']) ? sprintf(" (Default = %s)", ($type === 'boolean') ? ($values['default'] ? 'true' : 'false') : (is_array($values['default']) ? implode(',', $values['default']) : $values['default'])) : '' ); } return $result; diff --git a/util/template/client-class b/util/template/client-class index 999710866..922b02512 100644 --- a/util/template/client-class +++ b/util/template/client-class @@ -21,13 +21,14 @@ declare(strict_types=1); namespace OpenSearch; +use Http\Promise\Promise; use OpenSearch\Common\Exceptions\BadMethodCallException; use OpenSearch\Common\Exceptions\NoNodesAvailableException; use OpenSearch\Endpoints\AbstractEndpoint; -use OpenSearch\Namespaces\NamespaceBuilderInterface; use OpenSearch\Namespaces\BooleanRequestWrapper; +use OpenSearch\Namespaces\NamespaceBuilderInterface; :use-namespaces -use OpenSearch\Traits\DeprecatedPropertyTrait; +use Psr\Http\Message\ResponseInterface; /** * Class Client @@ -63,9 +64,12 @@ class Client */ protected $registeredNamespaces = []; + protected bool $isAsync = false; + + :namespace_properties /** - * Client constructor + * Creates a new client instance. * * @param Transport $transport * @param callable|EndpointFactoryInterface $endpointFactory @@ -133,34 +137,57 @@ class Client } /** - * Sends a raw request to the cluster - * @return callable|array - * @throws NoNodesAvailableException + * Check if the client is running in async mode. + */ + public function isAsync(): bool + { + return $this->isAsync; + } + + /** + * Set the client to run in async mode. + */ + public function setAsync(bool $isAsync): static + { + $this->isAsync = $isAsync; + return $this; + } + + /** + * Sends a raw request to the cluster. + * + * @throws \Psr\Http\Client\ClientExceptionInterface|\Exception */ - public function request(string $method, string $uri, array $attributes = []) + public function sendRawRequest(string $method, string $uri, array $params = [], ?string $body = null): Promise|ResponseInterface { $params = $attributes['params'] ?? []; $body = $attributes['body'] ?? null; - $options = $attributes['options'] ?? []; - $promise = $this->transport->performRequest($method, $uri, $params, $body, $options); + $request = $this->transport->createRequest($method, $uri, $params, $body); - return $this->transport->resultOrFuture($promise, $options); + if ($this->isAsync()) { + return $this->transport->sendAsyncRequest($request); + } + return $this->transport->sendRequest($request); } /** - * @return callable|array + * Perform the request. + * + * @throws \Psr\Http\Client\ClientExceptionInterface|\Exception */ - private function performRequest(AbstractEndpoint $endpoint) + private function performRequest(EndpointInterface $endpoint): Promise|ResponseInterface { - $promise = $this->transport->performRequest( + $request = $this->transport->createRequest( $endpoint->getMethod(), $endpoint->getURI(), $endpoint->getParams(), $endpoint->getBody(), - $endpoint->getOptions() ); - - return $this->transport->resultOrFuture($promise, $endpoint->getOptions()); + if ($this->isAsync()) { + return $this->transport->sendAsyncRequest($request); + } + return $this->transport->sendRequest($request); } + } diff --git a/util/template/namespace-class b/util/template/namespace-class index 7ea1ddd82..6a1c0d238 100644 --- a/util/template/namespace-class +++ b/util/template/namespace-class @@ -4,8 +4,6 @@ declare(strict_types=1); namespace OpenSearch\Namespaces; -use OpenSearch\Namespaces\AbstractNamespace; - /** * Class :namespace * diff --git a/util/template/namespace-property b/util/template/namespace-property index 7b88dd7fe..96fce500b 100644 --- a/util/template/namespace-property +++ b/util/template/namespace-property @@ -1,5 +1 @@ - /** - * @var :namespaceNamespace - */ - protected $:var_namespace; - \ No newline at end of file + protected :namespaceNamespace $:var_namespace;