From faf6f5cb4cd3a2d845eea33c8bd9ce840a8d57dc Mon Sep 17 00:00:00 2001 From: Kim Pepper Date: Tue, 19 Nov 2024 15:30:18 +1100 Subject: [PATCH] Add a new transport approach and BC Signed-off-by: Kim Pepper --- composer.json | 12 +- src/OpenSearch/Aws/SigningRequestFactory.php | 43 + src/OpenSearch/Client.php | 41 +- src/OpenSearch/ClientBuilder.php | 677 +++++++++++++-- src/OpenSearch/ClientFactory.php | 19 + src/OpenSearch/Common/EmptyLogger.php | 4 + .../AuthenticationConfigException.php | 5 + .../Exceptions/BadMethodCallException.php | 4 + .../Exceptions/BadRequest400Exception.php | 5 + .../ClientErrorResponseException.php | 5 + .../Exceptions/Conflict409Exception.php | 5 + .../Exceptions/Curl/CouldNotConnectToHost.php | 5 + .../Curl/CouldNotResolveHostException.php | 5 + .../Curl/OperationTimeoutException.php | 5 + .../Exceptions/Forbidden403Exception.php | 5 + .../Exceptions/InvalidArgumentException.php | 5 + .../Common/Exceptions/MaxRetriesException.php | 5 + .../Common/Exceptions/Missing404Exception.php | 5 + .../Exceptions/NoDocumentsToGetException.php | 5 + .../Exceptions/RequestTimeout408Exception.php | 5 + .../Selectors/RandomSelector.php} | 22 +- .../Selectors/RoundRobinSelector.php | 51 ++ .../Selectors/StickyRoundRobinSelector.php | 62 ++ .../ConnectionPool/SimpleConnectionPool.php | 52 ++ .../ConnectionPool/SniffingConnectionPool.php | 186 ++++ .../ConnectionPool/StaticConnectionPool.php | 110 +++ .../StaticNoPingConnectionPool.php | 93 ++ src/OpenSearch/Connections/Connection.php | 794 ++++++++++++++++++ .../Connections/ConnectionFactory.php | 86 ++ src/OpenSearch/Handlers/SigV4Handler.php | 155 ++++ src/OpenSearch/HttpTransport.php | 44 + src/OpenSearch/LegacyTransportWrapper.php | 34 + .../Namespaces/AbstractNamespace.php | 26 +- src/OpenSearch/RequestFactory.php | 123 +-- src/OpenSearch/Transport.php | 173 ++-- src/OpenSearch/TransportFactory.php | 131 +++ src/OpenSearch/TransportInterface.php | 18 +- util/template/client-class | 45 +- 38 files changed, 2781 insertions(+), 289 deletions(-) create mode 100644 src/OpenSearch/Aws/SigningRequestFactory.php create mode 100644 src/OpenSearch/ClientFactory.php rename src/OpenSearch/{Common/Exceptions/NoAsyncClientException.php => ConnectionPool/Selectors/RandomSelector.php} (52%) create mode 100644 src/OpenSearch/ConnectionPool/Selectors/RoundRobinSelector.php create mode 100644 src/OpenSearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php create mode 100644 src/OpenSearch/ConnectionPool/SimpleConnectionPool.php create mode 100644 src/OpenSearch/ConnectionPool/SniffingConnectionPool.php create mode 100644 src/OpenSearch/ConnectionPool/StaticConnectionPool.php create mode 100644 src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php create mode 100644 src/OpenSearch/Connections/Connection.php create mode 100644 src/OpenSearch/Connections/ConnectionFactory.php create mode 100644 src/OpenSearch/Handlers/SigV4Handler.php create mode 100644 src/OpenSearch/HttpTransport.php create mode 100644 src/OpenSearch/LegacyTransportWrapper.php create mode 100644 src/OpenSearch/TransportFactory.php diff --git a/composer.json b/composer.json index 1d06fc9d..91425979 100644 --- a/composer.json +++ b/composer.json @@ -23,16 +23,16 @@ "require": { "php": "^8.0", "ext-json": ">=1.3.7", - "php-http/async-client-implementation": "^1.0", + "ext-curl": "*", + "ezimuel/ringphp": "^1.2.2", "php-http/discovery": "^1.20", - "php-http/guzzle7-adapter": "^1.0", "psr/http-client": "^1.0", - "psr/http-client-implementation": "^1.0", + "psr/http-client-implementation": "*", "psr/http-factory": "^1.1", - "psr/http-factory-implementation": "^2.4", + "psr/http-factory-implementation": "*", "psr/http-message": "^2.0", - "psr/http-message-implementation": "^1.0", - "psr/log": "^1|^2|^3", + "psr/http-message-implementation": "*", + "psr/log": "^2|^3", "symfony/yaml": "*" }, "require-dev": { diff --git a/src/OpenSearch/Aws/SigningRequestFactory.php b/src/OpenSearch/Aws/SigningRequestFactory.php new file mode 100644 index 00000000..0f48f7dc --- /dev/null +++ b/src/OpenSearch/Aws/SigningRequestFactory.php @@ -0,0 +1,43 @@ +requestFactory->createRequest($method, $uri, $params, $body, $headers); + $request = $request->withHeader('x-amz-content-sha256', hash('sha256', (string) $request->getBody())); + return $this->signer->signRequest($request, $this->credentials); + } + +} diff --git a/src/OpenSearch/Client.php b/src/OpenSearch/Client.php index 43d5aa57..2a63bccc 100644 --- a/src/OpenSearch/Client.php +++ b/src/OpenSearch/Client.php @@ -73,6 +73,8 @@ class Client */ public $transport; + private TransportInterface $httpTransport; + /** * @var array */ @@ -251,13 +253,19 @@ class Client /** * Client constructor * - * @param Transport $transport + * @param \OpenSearch\TransportInterface|\OpenSearch\Transport $transport * @param callable|EndpointFactoryInterface $endpointFactory * @param NamespaceBuilderInterface[] $registeredNamespaces */ - public function __construct(Transport $transport, callable|EndpointFactoryInterface $endpointFactory, array $registeredNamespaces) + public function __construct(TransportInterface|Transport $transport, callable|EndpointFactoryInterface $endpointFactory, array $registeredNamespaces) { - $this->transport = $transport; + if (!$transport instanceof TransportInterface) { + @trigger_error('Passing an instance of \OpenSearch\Transport to ' . __METHOD__ . '() is deprecated in 2.3.2 and will be removed in 3.0.0. Pass an instance of \OpenSearch\TransportInterface instead.', E_USER_DEPRECATED); + $this->transport = $transport; + $this->httpTransport = new LegacyTransportWrapper($transport); + } else { + $this->httpTransport = $transport; + } if (is_callable($endpointFactory)) { @trigger_error('Passing a callable as the $endpointFactory param in ' . __METHOD__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0. Pass an instance of \OpenSearch\EndpointFactoryInterface instead.', E_USER_DEPRECATED); $endpoints = $endpointFactory; @@ -2032,33 +2040,36 @@ public function extractArgument(array &$params, string $arg) /** * Sends a raw request to the cluster - * @return callable|array - * @throws NoNodesAvailableException + * @return array|string|null + * @throws \Exception */ - public function request(string $method, string $uri, array $attributes = []) + public function request(string $method, string $uri, array $attributes = []): array|string|null { $params = $attributes['params'] ?? []; $body = $attributes['body'] ?? null; $options = $attributes['options'] ?? []; - $promise = $this->transport->performRequest($method, $uri, $params, $body, $options); - - return $this->transport->resultOrFuture($promise, $options); + return $this->httpTransport->sendRequest($method, $uri, $params, $body, $options['headers'] ?? []); } /** - * @return callable|array + * Sends a request for the given endpoint. + * + * @param \OpenSearch\Endpoints\AbstractEndpoint $endpoint + * + * @return array|string|null + * + * @throws \Exception */ - private function performRequest(AbstractEndpoint $endpoint) + private function performRequest(AbstractEndpoint $endpoint): array|string|null { - $promise = $this->transport->performRequest( + $options = $endpoint->getOptions(); + return $this->httpTransport->sendRequest( $endpoint->getMethod(), $endpoint->getURI(), $endpoint->getParams(), $endpoint->getBody(), - $endpoint->getOptions() + $options['headers'] ?? [] ); - - return $this->transport->resultOrFuture($promise, $endpoint->getOptions()); } } diff --git a/src/OpenSearch/ClientBuilder.php b/src/OpenSearch/ClientBuilder.php index c1c9b9b7..1349e2c4 100644 --- a/src/OpenSearch/ClientBuilder.php +++ b/src/OpenSearch/ClientBuilder.php @@ -21,57 +21,167 @@ namespace OpenSearch; +use Aws\Credentials\CredentialProvider; +use Aws\Credentials\Credentials; +use Aws\Credentials\CredentialsInterface; +use GuzzleHttp\Ring\Client\CurlHandler; +use GuzzleHttp\Ring\Client\CurlMultiHandler; +use GuzzleHttp\Ring\Client\Middleware; +use OpenSearch\Common\Exceptions\AuthenticationConfigException; +use OpenSearch\Common\Exceptions\InvalidArgumentException; +use OpenSearch\Common\Exceptions\RuntimeException; use OpenSearch\ConnectionPool\AbstractConnectionPool; +use OpenSearch\ConnectionPool\Selectors\RoundRobinSelector; +use OpenSearch\ConnectionPool\Selectors\SelectorInterface; +use OpenSearch\ConnectionPool\StaticNoPingConnectionPool; +use OpenSearch\Connections\ConnectionFactory; use OpenSearch\Connections\ConnectionFactoryInterface; +use OpenSearch\Connections\ConnectionInterface; +use OpenSearch\Handlers\SigV4Handler; use OpenSearch\Namespaces\NamespaceBuilderInterface; use OpenSearch\Serializers\SerializerInterface; use OpenSearch\Serializers\SmartSerializer; use Psr\Log\LoggerInterface; +use Psr\Log\NullLogger; +use ReflectionClass; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class ClientBuilder { + public const ALLOWED_METHODS_FROM_CONFIG = ['includePortInHostHeader']; + /** - * The serializer. + * @var Transport|null */ - private ?SerializerInterface $serializer = null; + private $transport; + + private ?EndpointFactoryInterface $endpointFactory = null; /** - * The endpoint factory. + * @var NamespaceBuilderInterface[] */ - private ?EndpointFactoryInterface $endpointFactory = null; + private $registeredNamespacesBuilders = []; /** - * The transport. + * @var ConnectionFactoryInterface|null */ - private ?TransportInterface $transport = null; + private $connectionFactory; /** - * @var NamespaceBuilderInterface[] + * @var callable|null */ - private array $registeredNamespacesBuilders = []; + private $handler; - public function __construct(TransportInterface $transport) - { - $this->transport = $transport; - } + /** + * @var LoggerInterface|null + */ + private $logger; + + /** + * @var LoggerInterface|null + */ + private $tracer; + + /** + * @var string|AbstractConnectionPool + */ + private $connectionPool = StaticNoPingConnectionPool::class; + + /** + * @var string|SerializerInterface|null + */ + private $serializer = SmartSerializer::class; + + /** + * @var string|SelectorInterface|null + */ + private $selector = RoundRobinSelector::class; + + /** + * @var array + */ + private $connectionPoolArgs = [ + 'randomizeHosts' => true + ]; + + /** + * @var array|null + */ + private $hosts; + + /** + * @var array + */ + private $connectionParams; + + /** + * @var int|null + */ + private $retries; + + /** + * @var null|callable + */ + private $sigV4CredentialProvider; + + /** + * @var null|string + */ + private $sigV4Region; + + /** + * @var null|string + */ + private $sigV4Service; + + /** + * @var bool + */ + private $sniffOnStart = false; + + /** + * @var null|array + */ + private $sslCert; + + /** + * @var null|array + */ + private $sslKey; + + /** + * @var null|bool|string + */ + private $sslVerification; + + /** + * @var bool + */ + private $includePortInHostHeader = false; + + /** + * @var string|null + */ + private $basicAuthentication = null; /** * Create an instance of ClientBuilder - * - * @deprecated in 2.3.1 and will be removed in 3.0.0. */ - public static function create(): ?ClientBuilder + public static function create(): ClientBuilder { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); - return null; + return new self(); } /** - * Gets the serializer instance. If not set, it will create a new instance of SmartSerializer. + * Can supply first param to Client::__construct() when invoking manually or with dependency injection */ - private function getSerializer(): SerializerInterface + public function getTransport(): Transport { - return $this->serializer ?? $this->serializer = new SmartSerializer(); + return $this->transport; } /** @@ -109,13 +219,28 @@ public function getRegisteredNamespacesBuilders(): array * @param bool $quiet False if unknown settings throw exception, true to silently * ignore unknown settings * @throws Common\Exceptions\RuntimeException - * - * @deprecated in 2.3.1 and will be removed in 3.0.0. */ public static function fromConfig(array $config, bool $quiet = false): Client { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); $builder = new self(); + foreach ($config as $key => $value) { + $method = in_array($key, self::ALLOWED_METHODS_FROM_CONFIG, true) ? $key : "set$key"; + $reflection = new ReflectionClass($builder); + if ($reflection->hasMethod($method)) { + $func = $reflection->getMethod($method); + if ($func->getNumberOfParameters() > 1) { + $builder->$method(...$value); + } else { + $builder->$method($value); + } + unset($config[$key]); + } + } + + if ($quiet === false && count($config) > 0) { + $unknown = implode(array_keys($config)); + throw new RuntimeException("Unknown parameters provided: $unknown"); + } return $builder->build(); } @@ -124,13 +249,24 @@ public static function fromConfig(array $config, bool $quiet = false): Client * * @param array $multiParams * @param array $singleParams - * - * @deprecated in 2.3.1 and will be removed in 3.0.0. + * @throws \RuntimeException */ public static function defaultHandler(array $multiParams = [], array $singleParams = []): callable { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); - return fn() => @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); + $future = null; + if (extension_loaded('curl')) { + $config = array_merge([ 'mh' => curl_multi_init() ], $multiParams); + if (function_exists('curl_reset')) { + $default = new CurlHandler($singleParams); + $future = new CurlMultiHandler($config); + } else { + $default = new CurlMultiHandler($config); + } + } else { + throw new \RuntimeException('OpenSearch-PHP requires cURL, or a custom HTTP handler.'); + } + + return $future ? Middleware::wrapFuture($default, $future) : $default; } /** @@ -138,10 +274,13 @@ public static function defaultHandler(array $multiParams = [], array $singlePara * * @throws \RuntimeException */ - public static function multiHandler(array $params = []): ?CurlMultiHandler + public static function multiHandler(array $params = []): CurlMultiHandler { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); - return null; + if (function_exists('curl_multi_init')) { + return new CurlMultiHandler(array_merge([ 'mh' => curl_multi_init() ], $params)); + } + + throw new \RuntimeException('CurlMulti handler requires cURL.'); } /** @@ -149,10 +288,13 @@ public static function multiHandler(array $params = []): ?CurlMultiHandler * * @throws \RuntimeException */ - public static function singleHandler(): ?CurlHandler + public static function singleHandler(): CurlHandler { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); - return null; + if (function_exists('curl_reset')) { + return new CurlHandler(); + } + + throw new \RuntimeException('CurlSingle handler requires cURL.'); } /** @@ -162,7 +304,8 @@ public static function singleHandler(): ?CurlHandler */ public function setConnectionFactory(ConnectionFactoryInterface $connectionFactory): ClientBuilder { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); + $this->connectionFactory = $connectionFactory; + return $this; } @@ -175,7 +318,15 @@ public function setConnectionFactory(ConnectionFactoryInterface $connectionFacto */ public function setConnectionPool($connectionPool, array $args = []): ClientBuilder { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); + if (is_string($connectionPool)) { + $this->connectionPool = $connectionPool; + $this->connectionPoolArgs = $args; + } elseif (is_object($connectionPool)) { + $this->connectionPool = $connectionPool; + } else { + throw new InvalidArgumentException("Serializer must be a class path or instantiated object extending AbstractConnectionPool"); + } + return $this; } @@ -216,32 +367,22 @@ public function registerNamespace(NamespaceBuilderInterface $namespaceBuilder): * Set the transport * * @param Transport $transport - * - * @deprecated in 2.3.1 and will be removed in 3.0.0. */ - public function setTransport(TransportInterface $transport): ClientBuilder + public function setTransport(Transport $transport): ClientBuilder { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); $this->transport = $transport; return $this; } - public function getTransport(): TransportInterface - { - return $this->transport; - } - /** * Set the HTTP handler (cURL is default) * * @param mixed $handler - * - * @deprecated in 2.3.1 and will be removed in 3.0.0. */ public function setHandler($handler): ClientBuilder { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); + $this->handler = $handler; return $this; } @@ -250,12 +391,10 @@ public function setHandler($handler): ClientBuilder * Set the PSR-3 Logger * * @param LoggerInterface $logger - * - * @deprecated in 2.3.1 and will be removed in 3.0.0. */ public function setLogger(LoggerInterface $logger): ClientBuilder { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); + $this->logger = $logger; return $this; } @@ -264,32 +403,190 @@ public function setLogger(LoggerInterface $logger): ClientBuilder * Set the PSR-3 tracer * * @param LoggerInterface $tracer - * - * @deprecated in 2.3.1 and will be removed in 3.0.0. */ public function setTracer(LoggerInterface $tracer): ClientBuilder { - @trigger_error(__METHOD__ . '() is deprecated in 2.3.1 and will be removed in 3.0.0.', E_USER_DEPRECATED); + $this->tracer = $tracer; return $this; } /** * Set the serializer + * + * @param \OpenSearch\Serializers\SerializerInterface|string $serializer + */ + public function setSerializer($serializer): ClientBuilder + { + $this->parseStringOrObject($serializer, $this->serializer, 'SerializerInterface'); + + return $this; + } + + /** + * Set the hosts (nodes) + * + * @param array $hosts + */ + public function setHosts(array $hosts): ClientBuilder + { + $this->hosts = $hosts; + + return $this; + } + + /** + * Set Basic access authentication + * + * @see https://en.wikipedia.org/wiki/Basic_access_authentication + * @param string $username + * @param string $password + * + * @throws AuthenticationConfigException + */ + public function setBasicAuthentication(string $username, string $password): ClientBuilder + { + $this->basicAuthentication = $username.':'.$password; + + return $this; + } + + /** + * Set connection parameters + * + * @param array $params + */ + public function setConnectionParams(array $params): ClientBuilder + { + $this->connectionParams = $params; + + return $this; + } + + /** + * Set number or retries (default is equal to number of nodes) + * + * @param int $retries + */ + public function setRetries(int $retries): ClientBuilder + { + $this->retries = $retries; + + return $this; + } + + /** + * Set the selector algorithm + * + * @param \OpenSearch\ConnectionPool\Selectors\SelectorInterface|string $selector */ - public function setSerializer(SerializerInterface $serializer): static + public function setSelector($selector): ClientBuilder { - $this->serializer = $serializer; + $this->parseStringOrObject($selector, $this->selector, 'SelectorInterface'); return $this; } - private function getEndpointFactory(): EndpointFactoryInterface + /** + * Set the credential provider for SigV4 request signing. The value provider should be a + * callable object that will return + * + * @param callable|bool|array|CredentialsInterface|null $credentialProvider + */ + public function setSigV4CredentialProvider($credentialProvider): ClientBuilder { - if ($this->endpointFactory) { - return $this->endpointFactory; + if ($credentialProvider !== null && $credentialProvider !== false) { + $this->sigV4CredentialProvider = $this->normalizeCredentialProvider($credentialProvider); } - return $this->endpointFactory = new EndpointFactory($this->getSerializer()); + + return $this; + } + + /** + * Set the region for SigV4 signing. + * + * @param string|null $region + */ + public function setSigV4Region($region): ClientBuilder + { + $this->sigV4Region = $region; + + return $this; + } + + /** + * Set the service for SigV4 signing. + * + * @param string|null $service + */ + public function setSigV4Service($service): ClientBuilder + { + $this->sigV4Service = $service; + + return $this; + } + + /** + * Set sniff on start + * + * @param bool $sniffOnStart enable or disable sniff on start + */ + + public function setSniffOnStart(bool $sniffOnStart): ClientBuilder + { + $this->sniffOnStart = $sniffOnStart; + + return $this; + } + + /** + * Set SSL certificate + * + * @param string $cert The name of a file containing a PEM formatted certificate. + * @param string $password if the certificate requires a password + */ + public function setSSLCert(string $cert, ?string $password = null): ClientBuilder + { + $this->sslCert = [$cert, $password]; + + return $this; + } + + /** + * Set SSL key + * + * @param string $key The name of a file containing a private SSL key + * @param string $password if the private key requires a password + */ + public function setSSLKey(string $key, ?string $password = null): ClientBuilder + { + $this->sslKey = [$key, $password]; + + return $this; + } + + /** + * Set SSL verification + * + * @param bool|string $value + */ + public function setSSLVerification($value = true): ClientBuilder + { + $this->sslVerification = $value; + + return $this; + } + + /** + * Include the port in Host header + * + * @see https://github.com/elastic/elasticsearch-php/issues/993 + */ + public function includePortInHostHeader(bool $enable): ClientBuilder + { + $this->includePortInHostHeader = $enable; + + return $this; } /** @@ -297,7 +594,267 @@ private function getEndpointFactory(): EndpointFactoryInterface */ public function build(): Client { - return new Client($this->getTransport(), $this->getEndpointFactory(), $this->getRegisteredNamespacesBuilders()); + $this->buildLoggers(); + + if (is_null($this->handler)) { + $this->handler = ClientBuilder::defaultHandler(); + } + + if (!is_null($this->sigV4CredentialProvider)) { + if (is_null($this->sigV4Region)) { + throw new RuntimeException("A region must be supplied for SigV4 request signing."); + } + + if (is_null($this->sigV4Service)) { + $this->setSigV4Service("es"); + } + + $this->handler = new SigV4Handler($this->sigV4Region, $this->sigV4Service, $this->sigV4CredentialProvider, $this->handler); + } + + $sslOptions = null; + if (isset($this->sslKey)) { + $sslOptions['ssl_key'] = $this->sslKey; + } + if (isset($this->sslCert)) { + $sslOptions['cert'] = $this->sslCert; + } + if (isset($this->sslVerification)) { + $sslOptions['verify'] = $this->sslVerification; + } + + if (!is_null($sslOptions)) { + $sslHandler = function (callable $handler, array $sslOptions) { + return function (array $request) use ($handler, $sslOptions) { + // Add our custom headers + foreach ($sslOptions as $key => $value) { + $request['client'][$key] = $value; + } + + // Send the request using the handler and return the response. + return $handler($request); + }; + }; + $this->handler = $sslHandler($this->handler, $sslOptions); + } + + if (is_null($this->serializer)) { + $this->serializer = new SmartSerializer(); + } elseif (is_string($this->serializer)) { + $this->serializer = new $this->serializer(); + } + + $this->connectionParams['client']['port_in_header'] = $this->includePortInHostHeader; + + if (! is_null($this->basicAuthentication)) { + if (isset($this->connectionParams['client']['curl']) === false) { + $this->connectionParams['client']['curl'] = []; + } + + $this->connectionParams['client']['curl'] += [ + CURLOPT_HTTPAUTH => CURLAUTH_BASIC, + CURLOPT_USERPWD => $this->basicAuthentication + ]; + } + + if (is_null($this->connectionFactory)) { + // Make sure we are setting Content-Type and Accept (unless the user has explicitly + // overridden it + if (! isset($this->connectionParams['client']['headers'])) { + $this->connectionParams['client']['headers'] = []; + } + if (! isset($this->connectionParams['client']['headers']['Content-Type'])) { + $this->connectionParams['client']['headers']['Content-Type'] = ['application/json']; + } + if (! isset($this->connectionParams['client']['headers']['Accept'])) { + $this->connectionParams['client']['headers']['Accept'] = ['application/json']; + } + + $this->connectionFactory = new ConnectionFactory($this->handler, $this->connectionParams, $this->serializer, $this->logger, $this->tracer); + } + + if (is_null($this->hosts)) { + $this->hosts = $this->getDefaultHost(); + } + + if (is_null($this->selector)) { + $this->selector = new RoundRobinSelector(); + } elseif (is_string($this->selector)) { + $this->selector = new $this->selector(); + } + + $this->buildTransport(); + + if (is_null($this->endpointFactory)) { + $this->endpointFactory = new EndpointFactory($this->serializer); + } + + $registeredNamespaces = []; + foreach ($this->registeredNamespacesBuilders as $builder) { + /** + * @var NamespaceBuilderInterface $builder + */ + $registeredNamespaces[$builder->getName()] = $builder->getObject($this->transport, $this->serializer); + } + + return $this->instantiate($this->transport, $this->endpointFactory, $registeredNamespaces); + } + + protected function instantiate(Transport $transport, EndpointFactoryInterface $endpointFactory, array $registeredNamespaces): Client + { + return new Client($transport, $endpointFactory, $registeredNamespaces); + } + + private function buildLoggers(): void + { + if (is_null($this->logger)) { + $this->logger = new NullLogger(); + } + + if (is_null($this->tracer)) { + $this->tracer = new NullLogger(); + } + } + + private function buildTransport(): void + { + $connections = $this->buildConnectionsFromHosts($this->hosts); + + if (is_string($this->connectionPool)) { + $this->connectionPool = new $this->connectionPool( + $connections, + $this->selector, + $this->connectionFactory, + $this->connectionPoolArgs + ); + } + + if (is_null($this->retries)) { + $this->retries = count($connections); + } + + if (is_null($this->transport)) { + $this->transport = new Transport($this->retries, $this->connectionPool, $this->logger, $this->sniffOnStart); + } + } + + private function parseStringOrObject($arg, &$destination, $interface): void + { + if (is_string($arg)) { + $destination = new $arg(); + } elseif (is_object($arg)) { + $destination = $arg; + } else { + throw new InvalidArgumentException("Serializer must be a class path or instantiated object implementing $interface"); + } + } + + private function getDefaultHost(): array + { + return ['localhost:9200']; + } + + /** + * @return ConnectionInterface[] + * @throws RuntimeException + */ + private function buildConnectionsFromHosts(array $hosts): array + { + $connections = []; + foreach ($hosts as $host) { + if (is_string($host)) { + $host = $this->prependMissingScheme($host); + $host = $this->extractURIParts($host); + } elseif (is_array($host)) { + $host = $this->normalizeExtendedHost($host); + } else { + $this->logger->error("Could not parse host: ".print_r($host, true)); + throw new RuntimeException("Could not parse host: ".print_r($host, true)); + } + + $connections[] = $this->connectionFactory->create($host); + } + + return $connections; + } + + /** + * @throws RuntimeException + */ + private function normalizeExtendedHost(array $host): array + { + if (isset($host['host']) === false) { + $this->logger->error("Required 'host' was not defined in extended format: ".print_r($host, true)); + throw new RuntimeException("Required 'host' was not defined in extended format: ".print_r($host, true)); + } + + if (isset($host['scheme']) === false) { + $host['scheme'] = 'http'; + } + if (isset($host['port']) === false) { + $host['port'] = 9200; + } + return $host; + } + + /** + * @throws InvalidArgumentException + */ + private function extractURIParts(string $host): array + { + $parts = parse_url($host); + + if ($parts === false) { + throw new InvalidArgumentException(sprintf('Could not parse URI: "%s"', $host)); + } + + if (isset($parts['port']) !== true) { + $parts['port'] = 9200; + } + + return $parts; } + private function prependMissingScheme(string $host): string + { + if (!preg_match("/^https?:\/\//", $host)) { + $host = 'http://' . $host; + } + + return $host; + } + + private function normalizeCredentialProvider($provider): ?callable + { + if ($provider === null || $provider === false) { + return null; + } + + if (is_callable($provider)) { + return $provider; + } + + SigV4Handler::assertDependenciesInstalled(); + + if ($provider === true) { + return CredentialProvider::defaultProvider(); + } + + if ($provider instanceof CredentialsInterface) { + return CredentialProvider::fromCredentials($provider); + } elseif (is_array($provider) && isset($provider['key']) && isset($provider['secret'])) { + return CredentialProvider::fromCredentials( + new Credentials( + $provider['key'], + $provider['secret'], + isset($provider['token']) ? $provider['token'] : null, + isset($provider['expires']) ? $provider['expires'] : null + ) + ); + } + + throw new InvalidArgumentException('Credentials must be an instance of Aws\Credentials\CredentialsInterface, an' + . ' associative array that contains "key", "secret", and an optional "token" key-value pairs, a credentials' + . ' provider function, or true.'); + } } diff --git a/src/OpenSearch/ClientFactory.php b/src/OpenSearch/ClientFactory.php new file mode 100644 index 00000000..f94f2605 --- /dev/null +++ b/src/OpenSearch/ClientFactory.php @@ -0,0 +1,19 @@ +transport, $this->endpointFactory, $this->registeredNamespaces); + } + +} diff --git a/src/OpenSearch/Common/EmptyLogger.php b/src/OpenSearch/Common/EmptyLogger.php index c2bfeaf4..c3a44c18 100644 --- a/src/OpenSearch/Common/EmptyLogger.php +++ b/src/OpenSearch/Common/EmptyLogger.php @@ -24,11 +24,15 @@ use Psr\Log\AbstractLogger; use Psr\Log\LoggerInterface; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0. Use Psr\Log\NullLogger instead', E_USER_DEPRECATED); + /** * Class EmptyLogger * * Logger that doesn't do anything. Similar to Monolog's NullHandler, * but avoids the overhead of partially loading Monolog + * + * @deprecated in 2.3.2 and will be removed in 3.0.0. Use Psr\Log\NullLogger instead. */ class EmptyLogger extends AbstractLogger implements LoggerInterface { diff --git a/src/OpenSearch/Common/Exceptions/AuthenticationConfigException.php b/src/OpenSearch/Common/Exceptions/AuthenticationConfigException.php index 7b070a1a..a75cecc0 100644 --- a/src/OpenSearch/Common/Exceptions/AuthenticationConfigException.php +++ b/src/OpenSearch/Common/Exceptions/AuthenticationConfigException.php @@ -22,6 +22,11 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class AuthenticationConfigException extends \RuntimeException implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/BadMethodCallException.php b/src/OpenSearch/Common/Exceptions/BadMethodCallException.php index 62b6551d..a562f01d 100644 --- a/src/OpenSearch/Common/Exceptions/BadMethodCallException.php +++ b/src/OpenSearch/Common/Exceptions/BadMethodCallException.php @@ -21,10 +21,14 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + /** * BadMethodCallException * * Denote problems with a method call (e.g. incorrect number of arguments) + * + * @deprecated in 2.3.2 and will be removed in 3.0.0. */ class BadMethodCallException extends \BadMethodCallException implements OpenSearchException { diff --git a/src/OpenSearch/Common/Exceptions/BadRequest400Exception.php b/src/OpenSearch/Common/Exceptions/BadRequest400Exception.php index 11480bfd..4ea5798d 100644 --- a/src/OpenSearch/Common/Exceptions/BadRequest400Exception.php +++ b/src/OpenSearch/Common/Exceptions/BadRequest400Exception.php @@ -21,6 +21,11 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class BadRequest400Exception extends \Exception implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/ClientErrorResponseException.php b/src/OpenSearch/Common/Exceptions/ClientErrorResponseException.php index 3982d81f..a2a56caa 100644 --- a/src/OpenSearch/Common/Exceptions/ClientErrorResponseException.php +++ b/src/OpenSearch/Common/Exceptions/ClientErrorResponseException.php @@ -21,6 +21,11 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class ClientErrorResponseException extends TransportException implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/Conflict409Exception.php b/src/OpenSearch/Common/Exceptions/Conflict409Exception.php index 215b8035..fe7e6a6d 100644 --- a/src/OpenSearch/Common/Exceptions/Conflict409Exception.php +++ b/src/OpenSearch/Common/Exceptions/Conflict409Exception.php @@ -21,6 +21,11 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class Conflict409Exception extends \Exception implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/Curl/CouldNotConnectToHost.php b/src/OpenSearch/Common/Exceptions/Curl/CouldNotConnectToHost.php index 63ab1834..cd7ad8b4 100644 --- a/src/OpenSearch/Common/Exceptions/Curl/CouldNotConnectToHost.php +++ b/src/OpenSearch/Common/Exceptions/Curl/CouldNotConnectToHost.php @@ -25,6 +25,11 @@ use OpenSearch\Common\Exceptions\OpenSearchException; use OpenSearch\Common\Exceptions\TransportException; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class CouldNotConnectToHost extends TransportException implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/Curl/CouldNotResolveHostException.php b/src/OpenSearch/Common/Exceptions/Curl/CouldNotResolveHostException.php index ac4cc9c0..6e6c91d5 100644 --- a/src/OpenSearch/Common/Exceptions/Curl/CouldNotResolveHostException.php +++ b/src/OpenSearch/Common/Exceptions/Curl/CouldNotResolveHostException.php @@ -24,6 +24,11 @@ use OpenSearch\Common\Exceptions\OpenSearchException; use OpenSearch\Common\Exceptions\TransportException; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class CouldNotResolveHostException extends TransportException implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/Curl/OperationTimeoutException.php b/src/OpenSearch/Common/Exceptions/Curl/OperationTimeoutException.php index f009d132..137a4ba9 100644 --- a/src/OpenSearch/Common/Exceptions/Curl/OperationTimeoutException.php +++ b/src/OpenSearch/Common/Exceptions/Curl/OperationTimeoutException.php @@ -24,6 +24,11 @@ use OpenSearch\Common\Exceptions\OpenSearchException; use OpenSearch\Common\Exceptions\TransportException; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class OperationTimeoutException extends TransportException implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/Forbidden403Exception.php b/src/OpenSearch/Common/Exceptions/Forbidden403Exception.php index e6cc8d6a..d7572e22 100644 --- a/src/OpenSearch/Common/Exceptions/Forbidden403Exception.php +++ b/src/OpenSearch/Common/Exceptions/Forbidden403Exception.php @@ -21,6 +21,11 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class Forbidden403Exception extends \Exception implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/InvalidArgumentException.php b/src/OpenSearch/Common/Exceptions/InvalidArgumentException.php index 78a12f1e..f9ffb68b 100644 --- a/src/OpenSearch/Common/Exceptions/InvalidArgumentException.php +++ b/src/OpenSearch/Common/Exceptions/InvalidArgumentException.php @@ -21,6 +21,11 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class InvalidArgumentException extends \InvalidArgumentException implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/MaxRetriesException.php b/src/OpenSearch/Common/Exceptions/MaxRetriesException.php index fc1eb71c..296dc69b 100644 --- a/src/OpenSearch/Common/Exceptions/MaxRetriesException.php +++ b/src/OpenSearch/Common/Exceptions/MaxRetriesException.php @@ -21,6 +21,11 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class MaxRetriesException extends TransportException implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/Missing404Exception.php b/src/OpenSearch/Common/Exceptions/Missing404Exception.php index 7105f475..35d16e35 100644 --- a/src/OpenSearch/Common/Exceptions/Missing404Exception.php +++ b/src/OpenSearch/Common/Exceptions/Missing404Exception.php @@ -21,6 +21,11 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class Missing404Exception extends \Exception implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/NoDocumentsToGetException.php b/src/OpenSearch/Common/Exceptions/NoDocumentsToGetException.php index 8bd30eed..7d43c652 100644 --- a/src/OpenSearch/Common/Exceptions/NoDocumentsToGetException.php +++ b/src/OpenSearch/Common/Exceptions/NoDocumentsToGetException.php @@ -21,6 +21,11 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class NoDocumentsToGetException extends ServerErrorResponseException implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/RequestTimeout408Exception.php b/src/OpenSearch/Common/Exceptions/RequestTimeout408Exception.php index 80e26aa7..31025246 100644 --- a/src/OpenSearch/Common/Exceptions/RequestTimeout408Exception.php +++ b/src/OpenSearch/Common/Exceptions/RequestTimeout408Exception.php @@ -21,6 +21,11 @@ namespace OpenSearch\Common\Exceptions; +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ class RequestTimeout408Exception extends BadRequest400Exception implements OpenSearchException { } diff --git a/src/OpenSearch/Common/Exceptions/NoAsyncClientException.php b/src/OpenSearch/ConnectionPool/Selectors/RandomSelector.php similarity index 52% rename from src/OpenSearch/Common/Exceptions/NoAsyncClientException.php rename to src/OpenSearch/ConnectionPool/Selectors/RandomSelector.php index b95ce053..819adf70 100644 --- a/src/OpenSearch/Common/Exceptions/NoAsyncClientException.php +++ b/src/OpenSearch/ConnectionPool/Selectors/RandomSelector.php @@ -1,5 +1,7 @@ current % count($connections)]; + + $this->current += 1; + + return $returnConnection; + } +} diff --git a/src/OpenSearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php b/src/OpenSearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php new file mode 100644 index 00000000..aef1588f --- /dev/null +++ b/src/OpenSearch/ConnectionPool/Selectors/StickyRoundRobinSelector.php @@ -0,0 +1,62 @@ +current]->isAlive()) { + return $connections[$this->current]; + } + + $this->currentCounter += 1; + $this->current = $this->currentCounter % count($connections); + + return $connections[$this->current]; + } +} diff --git a/src/OpenSearch/ConnectionPool/SimpleConnectionPool.php b/src/OpenSearch/ConnectionPool/SimpleConnectionPool.php new file mode 100644 index 00000000..26592d28 --- /dev/null +++ b/src/OpenSearch/ConnectionPool/SimpleConnectionPool.php @@ -0,0 +1,52 @@ + $connectionPoolParams + */ + public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams) + { + parent::__construct($connections, $selector, $factory, $connectionPoolParams); + } + + public function nextConnection(bool $force = false): ConnectionInterface + { + return $this->selector->select($this->connections); + } + + public function scheduleCheck(): void + { + } +} diff --git a/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php b/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php new file mode 100644 index 00000000..64bd7d85 --- /dev/null +++ b/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php @@ -0,0 +1,186 @@ + $connectionPoolParams + */ + public function __construct( + $connections, + SelectorInterface $selector, + ConnectionFactoryInterface $factory, + $connectionPoolParams + ) { + parent::__construct($connections, $selector, $factory, $connectionPoolParams); + + $this->setConnectionPoolParams($connectionPoolParams); + $this->nextSniff = time() + $this->sniffingInterval; + } + + public function nextConnection(bool $force = false): ConnectionInterface + { + $this->sniff($force); + + $size = count($this->connections); + while ($size--) { + /** + * @var Connection $connection + */ + $connection = $this->selector->select($this->connections); + if ($connection->isAlive() === true || $connection->ping() === true) { + return $connection; + } + } + + if ($force === true) { + throw new NoNodesAvailableException("No alive nodes found in your cluster"); + } + + return $this->nextConnection(true); + } + + public function scheduleCheck(): void + { + $this->nextSniff = -1; + } + + private function sniff(bool $force = false): void + { + if ($force === false && $this->nextSniff > time()) { + return; + } + + $total = count($this->connections); + + while ($total--) { + /** + * @var Connection $connection + */ + $connection = $this->selector->select($this->connections); + + if ($connection->isAlive() xor $force) { + continue; + } + + if ($this->sniffConnection($connection) === true) { + return; + } + } + + if ($force === true) { + return; + } + + foreach ($this->seedConnections as $connection) { + /** + * @var Connection $connection + */ + if ($this->sniffConnection($connection) === true) { + return; + } + } + } + + private function sniffConnection(Connection $connection): bool + { + try { + $response = $connection->sniff(); + } catch (OperationTimeoutException $exception) { + return false; + } + + $nodes = $this->parseClusterState($response); + + if (count($nodes) === 0) { + return false; + } + + $this->connections = []; + + foreach ($nodes as $node) { + $nodeDetails = [ + 'host' => $node['host'], + 'port' => $node['port'], + ]; + $this->connections[] = $this->connectionFactory->create($nodeDetails); + } + + $this->nextSniff = time() + $this->sniffingInterval; + + return true; + } + + /** + * @return list + */ + private function parseClusterState($nodeInfo): array + { + $pattern = '/([^:]*):(\d+)/'; + $hosts = []; + + foreach ($nodeInfo['nodes'] as $node) { + if (isset($node['http']) === true && isset($node['http']['publish_address']) === true) { + if (preg_match($pattern, $node['http']['publish_address'], $match) === 1) { + $hosts[] = [ + 'host' => $match[1], + 'port' => (int)$match[2], + ]; + } + } + } + + return $hosts; + } + + /** + * @param array $connectionPoolParams + */ + private function setConnectionPoolParams(array $connectionPoolParams): void + { + $this->sniffingInterval = (int)($connectionPoolParams['sniffingInterval'] ?? 300); + } +} diff --git a/src/OpenSearch/ConnectionPool/StaticConnectionPool.php b/src/OpenSearch/ConnectionPool/StaticConnectionPool.php new file mode 100644 index 00000000..8d62c62b --- /dev/null +++ b/src/OpenSearch/ConnectionPool/StaticConnectionPool.php @@ -0,0 +1,110 @@ + $connectionPoolParams + */ + public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams) + { + parent::__construct($connections, $selector, $factory, $connectionPoolParams); + $this->scheduleCheck(); + } + + public function nextConnection(bool $force = false): ConnectionInterface + { + $skipped = []; + + $total = count($this->connections); + while ($total--) { + /** + * @var Connection $connection + */ + $connection = $this->selector->select($this->connections); + if ($connection->isAlive() === true) { + return $connection; + } + + if ($this->readyToRevive($connection) === true) { + if ($connection->ping() === true) { + return $connection; + } + } else { + $skipped[] = $connection; + } + } + + // All "alive" nodes failed, force pings on "dead" nodes + foreach ($skipped as $connection) { + if ($connection->ping() === true) { + return $connection; + } + } + + throw new NoNodesAvailableException("No alive nodes found in your cluster"); + } + + public function scheduleCheck(): void + { + foreach ($this->connections as $connection) { + $connection->markDead(); + } + } + + private function readyToRevive(Connection $connection): bool + { + $timeout = min( + $this->pingTimeout * pow(2, $connection->getPingFailures()), + $this->maxPingTimeout + ); + + if ($connection->getLastPing() + $timeout < time()) { + return true; + } else { + return false; + } + } +} diff --git a/src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php b/src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php new file mode 100644 index 00000000..643aeb35 --- /dev/null +++ b/src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php @@ -0,0 +1,93 @@ + $connectionPoolParams + */ + public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams) + { + parent::__construct($connections, $selector, $factory, $connectionPoolParams); + } + + public function nextConnection(bool $force = false): ConnectionInterface + { + $total = count($this->connections); + while ($total--) { + /** + * @var Connection $connection +*/ + $connection = $this->selector->select($this->connections); + if ($connection->isAlive() === true) { + return $connection; + } + + if ($this->readyToRevive($connection) === true) { + return $connection; + } + } + + throw new NoNodesAvailableException("No alive nodes found in your cluster"); + } + + public function scheduleCheck(): void + { + } + + private function readyToRevive(Connection $connection): bool + { + $timeout = min( + $this->pingTimeout * pow(2, $connection->getPingFailures()), + $this->maxPingTimeout + ); + + if ($connection->getLastPing() + $timeout < time()) { + return true; + } else { + return false; + } + } +} diff --git a/src/OpenSearch/Connections/Connection.php b/src/OpenSearch/Connections/Connection.php new file mode 100644 index 00000000..4ff592ed --- /dev/null +++ b/src/OpenSearch/Connections/Connection.php @@ -0,0 +1,794 @@ +> + */ + protected $headers = []; + + /** + * @var bool + */ + protected $isAlive = false; + + /** + * @var float + */ + private $pingTimeout = 1; //TODO expose this + + /** + * @var int + */ + private $lastPing = 0; + + /** + * @var int + */ + private $failedPings = 0; + + /** + * @var mixed[] + */ + private $lastRequest = array(); + + /** + * @var string + */ + private $OSVersion = null; + + /** + * @param array{host: string, port?: int, scheme?: string, user?: string, pass?: string, path?: string} $hostDetails + * @param array{client?: array{headers?: array>, curl?: array}} $connectionParams + */ + public function __construct( + callable $handler, + array $hostDetails, + array $connectionParams, + SerializerInterface $serializer, + LoggerInterface $log, + LoggerInterface $trace + ) { + if (isset($hostDetails['port']) !== true) { + $hostDetails['port'] = 9200; + } + + if (isset($hostDetails['scheme'])) { + $this->transportSchema = $hostDetails['scheme']; + } + + // Only Set the Basic if API Key is not set and setBasicAuthentication was not called prior + if (isset($connectionParams['client']['headers']['Authorization']) === false + && isset($connectionParams['client']['curl'][CURLOPT_HTTPAUTH]) === false + && isset($hostDetails['user']) + && isset($hostDetails['pass']) + ) { + $connectionParams['client']['curl'][CURLOPT_HTTPAUTH] = CURLAUTH_BASIC; + $connectionParams['client']['curl'][CURLOPT_USERPWD] = $hostDetails['user'].':'.$hostDetails['pass']; + } + + $connectionParams['client']['curl'][CURLOPT_PORT] = $hostDetails['port']; + + if (isset($connectionParams['client']['headers'])) { + $this->headers = $connectionParams['client']['headers']; + unset($connectionParams['client']['headers']); + } + + // Add the User-Agent using the format: / (metadata-values) + $this->headers['User-Agent'] = [sprintf( + 'opensearch-php/%s (%s %s; PHP %s)', + Client::VERSION, + PHP_OS, + $this->getOSVersion(), + PHP_VERSION + )]; + + $host = $hostDetails['host']; + $path = null; + if (isset($hostDetails['path']) === true) { + $path = $hostDetails['path']; + } + $port = $hostDetails['port']; + + $this->host = $host; + $this->path = $path; + $this->port = $port; + $this->log = $log; + $this->trace = $trace; + $this->connectionParams = $connectionParams; + $this->serializer = $serializer; + + $this->handler = $this->wrapHandler($handler); + } + + /** + * @param string $method + * @param string $uri + * @param null|array $params + * @param mixed $body + * @param array $options + * @param Transport|null $transport + * @return mixed + */ + public function performRequest(string $method, string $uri, ?array $params = [], $body = null, array $options = [], ?Transport $transport = null) + { + if ($body !== null) { + $body = $this->serializer->serialize($body); + } + + $headers = $this->headers; + if (isset($options['client']['headers']) && is_array($options['client']['headers'])) { + $headers = array_merge($this->headers, $options['client']['headers']); + } + + $host = $this->host; + if (isset($this->connectionParams['client']['port_in_header']) && $this->connectionParams['client']['port_in_header']) { + $host .= ':' . $this->port; + } + + $request = [ + 'http_method' => $method, + 'scheme' => $this->transportSchema, + 'uri' => $this->getURI($uri, $params), + 'body' => $body, + 'headers' => array_merge( + [ + 'Host' => [$host] + ], + $headers + ) + ]; + + $request = array_replace_recursive($request, $this->connectionParams, $options); + + // RingPHP does not like if client is empty + if (empty($request['client'])) { + unset($request['client']); + } + + $handler = $this->handler; + $future = $handler($request, $this, $transport, $options); + + return $future; + } + + public function getTransportSchema(): string + { + return $this->transportSchema; + } + + public function getLastRequestInfo(): array + { + return $this->lastRequest; + } + + private function wrapHandler(callable $handler): callable + { + return function (array $request, Connection $connection, ?Transport $transport, $options) use ($handler) { + $this->lastRequest = []; + $this->lastRequest['request'] = $request; + + // Send the request using the wrapped handler. + $response = Core::proxy( + $handler($request), + function ($response) use ($connection, $transport, $request, $options) { + $this->lastRequest['response'] = $response; + + if (isset($response['error']) === true) { + if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) { + $this->log->warning("Curl exception encountered."); + + $exception = $this->getCurlRetryException($request, $response); + + $this->logRequestFail($request, $response, $exception); + + $node = $connection->getHost(); + $this->log->warning("Marking node $node dead."); + $connection->markDead(); + + // If the transport has not been set, we are inside a Ping or Sniff, + // so we don't want to retrigger retries anyway. + // + // TODO this could be handled better, but we are limited because connectionpools do not + // have access to Transport. Architecturally, all of this needs to be refactored + if (isset($transport) === true) { + $transport->connectionPool->scheduleCheck(); + + $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false; + $shouldRetry = $transport->shouldRetry($request); + $shouldRetryText = ($shouldRetry) ? 'true' : 'false'; + + $this->log->warning("Retries left? $shouldRetryText"); + if ($shouldRetry && !$neverRetry) { + return $transport->performRequest( + $request['http_method'], + $request['uri'], + [], + $request['body'], + $options + ); + } + } + + $this->log->warning("Out of retries, throwing exception from $node"); + // Only throw if we run out of retries + throw $exception; + } else { + // Something went seriously wrong, bail + $exception = new TransportException($response['error']->getMessage()); + $this->logRequestFail($request, $response, $exception); + throw $exception; + } + } else { + $connection->markAlive(); + + if (isset($response['headers']['Warning'])) { + $this->logWarning($request, $response); + } + if (isset($response['body']) === true) { + $response['body'] = stream_get_contents($response['body']); + $this->lastRequest['response']['body'] = $response['body']; + } + + if ($response['status'] >= 400 && $response['status'] < 500) { + $ignore = $request['client']['ignore'] ?? []; + // Skip 404 if succeeded true in the body (e.g. clear_scroll) + $body = $response['body'] ?? ''; + if (strpos($body, '"succeeded":true') !== false) { + $ignore[] = 404; + } + $this->process4xxError($request, $response, $ignore); + } elseif ($response['status'] >= 500) { + $ignore = $request['client']['ignore'] ?? []; + $this->process5xxError($request, $response, $ignore); + } + + // No error, deserialize + $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']); + } + $this->logRequestSuccess($request, $response); + + return isset($request['client']['verbose']) && $request['client']['verbose'] === true ? $response : $response['body']; + } + ); + + return $response; + }; + } + + /** + * @param array|null $params + */ + private function getURI(string $uri, ?array $params): string + { + if (isset($params) === true && !empty($params)) { + $params = array_map( + function ($value) { + if ($value === true) { + return 'true'; + } elseif ($value === false) { + return 'false'; + } + + return $value; + }, + $params + ); + + $uri .= '?' . http_build_query($params); + } + + if ($this->path !== null) { + $uri = $this->path . $uri; + } + + return $uri; + } + + /** + * @return array> + */ + public function getHeaders(): array + { + return $this->headers; + } + + public function logWarning(array $request, array $response): void + { + $this->log->warning('Deprecation', $response['headers']['Warning']); + } + + /** + * Log a successful request + * + * @param array $request + * @param array $response + * @return void + */ + public function logRequestSuccess(array $request, array $response): void + { + $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? ''; + $uri = $this->addPortInUrl($response['effective_url'], (int) $port); + + $this->log->debug('Request Body', array($request['body'])); + $this->log->info( + 'Request Success:', + array( + 'method' => $request['http_method'], + 'uri' => $uri, + 'port' => $port, + 'headers' => $request['headers'], + 'HTTP code' => $response['status'], + 'duration' => $response['transfer_stats']['total_time'], + ) + ); + $this->log->debug('Response', array($response['body'])); + + // Build the curl command for Trace. + $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']); + $this->trace->info($curlCommand); + $this->trace->debug( + 'Response:', + array( + 'response' => $response['body'], + 'method' => $request['http_method'], + 'uri' => $uri, + 'port' => $port, + 'HTTP code' => $response['status'], + 'duration' => $response['transfer_stats']['total_time'], + ) + ); + } + + /** + * Log a failed request + * + * @param array $request + * @param array $response + * @param \Throwable $exception + * + * @return void + */ + public function logRequestFail(array $request, array $response, \Throwable $exception): void + { + $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? ''; + $uri = $this->addPortInUrl($response['effective_url'], (int) $port); + + $this->log->debug('Request Body', array($request['body'])); + $this->log->warning( + 'Request Failure:', + array( + 'method' => $request['http_method'], + 'uri' => $uri, + 'port' => $port, + 'headers' => $request['headers'], + 'HTTP code' => $response['status'], + 'duration' => $response['transfer_stats']['total_time'], + 'error' => $exception->getMessage(), + ) + ); + $this->log->warning('Response', array($response['body'])); + + // Build the curl command for Trace. + $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']); + $this->trace->info($curlCommand); + $this->trace->debug( + 'Response:', + array( + 'response' => $response, + 'method' => $request['http_method'], + 'uri' => $uri, + 'port' => $port, + 'HTTP code' => $response['status'], + 'duration' => $response['transfer_stats']['total_time'], + ) + ); + } + + public function ping(): bool + { + $options = [ + 'client' => [ + 'timeout' => $this->pingTimeout, + 'never_retry' => true, + 'verbose' => true + ] + ]; + try { + $response = $this->performRequest('HEAD', '/', null, null, $options); + $response = $response->wait(); + } catch (TransportException $exception) { + $this->markDead(); + + return false; + } + + if ($response['status'] === 200) { + $this->markAlive(); + + return true; + } else { + $this->markDead(); + + return false; + } + } + + /** + * @return array|\GuzzleHttp\Ring\Future\FutureArray + */ + public function sniff() + { + $options = [ + 'client' => [ + 'timeout' => $this->pingTimeout, + 'never_retry' => true + ] + ]; + + return $this->performRequest('GET', '/_nodes/', null, null, $options); + } + + public function isAlive(): bool + { + return $this->isAlive; + } + + public function markAlive(): void + { + $this->failedPings = 0; + $this->isAlive = true; + $this->lastPing = time(); + } + + public function markDead(): void + { + $this->isAlive = false; + $this->failedPings += 1; + $this->lastPing = time(); + } + + public function getLastPing(): int + { + return $this->lastPing; + } + + public function getPingFailures(): int + { + return $this->failedPings; + } + + public function getHost(): string + { + return $this->host; + } + + public function getUserPass(): ?string + { + return $this->connectionParams['client']['curl'][CURLOPT_USERPWD] ?? null; + } + + public function getPath(): ?string + { + return $this->path; + } + + /** + * @return int + */ + public function getPort() + { + return $this->port; + } + + protected function getCurlRetryException(array $request, array $response): OpenSearchException + { + $exception = null; + $message = $response['error']->getMessage(); + $exception = new MaxRetriesException($message); + switch ($response['curl']['errno']) { + case 6: + $exception = new CouldNotResolveHostException($message, 0, $exception); + break; + case 7: + $exception = new CouldNotConnectToHost($message, 0, $exception); + break; + case 28: + $exception = new OperationTimeoutException($message, 0, $exception); + break; + } + + return $exception; + } + + /** + * Get the OS version using php_uname if available + * otherwise it returns an empty string + * + * @see https://github.com/elastic/elasticsearch-php/issues/922 + */ + private function getOSVersion(): string + { + if ($this->OSVersion === null) { + $this->OSVersion = strpos(strtolower(ini_get('disable_functions')), 'php_uname') !== false + ? '' + : php_uname("r"); + } + return $this->OSVersion; + } + + /** + * Add the port value in the URL if not present + */ + private function addPortInUrl(string $uri, int $port): string + { + if (strpos($uri, ':', 7) !== false) { + return $uri; + } + return preg_replace('#([^/])/([^/])#', sprintf("$1:%s/$2", $port), $uri, 1); + } + + /** + * Construct a string cURL command + */ + private function buildCurlCommand(string $method, string $url, ?string $body): string + { + if (strpos($url, '?') === false) { + $url .= '?pretty=true'; + } else { + str_replace('?', '?pretty=true', $url); + } + + $curlCommand = 'curl -X' . strtoupper($method); + $curlCommand .= " '" . $url . "'"; + + if (isset($body) === true && $body !== '') { + $curlCommand .= " -d '" . $body . "'"; + } + + return $curlCommand; + } + + /** + * @throws OpenSearchException + */ + private function process4xxError(array $request, array $response, array $ignore): void + { + $statusCode = $response['status']; + + /** + * @var \Exception $exception + */ + $exception = $this->tryDeserialize400Error($response); + + if (array_search($response['status'], $ignore) !== false) { + return; + } + + $responseBody = $this->convertBodyToString($response['body'], $statusCode, $exception); + if ($statusCode === 401) { + $exception = new Unauthorized401Exception($responseBody, $statusCode); + } elseif ($statusCode === 403) { + $exception = new Forbidden403Exception($responseBody, $statusCode); + } elseif ($statusCode === 404) { + $exception = new Missing404Exception($responseBody, $statusCode); + } elseif ($statusCode === 409) { + $exception = new Conflict409Exception($responseBody, $statusCode); + } elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) { + $exception = new ScriptLangNotSupportedException($responseBody. $statusCode); + } elseif ($statusCode === 408) { + $exception = new RequestTimeout408Exception($responseBody, $statusCode); + } else { + $exception = new BadRequest400Exception($responseBody, $statusCode); + } + + $this->logRequestFail($request, $response, $exception); + + throw $exception; + } + + /** + * @throws OpenSearchException + */ + private function process5xxError(array $request, array $response, array $ignore): void + { + $statusCode = (int) $response['status']; + $responseBody = $response['body']; + + /** + * @var \Exception $exception + */ + $exception = $this->tryDeserialize500Error($response); + + $exceptionText = "[$statusCode Server Exception] ".$exception->getMessage(); + $this->log->error($exceptionText); + $this->log->error($exception->getTraceAsString()); + + if (array_search($statusCode, $ignore) !== false) { + return; + } + + if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) { + $exception = new RoutingMissingException($exception->getMessage(), $statusCode, $exception); + } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) { + $exception = new NoDocumentsToGetException($exception->getMessage(), $statusCode, $exception); + } elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) { + $exception = new NoShardAvailableException($exception->getMessage(), $statusCode, $exception); + } else { + $exception = new ServerErrorResponseException( + $this->convertBodyToString($responseBody, $statusCode, $exception), + $statusCode + ); + } + + $this->logRequestFail($request, $response, $exception); + + throw $exception; + } + + private function convertBodyToString($body, int $statusCode, Exception $exception): string + { + if (empty($body)) { + return sprintf( + "Unknown %d error from OpenSearch %s", + $statusCode, + $exception->getMessage() + ); + } + // if body is not string, we convert it so it can be used as Exception message + if (!is_string($body)) { + return json_encode($body); + } + return $body; + } + + private function tryDeserialize400Error(array $response): OpenSearchException + { + return $this->tryDeserializeError($response, BadRequest400Exception::class); + } + + private function tryDeserialize500Error(array $response): OpenSearchException + { + return $this->tryDeserializeError($response, ServerErrorResponseException::class); + } + + private function tryDeserializeError(array $response, string $errorClass): OpenSearchException + { + $error = $this->serializer->deserialize($response['body'], $response['transfer_stats']); + if (is_array($error) === true) { + if (isset($error['error']) === false) { + // <2.0 "i just blew up" nonstructured exception + // $error is an array but we don't know the format, reuse the response body instead + // added json_encode to convert into a string + return new $errorClass(json_encode($response['body']), (int) $response['status']); + } + + // 2.0 structured exceptions + if (is_array($error['error']) && array_key_exists('reason', $error['error']) === true) { + // Try to use root cause first (only grabs the first root cause) + $info = $error['error']['root_cause'][0] ?? $error['error']; + $cause = $info['reason']; + $type = $info['type']; + // added json_encode to convert into a string + $original = new $errorClass(json_encode($response['body']), $response['status']); + + return new $errorClass("$type: $cause", (int) $response['status'], $original); + } + // <2.0 semi-structured exceptions + // added json_encode to convert into a string + $original = new $errorClass(json_encode($response['body']), $response['status']); + + $errorEncoded = $error['error']; + if (is_array($errorEncoded)) { + $errorEncoded = json_encode($errorEncoded); + } + return new $errorClass($errorEncoded, (int) $response['status'], $original); + } + + // if responseBody is not string, we convert it so it can be used as Exception message + $responseBody = $response['body']; + if (!is_string($responseBody)) { + $responseBody = json_encode($responseBody); + } + + // <2.0 "i just blew up" nonstructured exception + return new $errorClass($responseBody); + } +} diff --git a/src/OpenSearch/Connections/ConnectionFactory.php b/src/OpenSearch/Connections/ConnectionFactory.php new file mode 100644 index 00000000..5a02c441 --- /dev/null +++ b/src/OpenSearch/Connections/ConnectionFactory.php @@ -0,0 +1,86 @@ +>, curl?: array}} $connectionParams + */ + public function __construct(callable $handler, array $connectionParams, SerializerInterface $serializer, LoggerInterface $logger, LoggerInterface $tracer) + { + $this->handler = $handler; + $this->connectionParams = $connectionParams; + $this->logger = $logger; + $this->tracer = $tracer; + $this->serializer = $serializer; + } + + public function create(array $hostDetails): ConnectionInterface + { + if (isset($hostDetails['path'])) { + $hostDetails['path'] = rtrim($hostDetails['path'], '/'); + } + + return new Connection( + $this->handler, + $hostDetails, + $this->connectionParams, + $this->serializer, + $this->logger, + $this->tracer + ); + } +} diff --git a/src/OpenSearch/Handlers/SigV4Handler.php b/src/OpenSearch/Handlers/SigV4Handler.php new file mode 100644 index 00000000..3b88f6aa --- /dev/null +++ b/src/OpenSearch/Handlers/SigV4Handler.php @@ -0,0 +1,155 @@ +>, body: string|resource|null, client?: array} + * + * @deprecated in 2.3.2 and will be removed in 3.0.0. Use \OpenSearch\Aws\SigV4RequestFactory instead. + */ +class SigV4Handler +{ + /** + * @var SignatureV4 + */ + private $signer; + /** + * @var callable + */ + private $credentialProvider; + /** + * @var callable + */ + private $wrappedHandler; + + /** + * A handler that applies an AWS V4 signature before dispatching requests. + * + * @param string $region The region of your Amazon + * OpenSearch Service domain + * @param string $service The Service of your Amazon + * OpenSearch Service domain + * @param callable|null $credentialProvider A callable that returns a + * promise that is fulfilled + * with an instance of + * Aws\Credentials\Credentials + * @param callable|null $wrappedHandler A RingPHP handler + */ + public function __construct( + string $region, + string $service, + ?callable $credentialProvider = null, + ?callable $wrappedHandler = null + ) { + self::assertDependenciesInstalled(); + $this->signer = new SignatureV4($service, $region); + $this->wrappedHandler = $wrappedHandler + ?: ClientBuilder::defaultHandler(); + $this->credentialProvider = $credentialProvider + ?: CredentialProvider::defaultProvider(); + } + + /** + * @phpstan-param RingPhpRequest $request + */ + public function __invoke(array $request) + { + $creds = call_user_func($this->credentialProvider)->wait(); + + $psr7Request = $this->createPsr7Request($request); + $psr7Request = $psr7Request->withHeader('x-amz-content-sha256', Utils::hash($psr7Request->getBody(), 'sha256')); + $signedRequest = $this->signer + ->signRequest($psr7Request, $creds); + return call_user_func($this->wrappedHandler, $this->createRingRequest($signedRequest, $request)); + } + + public static function assertDependenciesInstalled(): void + { + if (!class_exists(SignatureV4::class)) { + throw new RuntimeException( + 'The AWS SDK for PHP must be installed in order to use the SigV4 signing handler' + ); + } + } + + /** + * @phpstan-param RingPhpRequest $ringPhpRequest + */ + private function createPsr7Request(array $ringPhpRequest): Request + { + // fix for uppercase 'Host' array key in elasticsearch-php 5.3.1 and backward compatible + // https://github.com/aws/aws-sdk-php/issues/1225 + $hostKey = isset($ringPhpRequest['headers']['Host']) ? 'Host' : 'host'; + + // Amazon ES/OS listens on standard ports (443 for HTTPS, 80 for HTTP). + // Consequently, the port should be stripped from the host header. + $parsedUrl = parse_url($ringPhpRequest['headers'][$hostKey][0]); + if (isset($parsedUrl['host'])) { + $ringPhpRequest['headers'][$hostKey][0] = $parsedUrl['host']; + } + + // Create a PSR-7 URI from the array passed to the handler + $uri = (new Uri($ringPhpRequest['uri'])) + ->withScheme($ringPhpRequest['scheme']) + ->withHost($ringPhpRequest['headers'][$hostKey][0]); + if (isset($ringPhpRequest['query_string'])) { + $uri = $uri->withQuery($ringPhpRequest['query_string']); + } + + // Create a PSR-7 request from the array passed to the handler + return new Request( + $ringPhpRequest['http_method'], + $uri, + $ringPhpRequest['headers'], + $ringPhpRequest['body'] + ); + } + + /** + * @phpstan-param RingPhpRequest $originalRequest + * + * @phpstan-return RingPhpRequest + */ + private function createRingRequest(RequestInterface $request, array $originalRequest): array + { + $uri = $request->getUri(); + $body = (string) $request->getBody(); + + // RingPHP currently expects empty message bodies to be null: + // https://github.com/guzzle/RingPHP/blob/4c8fe4c48a0fb7cc5e41ef529e43fecd6da4d539/src/Client/CurlFactory.php#L202 + if (empty($body)) { + $body = null; + } + + // Reset the explicit port in the URL + $client = $originalRequest['client']; + unset($client['curl'][CURLOPT_PORT]); + + $ringRequest = [ + 'http_method' => $request->getMethod(), + 'scheme' => $uri->getScheme(), + 'uri' => $uri->getPath(), + 'body' => $body, + 'headers' => $request->getHeaders(), + 'client' => $client + ]; + if ($uri->getQuery()) { + $ringRequest['query_string'] = $uri->getQuery(); + } + + return $ringRequest; + } +} diff --git a/src/OpenSearch/HttpTransport.php b/src/OpenSearch/HttpTransport.php new file mode 100644 index 00000000..c7c4608e --- /dev/null +++ b/src/OpenSearch/HttpTransport.php @@ -0,0 +1,44 @@ +requestFactory->createRequest($method, $uri, $params, $body, $headers); + } + + /** + * {@inheritdoc} + */ + public function sendRequest( + string $method, + string $uri, + array $params = [], + mixed $body = null, + array $headers = [], + ): array|string|null { + $request = $this->createRequest($method, $uri, $params, $body, $headers); + $response = $this->client->sendRequest($request); + return $this->serializer->deserialize($response->getBody()->getContents(), $response->getHeaders()); + } + +} diff --git a/src/OpenSearch/LegacyTransportWrapper.php b/src/OpenSearch/LegacyTransportWrapper.php new file mode 100644 index 00000000..2cadab6e --- /dev/null +++ b/src/OpenSearch/LegacyTransportWrapper.php @@ -0,0 +1,34 @@ +transport->performRequest($method, $uri, $params, $body); + return $this->transport->resultOrFuture($promise); + } + +} diff --git a/src/OpenSearch/Namespaces/AbstractNamespace.php b/src/OpenSearch/Namespaces/AbstractNamespace.php index 26b9a40e..1c2e6a55 100644 --- a/src/OpenSearch/Namespaces/AbstractNamespace.php +++ b/src/OpenSearch/Namespaces/AbstractNamespace.php @@ -65,20 +65,6 @@ public function __construct(Transport $transport, callable|EndpointFactoryInterf $this->endpointFactory = $endpointFactory; } - public function isAsync(): bool - { - return $this->isAsync; - } - - /** - * Set the client to run in async mode. - */ - public function setAsync(bool $isAsync): static - { - $this->isAsync = $isAsync; - return $this; - } - /** * @return null|mixed */ @@ -93,18 +79,16 @@ public function extractArgument(array &$params, string $arg) } } - protected function performRequest(AbstractEndpoint $endpoint): Promise|ResponseInterface + protected function performRequest(AbstractEndpoint $endpoint) { - $request = $this->transport->createRequest( + $response = $this->transport->performRequest( $endpoint->getMethod(), $endpoint->getURI(), $endpoint->getParams(), $endpoint->getBody(), + $endpoint->getOptions() ); - if ($this->isAsync()) { - return $this->transport->sendAsyncRequest($request); - } - return $this->transport->sendRequest($request); - } + return $this->transport->resultOrFuture($response, $endpoint->getOptions()); + } } diff --git a/src/OpenSearch/RequestFactory.php b/src/OpenSearch/RequestFactory.php index 2bcf00ca..e0729b97 100644 --- a/src/OpenSearch/RequestFactory.php +++ b/src/OpenSearch/RequestFactory.php @@ -2,21 +2,28 @@ namespace OpenSearch; -use Http\Discovery\Psr17FactoryDiscovery; use OpenSearch\Serializers\SerializerInterface; -use OpenSearch\Serializers\SmartSerializer; use Psr\Http\Message\RequestFactoryInterface as PsrRequestFactoryInterface; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\UriFactoryInterface; +/** + * Request factory that uses PSR-7, PSR-17 and PSR-18 interfaces. + */ final class RequestFactory implements RequestFactoryInterface { - private ?SerializerInterface $serializer = null; - private ?PsrRequestFactoryInterface $requestFactory = null; - private ?StreamFactoryInterface $streamFactory = null; - private ?UriFactoryInterface $uriFactory = null; + public function __construct( + protected PsrRequestFactoryInterface $psrRequestFactory, + protected StreamFactoryInterface $streamFactory, + protected UriFactoryInterface $uriFactory, + protected SerializerInterface $serializer, + ) { + } + /** + * {@inheritdoc} + */ public function createRequest( string $method, string $uri, @@ -24,12 +31,12 @@ public function createRequest( string|array|null $body = null, array $headers = [], ): RequestInterface { - $uri = $this->getUriFactory()->createUri($uri); + $uri = $this->uriFactory->createUri($uri); $uri = $uri->withQuery(http_build_query($params)); - $request = $this->getRequestFactory()->createRequest($method, $uri); + $request = $this->psrRequestFactory->createRequest($method, $uri); if ($body !== null) { - $bodyJson = $this->getSerializer()->serialize($body); - $bodyStream = $this->getStreamFactory()->createStream($bodyJson); + $bodyJson = $this->serializer->serialize($body); + $bodyStream = $this->streamFactory->createStream($bodyJson); $request = $request->withBody($bodyStream); } foreach ($headers as $name => $value) { @@ -38,100 +45,4 @@ public function createRequest( return $request; } - /** - * Get the serializer to use for serializing request and response bodies. - */ - public function getSerializer(): ?SerializerInterface - { - if ($this->serializer) { - return $this->serializer; - } - return new SmartSerializer(); - } - - /** - * Set the serializer to use for serializing request and response bodies. - */ - public function setSerializer(?SerializerInterface $serializer): self - { - $this->serializer = $serializer; - return $this; - } - - /** - * Get the request factory to use for creating requests. - * - * If no request factory is set, the discovery mechanism will be used to find - * a request factory. - * - * @throws \Http\Discovery\Exception\NotFoundException - */ - private function getRequestFactory(): PsrRequestFactoryInterface - { - if ($this->requestFactory) { - return $this->requestFactory; - } - - return $this->requestFactory = Psr17FactoryDiscovery::findRequestFactory(); - } - - /** - * Set the request factory to use for creating requests. - */ - public function setRequestFactory(PsrRequestFactoryInterface $requestFactory): self - { - $this->requestFactory = $requestFactory; - return $this; - } - - /** - * Get the stream factory to use for creating streams. - * - * If no stream factory is set, the discovery mechanism will be used to find - * a stream factory. - * - * @throws \Http\Discovery\Exception\NotFoundException - */ - private function getStreamFactory(): StreamFactoryInterface - { - if ($this->streamFactory) { - return $this->streamFactory; - } - return $this->streamFactory = Psr17FactoryDiscovery::findStreamFactory(); - } - - /** - * Set the stream factory to use for creating streams. - */ - public function setStreamFactory(?StreamFactoryInterface $streamFactory): self - { - $this->streamFactory = $streamFactory; - return $this; - } - - /** - * Get the URI factory to use for creating URIs. - * - * If no URI factory is set, the discovery mechanism will be used to find - * a URI factory. - * - * @throws \Http\Discovery\Exception\NotFoundException - */ - private function getUriFactory(): UriFactoryInterface - { - if ($this->uriFactory) { - return $this->uriFactory; - } - return $this->uriFactory = Psr17FactoryDiscovery::findUriFactory(); - } - - /** - * Set the URI factory to use for creating URIs. - */ - public function setUriFactory(?UriFactoryInterface $uriFactory): self - { - $this->uriFactory = $uriFactory; - return $this; - } - } diff --git a/src/OpenSearch/Transport.php b/src/OpenSearch/Transport.php index 0d6d369e..d3dbcd40 100644 --- a/src/OpenSearch/Transport.php +++ b/src/OpenSearch/Transport.php @@ -21,85 +21,162 @@ namespace OpenSearch; -use Http\Client\HttpAsyncClient; -use Http\Discovery\HttpAsyncClientDiscovery; -use Http\Promise\Promise; -use OpenSearch\Common\Exceptions\NoAsyncClientException; -use Psr\Http\Client\ClientInterface; -use Psr\Http\Message\RequestInterface; -use Psr\Http\Message\ResponseInterface; - -final class Transport implements TransportInterface +use GuzzleHttp\Ring\Future\FutureArrayInterface; +use OpenSearch\Common\Exceptions; +use OpenSearch\ConnectionPool\AbstractConnectionPool; +use OpenSearch\Connections\ConnectionInterface; +use Psr\Log\LoggerInterface; + +@trigger_error(__CLASS__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0.', E_USER_DEPRECATED); + +/** + * @deprecated in 2.3.2 and will be removed in 3.0.0. + */ +class Transport { - private ?HttpAsyncClient $asyncClient = null; + /** + * @var AbstractConnectionPool + */ + public $connectionPool; /** - * Transport class is responsible for dispatching requests to the - * underlying cluster connections + * @var LoggerInterface */ - public function __construct( - protected ClientInterface $client, - protected RequestFactoryInterface $requestFactory, - ) { - } + private $log; /** - * Create a new request. + * @var int */ - public function createRequest(string $method, string $uri, array $params = [], mixed $body = null): RequestInterface - { - return $this->requestFactory->createRequest($method, $uri, $params, $body); - } + public $retryAttempts = 0; /** - * {@inheritdoc} + * @var ConnectionInterface */ - public function sendAsyncRequest(RequestInterface $request): Promise - { - $httpAsyncClient = $this->getAsyncClient(); - return $httpAsyncClient->sendAsyncRequest($request); - } + public $lastConnection; + + /** + * @var int + */ + public $retries; /** - * {@inheritdoc} + * Transport class is responsible for dispatching requests to the + * underlying cluster connections + * + * @param int $retries + * @param bool $sniffOnStart + * @param ConnectionPool\AbstractConnectionPool $connectionPool + * @param \Psr\Log\LoggerInterface $log Monolog logger object */ - public function sendRequest(RequestInterface $request): ResponseInterface + public function __construct(int $retries, AbstractConnectionPool $connectionPool, LoggerInterface $log, bool $sniffOnStart = false) { - return $this->client->sendRequest($request); + $this->log = $log; + $this->connectionPool = $connectionPool; + $this->retries = $retries; + + if ($sniffOnStart === true) { + $this->log->notice('Sniff on Start.'); + $this->connectionPool->scheduleCheck(); + } } /** - * Set the async client to use for async requests. + * Returns a single connection from the connection pool + * Potentially performs a sniffing step before returning */ - public function setAsyncClient(HttpAsyncClient $asyncClient): self + public function getConnection(): ConnectionInterface { - $this->asyncClient = $asyncClient; - return $this; + return $this->connectionPool->nextConnection(); } /** - * Get the async client to use for async requests. + * Perform a request to the Cluster * - * If no async client is set, the discovery mechanism will be used to find - * an async client. + * @param string $method HTTP method to use + * @param string $uri HTTP URI to send request to + * @param array $params Optional query parameters + * @param mixed|null $body Optional query body + * @param array $options * - * @throws NoAsyncClientException + * @throws Common\Exceptions\NoNodesAvailableException|\Exception */ - private function getAsyncClient(): HttpAsyncClient + public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface { - if ($this->asyncClient) { - return $this->asyncClient; + try { + $connection = $this->getConnection(); + } catch (Exceptions\NoNodesAvailableException $exception) { + $this->log->critical('No alive nodes found in cluster'); + throw $exception; } - if ($this->client instanceof HttpAsyncClient) { - return $this->asyncClient = $this->client; + $response = []; + $caughtException = null; + $this->lastConnection = $connection; + + $future = $connection->performRequest( + $method, + $uri, + $params, + $body, + $options, + $this + ); + + $future->promise()->then( + //onSuccess + function ($response) { + $this->retryAttempts = 0; + // Note, this could be a 4xx or 5xx error + }, + //onFailure + function ($response) { + $code = $response->getCode(); + // Ignore 400 level errors, as that means the server responded just fine + if ($code < 400 || $code >= 500) { + // Otherwise schedule a check + $this->connectionPool->scheduleCheck(); + } + } + ); + + return $future; + } + + /** + * @param FutureArrayInterface $result Response of a request (promise) + * @param array $options Options for transport + * + * @return callable|array + */ + public function resultOrFuture(FutureArrayInterface $result, array $options = []) + { + $response = null; + $async = isset($options['client']['future']) ? $options['client']['future'] : null; + if (is_null($async) || $async === false) { + do { + $result = $result->wait(); + } while ($result instanceof FutureArrayInterface); } + return $result; + } - try { - return $this->asyncClient = HttpAsyncClientDiscovery::find(); - } catch (\Exception $e) { - throw new NoAsyncClientException('No async HTTP client found. Install a package providing "php-http/async-client-implementation"', 0, $e); + public function shouldRetry(array $request): bool + { + if ($this->retryAttempts < $this->retries) { + $this->retryAttempts += 1; + + return true; } + + return false; } + /** + * Returns the last used connection so that it may be inspected. Mainly + * for debugging/testing purposes. + */ + public function getLastConnection(): ConnectionInterface + { + return $this->lastConnection; + } } diff --git a/src/OpenSearch/TransportFactory.php b/src/OpenSearch/TransportFactory.php new file mode 100644 index 00000000..f488098f --- /dev/null +++ b/src/OpenSearch/TransportFactory.php @@ -0,0 +1,131 @@ +httpClient; + } + + public function setHttpClient(?ClientInterface $httpClient): static + { + $this->httpClient = $httpClient; + return $this; + } + + protected function getRequestFactory(): ?RequestFactoryInterface + { + return $this->requestFactory; + } + + public function setRequestFactory(?RequestFactoryInterface $requestFactory): static + { + $this->requestFactory = $requestFactory; + return $this; + } + + protected function getPsrRequestFactory(): PsrRequestFactoryInterface + { + if ($this->psrRequestFactory === null) { + $this->psrRequestFactory = Psr17FactoryDiscovery::findRequestFactory(); + } + return $this->psrRequestFactory; + } + + public function setPsrRequestFactory(PsrRequestFactoryInterface $psrRequestFactory): static + { + $this->psrRequestFactory = $psrRequestFactory; + return $this; + } + + protected function getStreamFactory(): StreamFactoryInterface + { + if ($this->streamFactory === null) { + $this->streamFactory = Psr17FactoryDiscovery::findStreamFactory(); + } + return $this->streamFactory; + } + + public function setStreamFactory(StreamFactoryInterface $streamFactory): static + { + $this->streamFactory = $streamFactory; + return $this; + } + + protected function getUriFactory(): UriFactoryInterface + { + if ($this->uriFactory === null) { + $this->uriFactory = Psr17FactoryDiscovery::findUriFactory(); + } + return $this->uriFactory; + } + + public function setUriFactory(UriFactoryInterface $uriFactory): static + { + $this->uriFactory = $uriFactory; + return $this; + } + + protected function getSerializer(): Serializers\SerializerInterface + { + if ($this->serializer === null) { + $this->serializer = new SmartSerializer(); + } + return $this->serializer; + } + + public function setSerializer(Serializers\SerializerInterface $serializer): static + { + $this->serializer = $serializer; + return $this; + } + + public function createTransport(): HttpTransport + { + if ($this->requestFactory === null) { + $psrRequestFactory = $this->getPsrRequestFactory(); + $streamFactory = $this->getStreamFactory(); + $uriFactory = $this->getUriFactory(); + $serializer = $this->getSerializer(); + + $this->requestFactory = new RequestFactory( + $psrRequestFactory, + $streamFactory, + $uriFactory, + $serializer + ); + } + if ($this->httpClient === null) { + $this->httpClient = Psr18ClientDiscovery::find(); + } + + return new HttpTransport($this->httpClient, $this->requestFactory, $this->serializer); + } + +} diff --git a/src/OpenSearch/TransportInterface.php b/src/OpenSearch/TransportInterface.php index d017ac0a..b6be9c31 100644 --- a/src/OpenSearch/TransportInterface.php +++ b/src/OpenSearch/TransportInterface.php @@ -2,20 +2,22 @@ namespace OpenSearch; -use Http\Client\HttpAsyncClient; -use Psr\Http\Client\ClientInterface; -use Psr\Http\Message\RequestInterface; - -interface TransportInterface extends ClientInterface, HttpAsyncClient +/** + * Provides an interface for sending OpenSearch requests. + */ +interface TransportInterface { /** * Create a new request. + * + * @throws \Exception */ - public function createRequest( + public function sendRequest( string $method, string $uri, array $params = [], - mixed $body = null - ): RequestInterface; + mixed $body = null, + array $headers = [], + ): array|string|null; } diff --git a/util/template/client-class b/util/template/client-class index 922b0251..0f3d2e2e 100644 --- a/util/template/client-class +++ b/util/template/client-class @@ -45,6 +45,8 @@ class Client */ public $transport; + private TransportInterface $psrTransport; + /** * @var array */ @@ -69,15 +71,21 @@ class Client :namespace_properties /** - * Creates a new client instance. + * Client constructor * - * @param Transport $transport + * @param \OpenSearch\TransportInterface|\OpenSearch\Transport $transport * @param callable|EndpointFactoryInterface $endpointFactory * @param NamespaceBuilderInterface[] $registeredNamespaces */ - public function __construct(Transport $transport, callable|EndpointFactoryInterface $endpointFactory, array $registeredNamespaces) + public function __construct(TransportInterface|Transport $transport, callable|EndpointFactoryInterface $endpointFactory, array $registeredNamespaces) { - $this->transport = $transport; + if (!$transport instanceof TransportInterface) { + @trigger_error('Passing an instance of \OpenSearch\Transport to ' . __METHOD__ . '() is deprecated in 2.3.2 and will be removed in 3.0.0. Pass an instance of \OpenSearch\TransportInterface instead.', E_USER_DEPRECATED); + $this->transport = $transport; + $this->psrTransport = new LegacyTransportWrapper($transport); + } else { + $this->psrTransport = $transport; + } if (is_callable($endpointFactory)) { @trigger_error('Passing a callable as the $endpointFactory param in ' . __METHOD__ . ' is deprecated in 2.3.2 and will be removed in 3.0.0. Pass an instance of \OpenSearch\EndpointFactoryInterface instead.', E_USER_DEPRECATED); $endpoints = $endpointFactory; @@ -154,40 +162,31 @@ class Client } /** - * Sends a raw request to the cluster. - * - * @throws \Psr\Http\Client\ClientExceptionInterface|\Exception + * Sends a raw request to the cluster + * @return callable|array + * @throws NoNodesAvailableException */ - public function sendRawRequest(string $method, string $uri, array $params = [], ?string $body = null): Promise|ResponseInterface + public function request(string $method, string $uri, array $attributes = []): array|string|null { $params = $attributes['params'] ?? []; $body = $attributes['body'] ?? null; + $options = $attributes['options'] ?? []; - $request = $this->transport->createRequest($method, $uri, $params, $body); - - if ($this->isAsync()) { - return $this->transport->sendAsyncRequest($request); - } - return $this->transport->sendRequest($request); + return $this->httpTransport->sendRequest($method, $uri, $params, $body, $options['headers'] ?? []); } /** - * Perform the request. - * - * @throws \Psr\Http\Client\ClientExceptionInterface|\Exception + * @return callable|array */ - private function performRequest(EndpointInterface $endpoint): Promise|ResponseInterface + private function performRequest(AbstractEndpoint $endpoint): array|string|null { - $request = $this->transport->createRequest( + return $this->httpTransport->sendRequest( $endpoint->getMethod(), $endpoint->getURI(), $endpoint->getParams(), $endpoint->getBody(), + $endpoint->getOptions() ); - if ($this->isAsync()) { - return $this->transport->sendAsyncRequest($request); - } - return $this->transport->sendRequest($request); } }