diff --git a/composer.json b/composer.json index 9efae1db..f708c855 100644 --- a/composer.json +++ b/composer.json @@ -22,14 +22,14 @@ ], "require": { "php": "^7.3 || ^8.0", - "ext-json": ">=1.3.7", "ext-curl": "*", - "ezimuel/ringphp": "^1.2.2", - "psr/log": "^1|^2|^3", - "symfony/yaml": "*" + "ext-json": ">=1.3.7", + "elastic/transport": "^8.10", + "psr/log": "^1|^2|^3" }, "require-dev": { "ext-zip": "*", + "symfony/yaml": "*", "aws/aws-sdk-php": "^3.0", "friendsofphp/php-cs-fixer": "^3.0", "mockery/mockery": "^1.2", @@ -54,7 +54,10 @@ } }, "config": { - "sort-packages": true + "sort-packages": true, + "allow-plugins": { + "php-http/discovery": false + } }, "scripts": { "php-cs": [ diff --git a/src/OpenSearch/Client.php b/src/OpenSearch/Client.php index e679a65e..e3d8f0be 100644 --- a/src/OpenSearch/Client.php +++ b/src/OpenSearch/Client.php @@ -1891,11 +1891,10 @@ public function request(string $method, string $uri, array $attributes = []) { $params = $attributes['params'] ?? []; $body = $attributes['body'] ?? null; - $options = $attributes['options'] ?? []; - $promise = $this->transport->performRequest($method, $uri, $params, $body, $options); + $promise = $this->transport->performRequest($method, $uri, $params, $body); - return $this->transport->resultOrFuture($promise, $options); + return $this->transport->resultOrFuture($promise); } /** @@ -1911,6 +1910,6 @@ private function performRequest(AbstractEndpoint $endpoint) $endpoint->getOptions() ); - return $this->transport->resultOrFuture($promise, $endpoint->getOptions()); + return $this->transport->resultOrFuture($promise); } } diff --git a/src/OpenSearch/ClientBuilder.php b/src/OpenSearch/ClientBuilder.php index 1865bc3d..8299b3d9 100644 --- a/src/OpenSearch/ClientBuilder.php +++ b/src/OpenSearch/ClientBuilder.php @@ -21,39 +21,18 @@ namespace OpenSearch; -use Aws\Credentials\CredentialProvider; -use Aws\Credentials\Credentials; use Aws\Credentials\CredentialsInterface; -use OpenSearch\Common\Exceptions\InvalidArgumentException; +use Elastic\Transport\TransportBuilder; use OpenSearch\Common\Exceptions\RuntimeException; use OpenSearch\Common\Exceptions\AuthenticationConfigException; -use OpenSearch\ConnectionPool\AbstractConnectionPool; -use OpenSearch\ConnectionPool\Selectors\RoundRobinSelector; -use OpenSearch\ConnectionPool\Selectors\SelectorInterface; -use OpenSearch\ConnectionPool\StaticNoPingConnectionPool; -use OpenSearch\Connections\ConnectionFactory; -use OpenSearch\Connections\ConnectionFactoryInterface; -use OpenSearch\Connections\ConnectionInterface; -use OpenSearch\Handlers\SigV4Handler; use OpenSearch\Namespaces\NamespaceBuilderInterface; -use OpenSearch\Serializers\SerializerInterface; use OpenSearch\Serializers\SmartSerializer; -use GuzzleHttp\Ring\Client\CurlHandler; -use GuzzleHttp\Ring\Client\CurlMultiHandler; -use GuzzleHttp\Ring\Client\Middleware; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use ReflectionClass; class ClientBuilder { - public const ALLOWED_METHODS_FROM_CONFIG = ['includePortInHostHeader']; - - /** - * @var Transport|null - */ - private $transport; - /** * @var callable|null */ @@ -62,109 +41,39 @@ class ClientBuilder /** * @var NamespaceBuilderInterface[] */ - private $registeredNamespacesBuilders = []; - - /** - * @var ConnectionFactoryInterface|null - */ - private $connectionFactory; - - /** - * @var callable|null - */ - private $handler; + private array $registeredNamespacesBuilders = []; - /** - * @var LoggerInterface|null - */ - private $logger; - - /** - * @var LoggerInterface|null - */ - private $tracer; - - /** - * @var string|AbstractConnectionPool - */ - private $connectionPool = StaticNoPingConnectionPool::class; - - /** - * @var string|SerializerInterface|null - */ - private $serializer = SmartSerializer::class; - - /** - * @var string|SelectorInterface|null - */ - private $selector = RoundRobinSelector::class; - - /** - * @var array - */ - private $connectionPoolArgs = [ - 'randomizeHosts' => true - ]; + private LoggerInterface $logger; /** * @var array|null */ - private $hosts; - - /** - * @var array - */ - private $connectionParams; + private $hosts = ['localhost:9200']; - /** - * @var int|null - */ - private $retries; - - /** - * @var null|callable - */ - private $sigV4CredentialProvider; - - /** - * @var null|string - */ - private $sigV4Region; - - /** - * @var null|string - */ - private $sigV4Service; - - /** - * @var bool - */ - private $sniffOnStart = false; + private ?int $retries = null; /** * @var null|array */ - private $sslCert; + private ?array $sslCert = null + ; /** * @var null|array */ private $sslKey; - /** - * @var null|bool|string - */ - private $sslVerification; + private bool $sslVerification = true; /** - * @var bool + * @var array */ - private $includePortInHostHeader = false; + private array $basicAuthentication = []; - /** - * @var string|null - */ - private $basicAuthentication = null; + public function __construct() + { + $this->logger = new NullLogger(); + } /** * Create an instance of ClientBuilder @@ -174,14 +83,6 @@ public static function create(): ClientBuilder return new self(); } - /** - * Can supply first param to Client::__construct() when invoking manually or with dependency injection - */ - public function getTransport(): Transport - { - return $this->transport; - } - /** * Can supply second param to Client::__construct() when invoking manually or with dependency injection */ @@ -219,7 +120,7 @@ public static function fromConfig(array $config, bool $quiet = false): Client { $builder = new self(); foreach ($config as $key => $value) { - $method = in_array($key, self::ALLOWED_METHODS_FROM_CONFIG, true) ? $key : "set$key"; + $method = "set$key"; $reflection = new ReflectionClass($builder); if ($reflection->hasMethod($method)) { $func = $reflection->getMethod($method); @@ -239,92 +140,6 @@ public static function fromConfig(array $config, bool $quiet = false): Client return $builder->build(); } - /** - * Get the default handler - * - * @param array $multiParams - * @param array $singleParams - * @throws \RuntimeException - */ - public static function defaultHandler(array $multiParams = [], array $singleParams = []): callable - { - $future = null; - if (extension_loaded('curl')) { - $config = array_merge([ 'mh' => curl_multi_init() ], $multiParams); - if (function_exists('curl_reset')) { - $default = new CurlHandler($singleParams); - $future = new CurlMultiHandler($config); - } else { - $default = new CurlMultiHandler($config); - } - } else { - throw new \RuntimeException('OpenSearch-PHP requires cURL, or a custom HTTP handler.'); - } - - return $future ? Middleware::wrapFuture($default, $future) : $default; - } - - /** - * Get the multi handler for async (CurlMultiHandler) - * - * @throws \RuntimeException - */ - public static function multiHandler(array $params = []): CurlMultiHandler - { - if (function_exists('curl_multi_init')) { - return new CurlMultiHandler(array_merge([ 'mh' => curl_multi_init() ], $params)); - } - - throw new \RuntimeException('CurlMulti handler requires cURL.'); - } - - /** - * Get the handler instance (CurlHandler) - * - * @throws \RuntimeException - */ - public static function singleHandler(): CurlHandler - { - if (function_exists('curl_reset')) { - return new CurlHandler(); - } - - throw new \RuntimeException('CurlSingle handler requires cURL.'); - } - - /** - * Set connection Factory - * - * @param ConnectionFactoryInterface $connectionFactory - */ - public function setConnectionFactory(ConnectionFactoryInterface $connectionFactory): ClientBuilder - { - $this->connectionFactory = $connectionFactory; - - return $this; - } - - /** - * Set the connection pool (default is StaticNoPingConnectionPool) - * - * @param AbstractConnectionPool|string $connectionPool - * @param array $args - * @throws \InvalidArgumentException - */ - public function setConnectionPool($connectionPool, array $args = []): ClientBuilder - { - if (is_string($connectionPool)) { - $this->connectionPool = $connectionPool; - $this->connectionPoolArgs = $args; - } elseif (is_object($connectionPool)) { - $this->connectionPool = $connectionPool; - } else { - throw new InvalidArgumentException("Serializer must be a class path or instantiated object extending AbstractConnectionPool"); - } - - return $this; - } - /** * Set the endpoint * @@ -349,30 +164,6 @@ public function registerNamespace(NamespaceBuilderInterface $namespaceBuilder): return $this; } - /** - * Set the transport - * - * @param Transport $transport - */ - public function setTransport(Transport $transport): ClientBuilder - { - $this->transport = $transport; - - return $this; - } - - /** - * Set the HTTP handler (cURL is default) - * - * @param mixed $handler - */ - public function setHandler($handler): ClientBuilder - { - $this->handler = $handler; - - return $this; - } - /** * Set the PSR-3 Logger * @@ -384,31 +175,6 @@ public function setLogger(LoggerInterface $logger): ClientBuilder return $this; } - - /** - * Set the PSR-3 tracer - * - * @param LoggerInterface $tracer - */ - public function setTracer(LoggerInterface $tracer): ClientBuilder - { - $this->tracer = $tracer; - - return $this; - } - - /** - * Set the serializer - * - * @param \OpenSearch\Serializers\SerializerInterface|string $serializer - */ - public function setSerializer($serializer): ClientBuilder - { - $this->parseStringOrObject($serializer, $this->serializer, 'SerializerInterface'); - - return $this; - } - /** * Set the hosts (nodes) * @@ -430,21 +196,9 @@ public function setHosts(array $hosts): ClientBuilder * * @throws AuthenticationConfigException */ - public function setBasicAuthentication(string $username, string $password): ClientBuilder - { - $this->basicAuthentication = $username.':'.$password; - - return $this; - } - - /** - * Set connection parameters - * - * @param array $params - */ - public function setConnectionParams(array $params): ClientBuilder + public function setBasicAuthentication(string $username, ?string $password = null): ClientBuilder { - $this->connectionParams = $params; + $this->basicAuthentication = array_filter([$username, $password]); return $this; } @@ -461,70 +215,6 @@ public function setRetries(int $retries): ClientBuilder return $this; } - /** - * Set the selector algorithm - * - * @param \OpenSearch\ConnectionPool\Selectors\SelectorInterface|string $selector - */ - public function setSelector($selector): ClientBuilder - { - $this->parseStringOrObject($selector, $this->selector, 'SelectorInterface'); - - return $this; - } - - /** - * Set the credential provider for SigV4 request signing. The value provider should be a - * callable object that will return - * - * @param callable|bool|array|CredentialsInterface|null $credentialProvider - */ - public function setSigV4CredentialProvider($credentialProvider): ClientBuilder - { - if ($credentialProvider !== null && $credentialProvider !== false) { - $this->sigV4CredentialProvider = $this->normalizeCredentialProvider($credentialProvider); - } - - return $this; - } - - /** - * Set the region for SigV4 signing. - * - * @param string|null $region - */ - public function setSigV4Region($region): ClientBuilder - { - $this->sigV4Region = $region; - - return $this; - } - - /** - * Set the service for SigV4 signing. - * - * @param string|null $service - */ - public function setSigV4Service($service): ClientBuilder - { - $this->sigV4Service = $service; - - return $this; - } - - /** - * Set sniff on start - * - * @param bool $sniffOnStart enable or disable sniff on start - */ - - public function setSniffOnStart(bool $sniffOnStart): ClientBuilder - { - $this->sniffOnStart = $sniffOnStart; - - return $this; - } - /** * Set SSL certificate * @@ -553,51 +243,18 @@ public function setSSLKey(string $key, ?string $password = null): ClientBuilder /** * Set SSL verification - * - * @param bool|string $value */ - public function setSSLVerification($value = true): ClientBuilder + public function setSSLVerification(bool $value = true): ClientBuilder { $this->sslVerification = $value; return $this; } - - /** - * Include the port in Host header - * - * @see https://github.com/elastic/elasticsearch-php/issues/993 - */ - public function includePortInHostHeader(bool $enable): ClientBuilder - { - $this->includePortInHostHeader = $enable; - - return $this; - } - /** * Build and returns the Client object */ public function build(): Client { - $this->buildLoggers(); - - if (is_null($this->handler)) { - $this->handler = ClientBuilder::defaultHandler(); - } - - if (!is_null($this->sigV4CredentialProvider)) { - if (is_null($this->sigV4Region)) { - throw new RuntimeException("A region must be supplied for SigV4 request signing."); - } - - if (is_null($this->sigV4Service)) { - $this->setSigV4Service("es"); - } - - $this->handler = new SigV4Handler($this->sigV4Region, $this->sigV4Service, $this->sigV4CredentialProvider, $this->handler); - } - $sslOptions = null; if (isset($this->sslKey)) { $sslOptions['ssl_key'] = $this->sslKey; @@ -621,70 +278,22 @@ public function build(): Client return $handler($request); }; }; - $this->handler = $sslHandler($this->handler, $sslOptions); - } - - if (is_null($this->serializer)) { - $this->serializer = new SmartSerializer(); - } elseif (is_string($this->serializer)) { - $this->serializer = new $this->serializer(); - } - - $this->connectionParams['client']['port_in_header'] = $this->includePortInHostHeader; - - if (! is_null($this->basicAuthentication)) { - if (isset($this->connectionParams['client']['curl']) === false) { - $this->connectionParams['client']['curl'] = []; - } - - $this->connectionParams['client']['curl'] += [ - CURLOPT_HTTPAUTH => CURLAUTH_BASIC, - CURLOPT_USERPWD => $this->basicAuthentication - ]; } - if (is_null($this->connectionFactory)) { - // Make sure we are setting Content-Type and Accept (unless the user has explicitly - // overridden it - if (! isset($this->connectionParams['client']['headers'])) { - $this->connectionParams['client']['headers'] = []; - } - if (! isset($this->connectionParams['client']['headers']['Content-Type'])) { - $this->connectionParams['client']['headers']['Content-Type'] = ['application/json']; - } - if (! isset($this->connectionParams['client']['headers']['Accept'])) { - $this->connectionParams['client']['headers']['Accept'] = ['application/json']; - } - - $this->connectionFactory = new ConnectionFactory($this->handler, $this->connectionParams, $this->serializer, $this->logger, $this->tracer); - } - - if (is_null($this->hosts)) { - $this->hosts = $this->getDefaultHost(); - } - - if (is_null($this->selector)) { - $this->selector = new RoundRobinSelector(); - } elseif (is_string($this->selector)) { - $this->selector = new $this->selector(); - } - - $this->buildTransport(); + $transport = $this->buildTransport(); if (is_null($this->endpoint)) { - $serializer = $this->serializer; - - $this->endpoint = function ($class) use ($serializer) { + $this->endpoint = function ($class) { $fullPath = '\\OpenSearch\\Endpoints\\' . $class; $reflection = new ReflectionClass($fullPath); $constructor = $reflection->getConstructor(); if ($constructor && $constructor->getParameters()) { - return new $fullPath($serializer); - } else { - return new $fullPath(); + return new $fullPath(new SmartSerializer()); } + + return new $fullPath(); }; } @@ -693,10 +302,10 @@ public function build(): Client /** * @var NamespaceBuilderInterface $builder */ - $registeredNamespaces[$builder->getName()] = $builder->getObject($this->transport, $this->serializer); + $registeredNamespaces[$builder->getName()] = $builder->getObject($transport, new SmartSerializer()); } - return $this->instantiate($this->transport, $this->endpoint, $registeredNamespaces); + return $this->instantiate($transport, $this->endpoint, $registeredNamespaces); } protected function instantiate(Transport $transport, callable $endpoint, array $registeredNamespaces): Client @@ -704,156 +313,24 @@ protected function instantiate(Transport $transport, callable $endpoint, array $ return new Client($transport, $endpoint, $registeredNamespaces); } - private function buildLoggers(): void + private function buildTransport(): Transport { - if (is_null($this->logger)) { - $this->logger = new NullLogger(); - } + $transport = TransportBuilder::create() + ->setHosts($this->hosts) + ->setLogger($this->logger) + ->build(); - if (is_null($this->tracer)) { - $this->tracer = new NullLogger(); - } - } + $transport->setUserAgent('opensearch-php', Client::VERSION); - private function buildTransport(): void - { - $connections = $this->buildConnectionsFromHosts($this->hosts); - - if (is_string($this->connectionPool)) { - $this->connectionPool = new $this->connectionPool( - $connections, - $this->selector, - $this->connectionFactory, - $this->connectionPoolArgs - ); + if ($this->basicAuthentication) { + $transport->setUserInfo(...$this->basicAuthentication); } - if (is_null($this->retries)) { - $this->retries = count($connections); + if ($this->retries) { + $transport->setRetries($this->retries); } - if (is_null($this->transport)) { - $this->transport = new Transport($this->retries, $this->connectionPool, $this->logger, $this->sniffOnStart); - } - } - - private function parseStringOrObject($arg, &$destination, $interface): void - { - if (is_string($arg)) { - $destination = new $arg(); - } elseif (is_object($arg)) { - $destination = $arg; - } else { - throw new InvalidArgumentException("Serializer must be a class path or instantiated object implementing $interface"); - } - } - - private function getDefaultHost(): array - { - return ['localhost:9200']; - } - - /** - * @return ConnectionInterface[] - * @throws RuntimeException - */ - private function buildConnectionsFromHosts(array $hosts): array - { - $connections = []; - foreach ($hosts as $host) { - if (is_string($host)) { - $host = $this->prependMissingScheme($host); - $host = $this->extractURIParts($host); - } elseif (is_array($host)) { - $host = $this->normalizeExtendedHost($host); - } else { - $this->logger->error("Could not parse host: ".print_r($host, true)); - throw new RuntimeException("Could not parse host: ".print_r($host, true)); - } - - $connections[] = $this->connectionFactory->create($host); - } - - return $connections; - } - - /** - * @throws RuntimeException - */ - private function normalizeExtendedHost(array $host): array - { - if (isset($host['host']) === false) { - $this->logger->error("Required 'host' was not defined in extended format: ".print_r($host, true)); - throw new RuntimeException("Required 'host' was not defined in extended format: ".print_r($host, true)); - } - - if (isset($host['scheme']) === false) { - $host['scheme'] = 'http'; - } - if (isset($host['port']) === false) { - $host['port'] = 9200; - } - return $host; - } - - /** - * @throws InvalidArgumentException - */ - private function extractURIParts(string $host): array - { - $parts = parse_url($host); - - if ($parts === false) { - throw new InvalidArgumentException(sprintf('Could not parse URI: "%s"', $host)); - } - - if (isset($parts['port']) !== true) { - $parts['port'] = 9200; - } - - return $parts; - } - - private function prependMissingScheme(string $host): string - { - if (!preg_match("/^https?:\/\//", $host)) { - $host = 'http://' . $host; - } - - return $host; - } - - private function normalizeCredentialProvider($provider): ?callable - { - if ($provider === null || $provider === false) { - return null; - } - - if (is_callable($provider)) { - return $provider; - } - - SigV4Handler::assertDependenciesInstalled(); - - if ($provider === true) { - return CredentialProvider::defaultProvider(); - } - - if ($provider instanceof CredentialsInterface) { - return CredentialProvider::fromCredentials($provider); - } elseif (is_array($provider) && isset($provider['key']) && isset($provider['secret'])) { - return CredentialProvider::fromCredentials( - new Credentials( - $provider['key'], - $provider['secret'], - isset($provider['token']) ? $provider['token'] : null, - isset($provider['expires']) ? $provider['expires'] : null - ) - ); - } - throw new InvalidArgumentException('Credentials must be an instance of Aws\Credentials\CredentialsInterface, an' - . ' associative array that contains "key", "secret", and an optional "token" key-value pairs, a credentials' - . ' provider function, or true.'); + return new Transport($transport); } } diff --git a/src/OpenSearch/Common/EmptyLogger.php b/src/OpenSearch/Common/EmptyLogger.php deleted file mode 100644 index c2bfeaf4..00000000 --- a/src/OpenSearch/Common/EmptyLogger.php +++ /dev/null @@ -1,42 +0,0 @@ - - */ - protected $connectionPoolParams; - - /** - * @var ConnectionFactoryInterface - */ - protected $connectionFactory; - - /** - * Constructor - * - * @param ConnectionInterface[] $connections The Connections to choose from - * @param SelectorInterface $selector A Selector instance to perform the selection logic for the available connections - * @param ConnectionFactoryInterface $factory ConnectionFactory instance - * @param array $connectionPoolParams - */ - public function __construct(array $connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, array $connectionPoolParams) - { - $paramList = array('connections', 'selector', 'connectionPoolParams'); - foreach ($paramList as $param) { - if (isset($$param) === false) { - throw new InvalidArgumentException('`' . $param . '` parameter must not be null'); - } - } - - if (isset($connectionPoolParams['randomizeHosts']) === true - && $connectionPoolParams['randomizeHosts'] === true - ) { - shuffle($connections); - } - - $this->connections = $connections; - $this->seedConnections = $connections; - $this->selector = $selector; - $this->connectionPoolParams = $connectionPoolParams; - $this->connectionFactory = $factory; - } - - abstract public function nextConnection(bool $force = false): ConnectionInterface; - - abstract public function scheduleCheck(): void; -} diff --git a/src/OpenSearch/ConnectionPool/ConnectionPoolInterface.php b/src/OpenSearch/ConnectionPool/ConnectionPoolInterface.php deleted file mode 100644 index 42917d51..00000000 --- a/src/OpenSearch/ConnectionPool/ConnectionPoolInterface.php +++ /dev/null @@ -1,31 +0,0 @@ -current % count($connections)]; - - $this->current += 1; - - return $returnConnection; - } -} diff --git a/src/OpenSearch/ConnectionPool/Selectors/SelectorInterface.php b/src/OpenSearch/ConnectionPool/Selectors/SelectorInterface.php deleted file mode 100644 index eeb29086..00000000 --- a/src/OpenSearch/ConnectionPool/Selectors/SelectorInterface.php +++ /dev/null @@ -1,34 +0,0 @@ -current]->isAlive()) { - return $connections[$this->current]; - } - - $this->currentCounter += 1; - $this->current = $this->currentCounter % count($connections); - - return $connections[$this->current]; - } -} diff --git a/src/OpenSearch/ConnectionPool/SimpleConnectionPool.php b/src/OpenSearch/ConnectionPool/SimpleConnectionPool.php deleted file mode 100644 index 29c14d26..00000000 --- a/src/OpenSearch/ConnectionPool/SimpleConnectionPool.php +++ /dev/null @@ -1,48 +0,0 @@ - $connectionPoolParams - */ - public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams) - { - parent::__construct($connections, $selector, $factory, $connectionPoolParams); - } - - public function nextConnection(bool $force = false): ConnectionInterface - { - return $this->selector->select($this->connections); - } - - public function scheduleCheck(): void - { - } -} diff --git a/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php b/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php deleted file mode 100644 index 2c6d6b83..00000000 --- a/src/OpenSearch/ConnectionPool/SniffingConnectionPool.php +++ /dev/null @@ -1,181 +0,0 @@ - $connectionPoolParams - */ - public function __construct( - $connections, - SelectorInterface $selector, - ConnectionFactoryInterface $factory, - $connectionPoolParams - ) { - parent::__construct($connections, $selector, $factory, $connectionPoolParams); - - $this->setConnectionPoolParams($connectionPoolParams); - $this->nextSniff = time() + $this->sniffingInterval; - } - - public function nextConnection(bool $force = false): ConnectionInterface - { - $this->sniff($force); - - $size = count($this->connections); - while ($size--) { - /** - * @var Connection $connection - */ - $connection = $this->selector->select($this->connections); - if ($connection->isAlive() === true || $connection->ping() === true) { - return $connection; - } - } - - if ($force === true) { - throw new NoNodesAvailableException("No alive nodes found in your cluster"); - } - - return $this->nextConnection(true); - } - - public function scheduleCheck(): void - { - $this->nextSniff = -1; - } - - private function sniff(bool $force = false): void - { - if ($force === false && $this->nextSniff > time()) { - return; - } - - $total = count($this->connections); - - while ($total--) { - /** - * @var Connection $connection - */ - $connection = $this->selector->select($this->connections); - - if ($connection->isAlive() xor $force) { - continue; - } - - if ($this->sniffConnection($connection) === true) { - return; - } - } - - if ($force === true) { - return; - } - - foreach ($this->seedConnections as $connection) { - /** - * @var Connection $connection - */ - if ($this->sniffConnection($connection) === true) { - return; - } - } - } - - private function sniffConnection(Connection $connection): bool - { - try { - $response = $connection->sniff(); - } catch (OperationTimeoutException $exception) { - return false; - } - - $nodes = $this->parseClusterState($response); - - if (count($nodes) === 0) { - return false; - } - - $this->connections = []; - - foreach ($nodes as $node) { - $nodeDetails = [ - 'host' => $node['host'], - 'port' => $node['port'], - ]; - $this->connections[] = $this->connectionFactory->create($nodeDetails); - } - - $this->nextSniff = time() + $this->sniffingInterval; - - return true; - } - - /** - * @return list - */ - private function parseClusterState($nodeInfo): array - { - $pattern = '/([^:]*):(\d+)/'; - $hosts = []; - - foreach ($nodeInfo['nodes'] as $node) { - if (isset($node['http']) === true && isset($node['http']['publish_address']) === true) { - if (preg_match($pattern, $node['http']['publish_address'], $match) === 1) { - $hosts[] = [ - 'host' => $match[1], - 'port' => (int)$match[2], - ]; - } - } - } - - return $hosts; - } - - /** - * @param array $connectionPoolParams - */ - private function setConnectionPoolParams(array $connectionPoolParams): void - { - $this->sniffingInterval = (int)($connectionPoolParams['sniffingInterval'] ?? 300); - } -} diff --git a/src/OpenSearch/ConnectionPool/StaticConnectionPool.php b/src/OpenSearch/ConnectionPool/StaticConnectionPool.php deleted file mode 100644 index c074e8c8..00000000 --- a/src/OpenSearch/ConnectionPool/StaticConnectionPool.php +++ /dev/null @@ -1,105 +0,0 @@ - $connectionPoolParams - */ - public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams) - { - parent::__construct($connections, $selector, $factory, $connectionPoolParams); - $this->scheduleCheck(); - } - - public function nextConnection(bool $force = false): ConnectionInterface - { - $skipped = []; - - $total = count($this->connections); - while ($total--) { - /** - * @var Connection $connection - */ - $connection = $this->selector->select($this->connections); - if ($connection->isAlive() === true) { - return $connection; - } - - if ($this->readyToRevive($connection) === true) { - if ($connection->ping() === true) { - return $connection; - } - } else { - $skipped[] = $connection; - } - } - - // All "alive" nodes failed, force pings on "dead" nodes - foreach ($skipped as $connection) { - if ($connection->ping() === true) { - return $connection; - } - } - - throw new NoNodesAvailableException("No alive nodes found in your cluster"); - } - - public function scheduleCheck(): void - { - foreach ($this->connections as $connection) { - $connection->markDead(); - } - } - - private function readyToRevive(Connection $connection): bool - { - $timeout = min( - $this->pingTimeout * pow(2, $connection->getPingFailures()), - $this->maxPingTimeout - ); - - if ($connection->getLastPing() + $timeout < time()) { - return true; - } else { - return false; - } - } -} diff --git a/src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php b/src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php deleted file mode 100644 index a4a85e32..00000000 --- a/src/OpenSearch/ConnectionPool/StaticNoPingConnectionPool.php +++ /dev/null @@ -1,88 +0,0 @@ - $connectionPoolParams - */ - public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams) - { - parent::__construct($connections, $selector, $factory, $connectionPoolParams); - } - - public function nextConnection(bool $force = false): ConnectionInterface - { - $total = count($this->connections); - while ($total--) { - /** - * @var Connection $connection -*/ - $connection = $this->selector->select($this->connections); - if ($connection->isAlive() === true) { - return $connection; - } - - if ($this->readyToRevive($connection) === true) { - return $connection; - } - } - - throw new NoNodesAvailableException("No alive nodes found in your cluster"); - } - - public function scheduleCheck(): void - { - } - - private function readyToRevive(Connection $connection): bool - { - $timeout = min( - $this->pingTimeout * pow(2, $connection->getPingFailures()), - $this->maxPingTimeout - ); - - if ($connection->getLastPing() + $timeout < time()) { - return true; - } else { - return false; - } - } -} diff --git a/src/OpenSearch/Connections/Connection.php b/src/OpenSearch/Connections/Connection.php deleted file mode 100644 index 7279f56c..00000000 --- a/src/OpenSearch/Connections/Connection.php +++ /dev/null @@ -1,789 +0,0 @@ -> - */ - protected $headers = []; - - /** - * @var bool - */ - protected $isAlive = false; - - /** - * @var float - */ - private $pingTimeout = 1; //TODO expose this - - /** - * @var int - */ - private $lastPing = 0; - - /** - * @var int - */ - private $failedPings = 0; - - /** - * @var mixed[] - */ - private $lastRequest = array(); - - /** - * @var string - */ - private $OSVersion = null; - - /** - * @param array{host: string, port?: int, scheme?: string, user?: string, pass?: string, path?: string} $hostDetails - * @param array{client?: array{headers?: array>, curl?: array}} $connectionParams - */ - public function __construct( - callable $handler, - array $hostDetails, - array $connectionParams, - SerializerInterface $serializer, - LoggerInterface $log, - LoggerInterface $trace - ) { - if (isset($hostDetails['port']) !== true) { - $hostDetails['port'] = 9200; - } - - if (isset($hostDetails['scheme'])) { - $this->transportSchema = $hostDetails['scheme']; - } - - // Only Set the Basic if API Key is not set and setBasicAuthentication was not called prior - if (isset($connectionParams['client']['headers']['Authorization']) === false - && isset($connectionParams['client']['curl'][CURLOPT_HTTPAUTH]) === false - && isset($hostDetails['user']) - && isset($hostDetails['pass']) - ) { - $connectionParams['client']['curl'][CURLOPT_HTTPAUTH] = CURLAUTH_BASIC; - $connectionParams['client']['curl'][CURLOPT_USERPWD] = $hostDetails['user'].':'.$hostDetails['pass']; - } - - $connectionParams['client']['curl'][CURLOPT_PORT] = $hostDetails['port']; - - if (isset($connectionParams['client']['headers'])) { - $this->headers = $connectionParams['client']['headers']; - unset($connectionParams['client']['headers']); - } - - // Add the User-Agent using the format: / (metadata-values) - $this->headers['User-Agent'] = [sprintf( - 'opensearch-php/%s (%s %s; PHP %s)', - Client::VERSION, - PHP_OS, - $this->getOSVersion(), - PHP_VERSION - )]; - - $host = $hostDetails['host']; - $path = null; - if (isset($hostDetails['path']) === true) { - $path = $hostDetails['path']; - } - $port = $hostDetails['port']; - - $this->host = $host; - $this->path = $path; - $this->port = $port; - $this->log = $log; - $this->trace = $trace; - $this->connectionParams = $connectionParams; - $this->serializer = $serializer; - - $this->handler = $this->wrapHandler($handler); - } - - /** - * @param string $method - * @param string $uri - * @param null|array $params - * @param mixed $body - * @param array $options - * @param Transport|null $transport - * @return mixed - */ - public function performRequest(string $method, string $uri, ?array $params = [], $body = null, array $options = [], ?Transport $transport = null) - { - if ($body !== null) { - $body = $this->serializer->serialize($body); - } - - $headers = $this->headers; - if (isset($options['client']['headers']) && is_array($options['client']['headers'])) { - $headers = array_merge($this->headers, $options['client']['headers']); - } - - $host = $this->host; - if (isset($this->connectionParams['client']['port_in_header']) && $this->connectionParams['client']['port_in_header']) { - $host .= ':' . $this->port; - } - - $request = [ - 'http_method' => $method, - 'scheme' => $this->transportSchema, - 'uri' => $this->getURI($uri, $params), - 'body' => $body, - 'headers' => array_merge( - [ - 'Host' => [$host] - ], - $headers - ) - ]; - - $request = array_replace_recursive($request, $this->connectionParams, $options); - - // RingPHP does not like if client is empty - if (empty($request['client'])) { - unset($request['client']); - } - - $handler = $this->handler; - $future = $handler($request, $this, $transport, $options); - - return $future; - } - - public function getTransportSchema(): string - { - return $this->transportSchema; - } - - public function getLastRequestInfo(): array - { - return $this->lastRequest; - } - - private function wrapHandler(callable $handler): callable - { - return function (array $request, Connection $connection, ?Transport $transport, $options) use ($handler) { - $this->lastRequest = []; - $this->lastRequest['request'] = $request; - - // Send the request using the wrapped handler. - $response = Core::proxy( - $handler($request), - function ($response) use ($connection, $transport, $request, $options) { - $this->lastRequest['response'] = $response; - - if (isset($response['error']) === true) { - if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) { - $this->log->warning("Curl exception encountered."); - - $exception = $this->getCurlRetryException($request, $response); - - $this->logRequestFail($request, $response, $exception); - - $node = $connection->getHost(); - $this->log->warning("Marking node $node dead."); - $connection->markDead(); - - // If the transport has not been set, we are inside a Ping or Sniff, - // so we don't want to retrigger retries anyway. - // - // TODO this could be handled better, but we are limited because connectionpools do not - // have access to Transport. Architecturally, all of this needs to be refactored - if (isset($transport) === true) { - $transport->connectionPool->scheduleCheck(); - - $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false; - $shouldRetry = $transport->shouldRetry($request); - $shouldRetryText = ($shouldRetry) ? 'true' : 'false'; - - $this->log->warning("Retries left? $shouldRetryText"); - if ($shouldRetry && !$neverRetry) { - return $transport->performRequest( - $request['http_method'], - $request['uri'], - [], - $request['body'], - $options - ); - } - } - - $this->log->warning("Out of retries, throwing exception from $node"); - // Only throw if we run out of retries - throw $exception; - } else { - // Something went seriously wrong, bail - $exception = new TransportException($response['error']->getMessage()); - $this->logRequestFail($request, $response, $exception); - throw $exception; - } - } else { - $connection->markAlive(); - - if (isset($response['headers']['Warning'])) { - $this->logWarning($request, $response); - } - if (isset($response['body']) === true) { - $response['body'] = stream_get_contents($response['body']); - $this->lastRequest['response']['body'] = $response['body']; - } - - if ($response['status'] >= 400 && $response['status'] < 500) { - $ignore = $request['client']['ignore'] ?? []; - // Skip 404 if succeeded true in the body (e.g. clear_scroll) - $body = $response['body'] ?? ''; - if (strpos($body, '"succeeded":true') !== false) { - $ignore[] = 404; - } - $this->process4xxError($request, $response, $ignore); - } elseif ($response['status'] >= 500) { - $ignore = $request['client']['ignore'] ?? []; - $this->process5xxError($request, $response, $ignore); - } - - // No error, deserialize - $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']); - } - $this->logRequestSuccess($request, $response); - - return isset($request['client']['verbose']) && $request['client']['verbose'] === true ? $response : $response['body']; - } - ); - - return $response; - }; - } - - /** - * @param array|null $params - */ - private function getURI(string $uri, ?array $params): string - { - if (isset($params) === true && !empty($params)) { - $params = array_map( - function ($value) { - if ($value === true) { - return 'true'; - } elseif ($value === false) { - return 'false'; - } - - return $value; - }, - $params - ); - - $uri .= '?' . http_build_query($params); - } - - if ($this->path !== null) { - $uri = $this->path . $uri; - } - - return $uri; - } - - /** - * @return array> - */ - public function getHeaders(): array - { - return $this->headers; - } - - public function logWarning(array $request, array $response): void - { - $this->log->warning('Deprecation', $response['headers']['Warning']); - } - - /** - * Log a successful request - * - * @param array $request - * @param array $response - * @return void - */ - public function logRequestSuccess(array $request, array $response): void - { - $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? ''; - $uri = $this->addPortInUrl($response['effective_url'], (int) $port); - - $this->log->debug('Request Body', array($request['body'])); - $this->log->info( - 'Request Success:', - array( - 'method' => $request['http_method'], - 'uri' => $uri, - 'port' => $port, - 'headers' => $request['headers'], - 'HTTP code' => $response['status'], - 'duration' => $response['transfer_stats']['total_time'], - ) - ); - $this->log->debug('Response', array($response['body'])); - - // Build the curl command for Trace. - $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']); - $this->trace->info($curlCommand); - $this->trace->debug( - 'Response:', - array( - 'response' => $response['body'], - 'method' => $request['http_method'], - 'uri' => $uri, - 'port' => $port, - 'HTTP code' => $response['status'], - 'duration' => $response['transfer_stats']['total_time'], - ) - ); - } - - /** - * Log a failed request - * - * @param array $request - * @param array $response - * @param \Throwable $exception - * - * @return void - */ - public function logRequestFail(array $request, array $response, \Throwable $exception): void - { - $port = $request['client']['curl'][CURLOPT_PORT] ?? $response['transfer_stats']['primary_port'] ?? ''; - $uri = $this->addPortInUrl($response['effective_url'], (int) $port); - - $this->log->debug('Request Body', array($request['body'])); - $this->log->warning( - 'Request Failure:', - array( - 'method' => $request['http_method'], - 'uri' => $uri, - 'port' => $port, - 'headers' => $request['headers'], - 'HTTP code' => $response['status'], - 'duration' => $response['transfer_stats']['total_time'], - 'error' => $exception->getMessage(), - ) - ); - $this->log->warning('Response', array($response['body'])); - - // Build the curl command for Trace. - $curlCommand = $this->buildCurlCommand($request['http_method'], $uri, $request['body']); - $this->trace->info($curlCommand); - $this->trace->debug( - 'Response:', - array( - 'response' => $response, - 'method' => $request['http_method'], - 'uri' => $uri, - 'port' => $port, - 'HTTP code' => $response['status'], - 'duration' => $response['transfer_stats']['total_time'], - ) - ); - } - - public function ping(): bool - { - $options = [ - 'client' => [ - 'timeout' => $this->pingTimeout, - 'never_retry' => true, - 'verbose' => true - ] - ]; - try { - $response = $this->performRequest('HEAD', '/', null, null, $options); - $response = $response->wait(); - } catch (TransportException $exception) { - $this->markDead(); - - return false; - } - - if ($response['status'] === 200) { - $this->markAlive(); - - return true; - } else { - $this->markDead(); - - return false; - } - } - - /** - * @return array|\GuzzleHttp\Ring\Future\FutureArray - */ - public function sniff() - { - $options = [ - 'client' => [ - 'timeout' => $this->pingTimeout, - 'never_retry' => true - ] - ]; - - return $this->performRequest('GET', '/_nodes/', null, null, $options); - } - - public function isAlive(): bool - { - return $this->isAlive; - } - - public function markAlive(): void - { - $this->failedPings = 0; - $this->isAlive = true; - $this->lastPing = time(); - } - - public function markDead(): void - { - $this->isAlive = false; - $this->failedPings += 1; - $this->lastPing = time(); - } - - public function getLastPing(): int - { - return $this->lastPing; - } - - public function getPingFailures(): int - { - return $this->failedPings; - } - - public function getHost(): string - { - return $this->host; - } - - public function getUserPass(): ?string - { - return $this->connectionParams['client']['curl'][CURLOPT_USERPWD] ?? null; - } - - public function getPath(): ?string - { - return $this->path; - } - - /** - * @return int - */ - public function getPort() - { - return $this->port; - } - - protected function getCurlRetryException(array $request, array $response): OpenSearchException - { - $exception = null; - $message = $response['error']->getMessage(); - $exception = new MaxRetriesException($message); - switch ($response['curl']['errno']) { - case 6: - $exception = new CouldNotResolveHostException($message, 0, $exception); - break; - case 7: - $exception = new CouldNotConnectToHost($message, 0, $exception); - break; - case 28: - $exception = new OperationTimeoutException($message, 0, $exception); - break; - } - - return $exception; - } - - /** - * Get the OS version using php_uname if available - * otherwise it returns an empty string - * - * @see https://github.com/elastic/elasticsearch-php/issues/922 - */ - private function getOSVersion(): string - { - if ($this->OSVersion === null) { - $this->OSVersion = strpos(strtolower(ini_get('disable_functions')), 'php_uname') !== false - ? '' - : php_uname("r"); - } - return $this->OSVersion; - } - - /** - * Add the port value in the URL if not present - */ - private function addPortInUrl(string $uri, int $port): string - { - if (strpos($uri, ':', 7) !== false) { - return $uri; - } - return preg_replace('#([^/])/([^/])#', sprintf("$1:%s/$2", $port), $uri, 1); - } - - /** - * Construct a string cURL command - */ - private function buildCurlCommand(string $method, string $url, ?string $body): string - { - if (strpos($url, '?') === false) { - $url .= '?pretty=true'; - } else { - str_replace('?', '?pretty=true', $url); - } - - $curlCommand = 'curl -X' . strtoupper($method); - $curlCommand .= " '" . $url . "'"; - - if (isset($body) === true && $body !== '') { - $curlCommand .= " -d '" . $body . "'"; - } - - return $curlCommand; - } - - /** - * @throws OpenSearchException - */ - private function process4xxError(array $request, array $response, array $ignore): void - { - $statusCode = $response['status']; - - /** - * @var \Exception $exception - */ - $exception = $this->tryDeserialize400Error($response); - - if (array_search($response['status'], $ignore) !== false) { - return; - } - - $responseBody = $this->convertBodyToString($response['body'], $statusCode, $exception); - if ($statusCode === 401) { - $exception = new Unauthorized401Exception($responseBody, $statusCode); - } elseif ($statusCode === 403) { - $exception = new Forbidden403Exception($responseBody, $statusCode); - } elseif ($statusCode === 404) { - $exception = new Missing404Exception($responseBody, $statusCode); - } elseif ($statusCode === 409) { - $exception = new Conflict409Exception($responseBody, $statusCode); - } elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) { - $exception = new ScriptLangNotSupportedException($responseBody. $statusCode); - } elseif ($statusCode === 408) { - $exception = new RequestTimeout408Exception($responseBody, $statusCode); - } else { - $exception = new BadRequest400Exception($responseBody, $statusCode); - } - - $this->logRequestFail($request, $response, $exception); - - throw $exception; - } - - /** - * @throws OpenSearchException - */ - private function process5xxError(array $request, array $response, array $ignore): void - { - $statusCode = (int) $response['status']; - $responseBody = $response['body']; - - /** - * @var \Exception $exception - */ - $exception = $this->tryDeserialize500Error($response); - - $exceptionText = "[$statusCode Server Exception] ".$exception->getMessage(); - $this->log->error($exceptionText); - $this->log->error($exception->getTraceAsString()); - - if (array_search($statusCode, $ignore) !== false) { - return; - } - - if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) { - $exception = new RoutingMissingException($exception->getMessage(), $statusCode, $exception); - } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) { - $exception = new NoDocumentsToGetException($exception->getMessage(), $statusCode, $exception); - } elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) { - $exception = new NoShardAvailableException($exception->getMessage(), $statusCode, $exception); - } else { - $exception = new ServerErrorResponseException( - $this->convertBodyToString($responseBody, $statusCode, $exception), - $statusCode - ); - } - - $this->logRequestFail($request, $response, $exception); - - throw $exception; - } - - private function convertBodyToString($body, int $statusCode, Exception $exception): string - { - if (empty($body)) { - return sprintf( - "Unknown %d error from OpenSearch %s", - $statusCode, - $exception->getMessage() - ); - } - // if body is not string, we convert it so it can be used as Exception message - if (!is_string($body)) { - return json_encode($body); - } - return $body; - } - - private function tryDeserialize400Error(array $response): OpenSearchException - { - return $this->tryDeserializeError($response, BadRequest400Exception::class); - } - - private function tryDeserialize500Error(array $response): OpenSearchException - { - return $this->tryDeserializeError($response, ServerErrorResponseException::class); - } - - private function tryDeserializeError(array $response, string $errorClass): OpenSearchException - { - $error = $this->serializer->deserialize($response['body'], $response['transfer_stats']); - if (is_array($error) === true) { - if (isset($error['error']) === false) { - // <2.0 "i just blew up" nonstructured exception - // $error is an array but we don't know the format, reuse the response body instead - // added json_encode to convert into a string - return new $errorClass(json_encode($response['body']), (int) $response['status']); - } - - // 2.0 structured exceptions - if (is_array($error['error']) && array_key_exists('reason', $error['error']) === true) { - // Try to use root cause first (only grabs the first root cause) - $info = $error['error']['root_cause'][0] ?? $error['error']; - $cause = $info['reason']; - $type = $info['type']; - // added json_encode to convert into a string - $original = new $errorClass(json_encode($response['body']), $response['status']); - - return new $errorClass("$type: $cause", (int) $response['status'], $original); - } - // <2.0 semi-structured exceptions - // added json_encode to convert into a string - $original = new $errorClass(json_encode($response['body']), $response['status']); - - $errorEncoded = $error['error']; - if (is_array($errorEncoded)) { - $errorEncoded = json_encode($errorEncoded); - } - return new $errorClass($errorEncoded, (int) $response['status'], $original); - } - - // if responseBody is not string, we convert it so it can be used as Exception message - $responseBody = $response['body']; - if (!is_string($responseBody)) { - $responseBody = json_encode($responseBody); - } - - // <2.0 "i just blew up" nonstructured exception - return new $errorClass($responseBody); - } -} diff --git a/src/OpenSearch/Connections/ConnectionFactory.php b/src/OpenSearch/Connections/ConnectionFactory.php deleted file mode 100644 index f95c2f29..00000000 --- a/src/OpenSearch/Connections/ConnectionFactory.php +++ /dev/null @@ -1,81 +0,0 @@ ->, curl?: array}} $connectionParams - */ - public function __construct(callable $handler, array $connectionParams, SerializerInterface $serializer, LoggerInterface $logger, LoggerInterface $tracer) - { - $this->handler = $handler; - $this->connectionParams = $connectionParams; - $this->logger = $logger; - $this->tracer = $tracer; - $this->serializer = $serializer; - } - - public function create(array $hostDetails): ConnectionInterface - { - if (isset($hostDetails['path'])) { - $hostDetails['path'] = rtrim($hostDetails['path'], '/'); - } - - return new Connection( - $this->handler, - $hostDetails, - $this->connectionParams, - $this->serializer, - $this->logger, - $this->tracer - ); - } -} diff --git a/src/OpenSearch/Connections/ConnectionFactoryInterface.php b/src/OpenSearch/Connections/ConnectionFactoryInterface.php deleted file mode 100644 index da209abd..00000000 --- a/src/OpenSearch/Connections/ConnectionFactoryInterface.php +++ /dev/null @@ -1,30 +0,0 @@ -|null $params - * @param mixed $body - * @return mixed - */ - public function performRequest(string $method, string $uri, ?array $params = [], $body = null, array $options = [], ?Transport $transport = null); -} diff --git a/src/OpenSearch/Handlers/SigV4Handler.php b/src/OpenSearch/Handlers/SigV4Handler.php deleted file mode 100644 index e45639c7..00000000 --- a/src/OpenSearch/Handlers/SigV4Handler.php +++ /dev/null @@ -1,151 +0,0 @@ ->, body: string|resource|null, client?: array} - */ -class SigV4Handler -{ - /** - * @var SignatureV4 - */ - private $signer; - /** - * @var callable - */ - private $credentialProvider; - /** - * @var callable - */ - private $wrappedHandler; - - /** - * A handler that applies an AWS V4 signature before dispatching requests. - * - * @param string $region The region of your Amazon - * OpenSearch Service domain - * @param string $service The Service of your Amazon - * OpenSearch Service domain - * @param callable|null $credentialProvider A callable that returns a - * promise that is fulfilled - * with an instance of - * Aws\Credentials\Credentials - * @param callable|null $wrappedHandler A RingPHP handler - */ - public function __construct( - string $region, - string $service, - ?callable $credentialProvider = null, - ?callable $wrappedHandler = null - ) { - self::assertDependenciesInstalled(); - $this->signer = new SignatureV4($service, $region); - $this->wrappedHandler = $wrappedHandler - ?: ClientBuilder::defaultHandler(); - $this->credentialProvider = $credentialProvider - ?: CredentialProvider::defaultProvider(); - } - - /** - * @phpstan-param RingPhpRequest $request - */ - public function __invoke(array $request) - { - $creds = call_user_func($this->credentialProvider)->wait(); - - $psr7Request = $this->createPsr7Request($request); - $psr7Request = $psr7Request->withHeader('x-amz-content-sha256', Utils::hash($psr7Request->getBody(), 'sha256')); - $signedRequest = $this->signer - ->signRequest($psr7Request, $creds); - return call_user_func($this->wrappedHandler, $this->createRingRequest($signedRequest, $request)); - } - - public static function assertDependenciesInstalled(): void - { - if (!class_exists(SignatureV4::class)) { - throw new RuntimeException( - 'The AWS SDK for PHP must be installed in order to use the SigV4 signing handler' - ); - } - } - - /** - * @phpstan-param RingPhpRequest $ringPhpRequest - */ - private function createPsr7Request(array $ringPhpRequest): Request - { - // fix for uppercase 'Host' array key in elasticsearch-php 5.3.1 and backward compatible - // https://github.com/aws/aws-sdk-php/issues/1225 - $hostKey = isset($ringPhpRequest['headers']['Host']) ? 'Host' : 'host'; - - // Amazon ES/OS listens on standard ports (443 for HTTPS, 80 for HTTP). - // Consequently, the port should be stripped from the host header. - $parsedUrl = parse_url($ringPhpRequest['headers'][$hostKey][0]); - if (isset($parsedUrl['host'])) { - $ringPhpRequest['headers'][$hostKey][0] = $parsedUrl['host']; - } - - // Create a PSR-7 URI from the array passed to the handler - $uri = (new Uri($ringPhpRequest['uri'])) - ->withScheme($ringPhpRequest['scheme']) - ->withHost($ringPhpRequest['headers'][$hostKey][0]); - if (isset($ringPhpRequest['query_string'])) { - $uri = $uri->withQuery($ringPhpRequest['query_string']); - } - - // Create a PSR-7 request from the array passed to the handler - return new Request( - $ringPhpRequest['http_method'], - $uri, - $ringPhpRequest['headers'], - $ringPhpRequest['body'] - ); - } - - /** - * @phpstan-param RingPhpRequest $originalRequest - * - * @phpstan-return RingPhpRequest - */ - private function createRingRequest(RequestInterface $request, array $originalRequest): array - { - $uri = $request->getUri(); - $body = (string) $request->getBody(); - - // RingPHP currently expects empty message bodies to be null: - // https://github.com/guzzle/RingPHP/blob/4c8fe4c48a0fb7cc5e41ef529e43fecd6da4d539/src/Client/CurlFactory.php#L202 - if (empty($body)) { - $body = null; - } - - // Reset the explicit port in the URL - $client = $originalRequest['client']; - unset($client['curl'][CURLOPT_PORT]); - - $ringRequest = [ - 'http_method' => $request->getMethod(), - 'scheme' => $uri->getScheme(), - 'uri' => $uri->getPath(), - 'body' => $body, - 'headers' => $request->getHeaders(), - 'client' => $client - ]; - if ($uri->getQuery()) { - $ringRequest['query_string'] = $uri->getQuery(); - } - - return $ringRequest; - } -} diff --git a/src/OpenSearch/Namespaces/AbstractNamespace.php b/src/OpenSearch/Namespaces/AbstractNamespace.php index 0b5a7a32..6b3b53c3 100644 --- a/src/OpenSearch/Namespaces/AbstractNamespace.php +++ b/src/OpenSearch/Namespaces/AbstractNamespace.php @@ -22,14 +22,12 @@ namespace OpenSearch\Namespaces; use OpenSearch\Endpoints\AbstractEndpoint; +use OpenSearch\Helper\RequestHelper; use OpenSearch\Transport; abstract class AbstractNamespace { - /** - * @var \OpenSearch\Transport - */ - protected $transport; + protected Transport $transport; /** * @var callable @@ -51,9 +49,9 @@ public function extractArgument(array &$params, string $arg) $val = $params[$arg]; unset($params[$arg]); return $val; - } else { - return null; } + + return null; } protected function performRequest(AbstractEndpoint $endpoint) @@ -66,6 +64,6 @@ protected function performRequest(AbstractEndpoint $endpoint) $endpoint->getOptions() ); - return $this->transport->resultOrFuture($response, $endpoint->getOptions()); + return $this->transport->resultOrFuture($response); } } diff --git a/src/OpenSearch/Namespaces/BooleanRequestWrapper.php b/src/OpenSearch/Namespaces/BooleanRequestWrapper.php index 9c29f656..59859e3f 100644 --- a/src/OpenSearch/Namespaces/BooleanRequestWrapper.php +++ b/src/OpenSearch/Namespaces/BooleanRequestWrapper.php @@ -24,39 +24,19 @@ use OpenSearch\Common\Exceptions\Missing404Exception; use OpenSearch\Common\Exceptions\RoutingMissingException; use OpenSearch\Endpoints\AbstractEndpoint; +use OpenSearch\Helper\RequestHelper; use OpenSearch\Transport; -use GuzzleHttp\Ring\Future\FutureArrayInterface; abstract class BooleanRequestWrapper { - /** - * Perform Request - * - * @throws Missing404Exception - * @throws RoutingMissingException - */ - public static function performRequest(AbstractEndpoint $endpoint, Transport $transport) + public static function performRequest(AbstractEndpoint $endpoint, Transport $transport): bool { try { - $response = $transport->performRequest( - $endpoint->getMethod(), - $endpoint->getURI(), - $endpoint->getParams(), - $endpoint->getBody(), - $endpoint->getOptions() - ); + $response = $transport->performRequest($endpoint->getMethod(), $endpoint->getURI(), $endpoint->getParams(), $endpoint->getBody()); - $response = $transport->resultOrFuture($response, $endpoint->getOptions()); - if (!($response instanceof FutureArrayInterface)) { - if ($response['status'] === 200) { - return true; - } else { - return false; - } - } else { - // async mode, can't easily resolve this...punt to user - return $response; - } + $transport->resultOrFuture($response); + + return true; } catch (Missing404Exception $exception) { return false; } catch (RoutingMissingException $exception) { diff --git a/src/OpenSearch/Serializers/EverythingToJSONSerializer.php b/src/OpenSearch/Serializers/EverythingToJSONSerializer.php index fc1c4051..2b4b963b 100644 --- a/src/OpenSearch/Serializers/EverythingToJSONSerializer.php +++ b/src/OpenSearch/Serializers/EverythingToJSONSerializer.php @@ -23,11 +23,6 @@ use OpenSearch\Common\Exceptions\RuntimeException; -if (!defined('JSON_INVALID_UTF8_SUBSTITUTE')) { - //PHP < 7.2 Define it as 0 so it does nothing - define('JSON_INVALID_UTF8_SUBSTITUTE', 0); -} - class EverythingToJSONSerializer implements SerializerInterface { /** diff --git a/src/OpenSearch/Transport.php b/src/OpenSearch/Transport.php index df6b5b98..3bb92430 100644 --- a/src/OpenSearch/Transport.php +++ b/src/OpenSearch/Transport.php @@ -1,178 +1,123 @@ log = $log; - $this->connectionPool = $connectionPool; - $this->retries = $retries; - - if ($sniffOnStart === true) { - $this->log->notice('Sniff on Start.'); - $this->connectionPool->scheduleCheck(); - } - } + private \Elastic\Transport\Transport $transport; - /** - * Returns a single connection from the connection pool - * Potentially performs a sniffing step before returning - */ - public function getConnection(): ConnectionInterface + public function __construct(\Elastic\Transport\Transport $transport) { - return $this->connectionPool->nextConnection(); + $this->transport = $transport; } - /** - * Perform a request to the Cluster - * - * @param string $method HTTP method to use - * @param string $uri HTTP URI to send request to - * @param array $params Optional query parameters - * @param mixed|null $body Optional query body - * @param array $options - * - * @throws Common\Exceptions\NoNodesAvailableException|\Exception - */ - public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface + public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): ResponseInterface { - try { - $connection = $this->getConnection(); - } catch (Exceptions\NoNodesAvailableException $exception) { - $this->log->critical('No alive nodes found in cluster'); - throw $exception; + if (!empty($params)) { + $uri .= '?' . http_build_query($params, '', '&', PHP_QUERY_RFC3986); } - $response = []; - $caughtException = null; - $this->lastConnection = $connection; + $requestFactory = Psr17FactoryDiscovery::findRequestFactory(); - $future = $connection->performRequest( + $request = $requestFactory->createRequest( $method, - $uri, - $params, - $body, - $options, - $this + $uri ); - $future->promise()->then( - //onSuccess - function ($response) { - $this->retryAttempts = 0; - // Note, this could be a 4xx or 5xx error - }, - //onFailure - function ($response) { - $code = $response->getCode(); - // Ignore 400 level errors, as that means the server responded just fine - if ($code < 400 || $code >= 500) { - // Otherwise schedule a check - $this->connectionPool->scheduleCheck(); - } - } - ); + if ($body) { + $request = $request->withHeader('Content-Type', 'application/json'); + $request = $request->withBody($requestFactory->createStream(json_encode($body, JSON_THROW_ON_ERROR))); + } - return $future; + return $this->transport->sendRequest($request->withHeader('Accept', 'application/json')); } - /** - * @param FutureArrayInterface $result Response of a request (promise) - * @param array $options Options for transport - * - * @return callable|array - */ - public function resultOrFuture(FutureArrayInterface $result, array $options = []) + public function resultOrFuture(ResponseInterface $response): ?array { - $response = null; - $async = isset($options['client']['future']) ? $options['client']['future'] : null; - if (is_null($async) || $async === false) { - do { - $result = $result->wait(); - } while ($result instanceof FutureArrayInterface); + if ($response->getStatusCode() >= 400 && $response->getStatusCode() < 500) { + $this->handleClientErrorResponse($response); } - return $result; + + if ($response->getStatusCode() >= 500) { + $this->handleServerErrorResponse($response); + } + + if (str_starts_with($response->getHeader('Content-Type')[0], 'application/json')) { + return json_decode($response->getBody()->getContents(), true); + } + + throw new ClientErrorResponseException( + $response->getBody()->getContents(), + $response->getStatusCode(), + ); } - public function shouldRetry(array $request): bool + private function handleClientErrorResponse(ResponseInterface $response): void { - if ($this->retryAttempts < $this->retries) { - $this->retryAttempts += 1; + $body = $response->getBody(); + + switch ($response->getStatusCode()) { + case 401: + throw new Unauthorized401Exception($body, $response->getStatusCode()); + case 403: + throw new Forbidden403Exception($body, $response->getStatusCode()); + case 404: + throw new Missing404Exception($body, $response->getStatusCode()); + case 409: + throw new Conflict409Exception($body, $response->getStatusCode()); + case 408: + throw new RequestTimeout408Exception($body, $response->getStatusCode()); + default: + if ($response->getStatusCode() === 400 && str_contains($body, 'script_lang not supported')) { + throw new ScriptLangNotSupportedException($body, $response->getStatusCode()); + } + + throw new BadRequest400Exception($body, $response->getStatusCode()); + } + } - return true; + private function handleServerErrorResponse(ResponseInterface $response): void + { + $responseBody = $response->getBody(); + $statusCode = $response->getStatusCode(); + + if ($statusCode === 500 && str_contains($responseBody, "RoutingMissingException")) { + throw new RoutingMissingException($responseBody, $statusCode); + } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) { + throw new NoDocumentsToGetException($responseBody, $statusCode); + } elseif ($statusCode === 500 && str_contains($responseBody, 'NoShardAvailableActionException')) { + throw new NoShardAvailableException($responseBody, $statusCode); + } else { + throw new ServerErrorResponseException( + $responseBody, + $statusCode + ); } + } - return false; + public function getLastResponse(): ResponseInterface + { + return $this->transport->getLastResponse(); } - /** - * Returns the last used connection so that it may be inspected. Mainly - * for debugging/testing purposes. - */ - public function getLastConnection(): ConnectionInterface + public function getLastRequest(): RequestInterface { - return $this->lastConnection; + return $this->transport->getLastRequest(); } } diff --git a/tests/ClientBuilderTest.php b/tests/ClientBuilderTest.php index 751e3895..28fef4c4 100644 --- a/tests/ClientBuilderTest.php +++ b/tests/ClientBuilderTest.php @@ -29,92 +29,6 @@ class ClientBuilderTest extends TestCase { - /** - * @see https://github.com/elastic/elasticsearch-php/issues/993 - */ - public function testIncludePortInHostHeader() - { - $host = "localhost"; - $url = "$host:1234"; - $params = [ - 'client' => [ - 'verbose' => true - ] - ]; - $client = ClientBuilder::create() - ->setConnectionParams($params) - ->setHosts([$url]) - ->includePortInHostHeader(true) - ->build(); - - $this->assertInstanceOf(Client::class, $client); - - try { - $result = $client->info(); - } catch (OpenSearchException $e) { - $request = $client->transport->getLastConnection()->getLastRequestInfo(); - $this->assertTrue(isset($request['request']['headers']['Host'][0])); - $this->assertEquals($url, $request['request']['headers']['Host'][0]); - } - } - - /** - * @see https://github.com/elastic/elasticsearch-php/issues/993 - */ - public function testNotIncludePortInHostHeaderAsDefault() - { - $host = "localhost"; - $url = "$host:1234"; - $params = [ - 'client' => [ - 'verbose' => true - ] - ]; - $client = ClientBuilder::create() - ->setConnectionParams($params) - ->setHosts([$url]) - ->build(); - - $this->assertInstanceOf(Client::class, $client); - - try { - $result = $client->info(); - } catch (OpenSearchException $e) { - $request = $client->transport->getLastConnection()->getLastRequestInfo(); - $this->assertTrue(isset($request['request']['headers']['Host'][0])); - $this->assertEquals($host, $request['request']['headers']['Host'][0]); - } - } - - /** - * @see https://github.com/elastic/elasticsearch-php/issues/993 - */ - public function testNotIncludePortInHostHeader() - { - $host = "localhost"; - $url = "$host:1234"; - $params = [ - 'client' => [ - 'verbose' => true - ] - ]; - $client = ClientBuilder::create() - ->setConnectionParams($params) - ->setHosts([$url]) - ->includePortInHostHeader(false) - ->build(); - - $this->assertInstanceOf(Client::class, $client); - - try { - $result = $client->info(); - } catch (OpenSearchException $e) { - $request = $client->transport->getLastConnection()->getLastRequestInfo(); - $this->assertTrue(isset($request['request']['headers']['Host'][0])); - $this->assertEquals($host, $request['request']['headers']['Host'][0]); - } - } - public function getConfig() { return [ @@ -161,16 +75,6 @@ public function testFromConfigQuiteFalseWithUnknownKey() ); } - public function testFromConfigUsingBasicAuthentication() - { - $config = [ - 'basicAuthentication' => ["foo", "bar"], - 'connectionParams' => [], - ]; - $client = ClientBuilder::fromConfig($config); - - $this->assertEquals('foo:bar', $client->transport->getConnection()->getUserPass()); - } public function testCompatibilityHeaderDefaultIsOff() { @@ -180,36 +84,9 @@ public function testCompatibilityHeaderDefaultIsOff() try { $client->info(); } catch (OpenSearchException $e) { - $request = $client->transport->getLastConnection()->getLastRequestInfo(); - $this->assertSame(['application/json'], $request['request']['headers']['Content-Type']); - $this->assertSame(['application/json'], $request['request']['headers']['Accept']); - } - } - - public function testFromConfigWithIncludePortInHostHeader() - { - $url = 'localhost:1234'; - $config = [ - 'hosts' => [$url], - 'includePortInHostHeader' => true, - 'connectionParams' => [ - 'client' => [ - 'verbose' => true - ] - ], - ]; - - $client = ClientBuilder::fromConfig($config); - - $this->assertInstanceOf(Client::class, $client); - - try { - $client->info(); - $this->assertTrue(false, 'Exception was not thrown!'); - } catch (OpenSearchException $e) { - $request = $client->transport->getLastConnection()->getLastRequestInfo(); - $this->assertTrue(isset($request['request']['headers']['Host'][0])); - $this->assertEquals($url, $request['request']['headers']['Host'][0]); + $request = $client->transport->getLastRequest(); + $this->assertSame(['application/json'], $request->getHeader('Content-Type')); + $this->assertSame(['application/json'], $request->getHeader('Accept')); } } } diff --git a/tests/ClientTest.php b/tests/ClientTest.php index 43470607..bea9bbb5 100644 --- a/tests/ClientTest.php +++ b/tests/ClientTest.php @@ -41,14 +41,6 @@ public function tearDown(): void m::close(); } - public function testConstructorIllegalPort() - { - $this->expectException(\OpenSearch\Common\Exceptions\InvalidArgumentException::class); - $this->expectExceptionMessage('Could not parse URI'); - - $client = OpenSearch\ClientBuilder::create()->setHosts(['localhost:abc'])->build(); - } - public function testFromConfig() { $params = [ @@ -56,7 +48,6 @@ public function testFromConfig() 'localhost:9200' ], 'retries' => 2, - 'handler' => ClientBuilder::multiHandler() ]; $client = ClientBuilder::fromConfig($params); @@ -173,231 +164,6 @@ public function testMaxRetriesException() } } - public function testInlineHosts() - { - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'localhost:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("localhost", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'http://localhost:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("localhost", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'http://foo.com:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'https://foo.com:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("https", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'https://user:pass@foo.com:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("https", $host->getTransportSchema()); - $this->assertSame("user:pass", $host->getUserPass()); - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - 'https://user:pass@the_foo.com:9200' - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("the_foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("https", $host->getTransportSchema()); - $this->assertSame("user:pass", $host->getUserPass()); - } - - public function testExtendedHosts() - { - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'localhost', - 'port' => 9200, - 'scheme' => 'http' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("localhost", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com', - 'port' => 9200, - 'scheme' => 'http' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com', - 'port' => 9200, - 'scheme' => 'https' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("https", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com', - 'scheme' => 'http' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com', - 'port' => 9500, - 'scheme' => 'https' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9500, $host->getPort()); - $this->assertSame("https", $host->getTransportSchema()); - - - try { - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'port' => 9200, - 'scheme' => 'http' - ] - ] - )->build(); - $this->fail("Expected RuntimeException from missing host, none thrown"); - } catch (OpenSearch\Common\Exceptions\RuntimeException $e) { - // good - } - - // Underscore host, questionably legal - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'the_foo.com' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("the_foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - - - // Special characters in user/pass, would break inline - $client = OpenSearch\ClientBuilder::create()->setHosts( - [ - [ - 'host' => 'foo.com', - 'user' => 'user', - 'pass' => 'abc#$@?%!abc' - ] - ] - )->build(); - $host = $client->transport->getConnection(); - $this->assertSame("foo.com", $host->getHost()); - $this->assertSame(9200, $host->getPort()); - $this->assertSame("http", $host->getTransportSchema()); - $this->assertSame("user:abc#$@?%!abc", $host->getUserPass()); - } - - public function testClientLazy() - { - $handler = new MockHandler([ - 'status' => 200, - 'transfer_stats' => [ - 'total_time' => 100 - ], - 'body' => '{test}', - 'effective_url' => 'localhost' - ]); - $builder = ClientBuilder::create(); - $builder->setHosts(['somehost']); - $builder->setHandler($handler); - $client = $builder->build(); - - $params = [ - 'client' => [ - 'future' => 'lazy', - ] - ]; - $result = $client->info($params); - $this->assertInstanceOf(FutureArray::class, $result); - } - public function testExtractArgumentIterable() { $client = OpenSearch\ClientBuilder::create()->build(); diff --git a/tests/ConnectionPool/ConnectionFactoryTest.php b/tests/ConnectionPool/ConnectionFactoryTest.php deleted file mode 100644 index 98e1e112..00000000 --- a/tests/ConnectionPool/ConnectionFactoryTest.php +++ /dev/null @@ -1,66 +0,0 @@ -create(['host' => 'localhost']); - static::assertNull($connection->getPath()); - } - - /** - * @dataProvider pathDataProvider - */ - public function testConnectionWithPath(string $path, string $expectedPath): void - { - $factory = new ConnectionFactory( - function () { - }, - [], - new ArrayToJSONSerializer(), - new NullLogger(), - new NullLogger() - ); - - $connection = $factory->create(['host' => 'localhost', 'path' => $path]); - static::assertSame($expectedPath, $connection->getPath()); - } - - public function pathDataProvider(): array - { - return [ - ['/', ''], - ['/foo', '/foo'], - ['/foo/', '/foo'], - ]; - } -} diff --git a/tests/ConnectionPool/Selectors/RoundRobinSelectorTest.php b/tests/ConnectionPool/Selectors/RoundRobinSelectorTest.php deleted file mode 100644 index b3c8c3e1..00000000 --- a/tests/ConnectionPool/Selectors/RoundRobinSelectorTest.php +++ /dev/null @@ -1,114 +0,0 @@ -getMockBuilder(ConnectionInterface::class) - ->disableOriginalConstructor() - ->getMock(); - } - - // select ten - $this->assertSame($mockConnections[0], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[1], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[2], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[3], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[4], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[5], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[6], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[7], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[8], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[9], $roundRobin->select($mockConnections)); - - // select five - should start from the first one (index: 0) - $this->assertSame($mockConnections[0], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[1], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[2], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[3], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[4], $roundRobin->select($mockConnections)); - } - - /** - * Add Ten connections, select five, remove three, test another 10 to check - * that the round-robining works after removing connections - * - * @covers \OpenSearch\ConnectionPool\Selectors\RoundRobinSelector::select - * - * @return void - */ - public function testAddTenConnectionsTestFiveRemoveThreeTestTen() - { - $roundRobin = new OpenSearch\ConnectionPool\Selectors\RoundRobinSelector(); - - $mockConnections = []; - foreach (range(0, 9) as $index) { - $mockConnections[$index] = $this->getMockBuilder(ConnectionInterface::class) - ->disableOriginalConstructor() - ->getMock(); - } - - // select five - $this->assertSame($mockConnections[0], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[1], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[2], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[3], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[4], $roundRobin->select($mockConnections)); - - // remove three - unset($mockConnections[8]); - unset($mockConnections[9]); - - // select ten after removal - $this->assertSame($mockConnections[5], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[6], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[7], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[0], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[1], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[2], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[3], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[4], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[5], $roundRobin->select($mockConnections)); - $this->assertSame($mockConnections[6], $roundRobin->select($mockConnections)); - } -} diff --git a/tests/ConnectionPool/Selectors/StickyRoundRobinSelectorTest.php b/tests/ConnectionPool/Selectors/StickyRoundRobinSelectorTest.php deleted file mode 100644 index af6fae3b..00000000 --- a/tests/ConnectionPool/Selectors/StickyRoundRobinSelectorTest.php +++ /dev/null @@ -1,80 +0,0 @@ -expects('isAlive')->times(16)->andReturns(true); - - $mockConnections[] = $mockConnection; - - foreach (range(0, 9) as $index) { - $mockConnections[] = m::mock(ConnectionInterface::class); - } - - foreach (range(0, 15) as $index) { - $retConnection = $roundRobin->select($mockConnections); - - $this->assertSame($mockConnections[0], $retConnection); - } - } - - public function testTenConnectionsFirstDies() - { - $roundRobin = new OpenSearch\ConnectionPool\Selectors\StickyRoundRobinSelector(); - - $mockConnections = []; - $mockConnectionNotAlive = m::mock(ConnectionInterface::class); - $mockConnectionNotAlive->expects('isAlive')->andReturns(false); - - $mockConnections[] = $mockConnectionNotAlive; - - $mockConnectionAlive = m::mock(ConnectionInterface::class); - $mockConnectionAlive->expects('isAlive')->times(15)->andReturns(true); - - $mockConnections[] = $mockConnectionAlive; - - foreach (range(0, 8) as $index) { - $mockConnections[] = m::mock(ConnectionInterface::class); - } - - foreach (range(0, 15) as $index) { - $retConnection = $roundRobin->select($mockConnections); - - $this->assertSame($mockConnections[1], $retConnection); - } - } -} diff --git a/tests/ConnectionPool/SniffingConnectionPoolIntegrationTest.php b/tests/ConnectionPool/SniffingConnectionPoolIntegrationTest.php deleted file mode 100644 index e73b1794..00000000 --- a/tests/ConnectionPool/SniffingConnectionPoolIntegrationTest.php +++ /dev/null @@ -1,50 +0,0 @@ -setHosts([Utility::getHost()]) - ->setConnectionPool(SniffingConnectionPool::class, ['sniffingInterval' => 10]) - ->setSSLVerification(false) - ->build(); - - $pinged = $client->ping(); - $this->assertTrue($pinged); - } -} diff --git a/tests/ConnectionPool/SniffingConnectionPoolTest.php b/tests/ConnectionPool/SniffingConnectionPoolTest.php deleted file mode 100644 index 5935b14e..00000000 --- a/tests/ConnectionPool/SniffingConnectionPoolTest.php +++ /dev/null @@ -1,293 +0,0 @@ -clusterState(1); - $connection = $this->createMock(Connection::class); - $connection->method('isAlive')->willReturn(true); - $connection->method('sniff')->willReturn($clusterState); - $selector = new RoundRobinSelector(); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionFactory->method('create')->willReturn($connection); - - $connectionPool = new SniffingConnectionPool( - [$connection], - $selector, - $connectionFactory, - ['sniffingInterval' => 0] - ); - - $this->assertSame($connection, $connectionPool->nextConnection()); - } - - /** @test */ - public function itShouldSniffNewConnectionsWhenPossible(): void - { - $clusterState = $this->clusterState(2); - $originalConnection = $this->createMock(Connection::class); - $originalConnection->method('isAlive')->willReturn(false); - $originalConnection->method('sniff')->willReturn($clusterState); - $discoveredConnection = $this->createMock(Connection::class); - $discoveredConnection->method('isAlive')->willReturn(true); - $selector = new RoundRobinSelector(); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionFactory->method('create')->willReturnOnConsecutiveCalls($originalConnection, $discoveredConnection); - - $connectionPool = new SniffingConnectionPool( - [$originalConnection], - $selector, - $connectionFactory, - ['sniffingInterval' => 0] - ); - - $actualConnection = $connectionPool->nextConnection(); - - $this->assertSame($discoveredConnection, $actualConnection); - } - - /** @test */ - public function forceNextConnection(): void - { - $clusterState = $this->clusterState(2); - $firstConnection = $this->createMock(Connection::class); - $firstConnection->method('isAlive')->willReturn(true); - $firstConnection->method('sniff')->willReturn($clusterState); - $secondConnection = $this->createMock(Connection::class); - $secondConnection->method('isAlive')->willReturn(true); - $selector = new RoundRobinSelector(); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionFactory->method('create')->willReturnOnConsecutiveCalls($firstConnection, $secondConnection); - - $connectionPool = new SniffingConnectionPool( - [$firstConnection, $secondConnection], - $selector, - $connectionFactory, - ['sniffingInterval' => 0] - ); - - $this->assertSame($secondConnection, $connectionPool->nextConnection(true)); - } - - - /** @test */ - public function itShouldReturnFirstSeededConnectionIfAlive(): void - { - $clusterState = $this->clusterState(10); - $connections = []; - for ($i = 1; $i <= 10; $i++) { - $connection = $this->createMock(Connection::class); - $connection->method('isAlive')->willReturn(true); - $connection->method('sniff')->willReturn($clusterState); - $connections[] = $connection; - } - $selector = new RoundRobinSelector(); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionFactory->method('create')->willReturnOnConsecutiveCalls(...$connections); - - $connectionPool = new SniffingConnectionPool( - $connections, - $selector, - $connectionFactory, - ['sniffingInterval' => 0] - ); - - $this->assertSame($connections[0], $connectionPool->nextConnection()); - } - - /** @test */ - public function itShouldReturnTheFirstAvailableConnection(): void - { - $clusterState = $this->clusterState(10); - $connections = []; - for ($i = 1; $i <= 10; $i++) { - $connection = $this->createMock(Connection::class); - $connection->method('isAlive')->willReturn(false); - $connection->method('sniff')->willReturn($clusterState); - $connections[] = $connection; - } - $randomLiveConnectionIndex = random_int(0, 9); - $connections[$randomLiveConnectionIndex] = $this->createMock(Connection::class); - $connections[$randomLiveConnectionIndex]->method('isAlive')->willReturn(true); - $selector = new RoundRobinSelector(); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionFactory->method('create')->willReturnOnConsecutiveCalls(...$connections); - - $connectionPool = new SniffingConnectionPool( - $connections, - $selector, - $connectionFactory, - ['sniffingInterval' => 0] - ); - - $this->assertSame($connections[$randomLiveConnectionIndex], $connectionPool->nextConnection()); - } - - /** @test */ - public function itShouldFailIfAllNodesAreDown(): void - { - $clusterState = $this->clusterState(10); - $connections = []; - for ($i = 1; $i <= 10; $i++) { - $connection = $this->createMock(Connection::class); - $connection->method('isAlive')->willReturn(false); - $connection->method('sniff')->willReturn($clusterState); - $connections[] = $connection; - } - $selector = new RoundRobinSelector(); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionFactory->method('create')->willReturnOnConsecutiveCalls(...$connections); - - $connectionPool = new SniffingConnectionPool( - $connections, - $selector, - $connectionFactory, - ['sniffingInterval' => 0] - ); - - $this->expectException(NoNodesAvailableException::class); - - $connectionPool->nextConnection(); - } - - /** @test */ - public function sniffShouldStopIfAllSniffRequestsFail(): void - { - $connection = $this->createMock(Connection::class); - $connection->method('isAlive')->willReturn(false); - $connection->method('sniff')->willThrowException(new OperationTimeoutException()); - $selector = new RoundRobinSelector(); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - - $connectionPool = new SniffingConnectionPool( - [$connection], - $selector, - $connectionFactory, - ['sniffingInterval' => 0] - ); - - $this->expectException(NoNodesAvailableException::class); - $connectionFactory->expects($this->never())->method('create'); - - $connectionPool->nextConnection(); - } - - /** @test */ - public function sniffShouldStopIfNodesAreEmpty(): void - { - $clusterState = $this->clusterState(0); - $connection = $this->createMock(Connection::class); - $connection->method('isAlive')->willReturn(false); - $connection->method('sniff')->willReturn($clusterState); - $selector = new RoundRobinSelector(); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - - $connectionPool = new SniffingConnectionPool( - [$connection], - $selector, - $connectionFactory, - ['sniffingInterval' => 0] - ); - - $this->expectException(NoNodesAvailableException::class); - $connectionFactory->expects($this->never())->method('create'); - - $connectionPool->nextConnection(); - } - - /** @test */ - public function itShouldNotSniffBeforeScheduledSniffTime(): void - { - $connection = $this->createMock(Connection::class); - $connection->method('isAlive')->willReturn(false); - $connection->method('sniff')->willReturn($this->clusterState(2)); - $selector = new RoundRobinSelector(); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - - $connectionPool = new SniffingConnectionPool( - [$connection], - $selector, - $connectionFactory, - ['sniffingInterval' => 300] - ); - - $connectionFactory->expects($this->never())->method('create'); - $this->expectException(NoNodesAvailableException::class); - - $connectionPool->nextConnection(); - } - - /** @test */ - public function scheduleCheck(): void - { - $clusterState = $this->clusterState(2); - $firstConnection = $this->createMock(Connection::class); - $firstConnection->method('isAlive')->willReturn(true); - $firstConnection->method('sniff')->willReturn($clusterState); - $secondConnection = $this->createMock(Connection::class); - $secondConnection->method('isAlive')->willReturn(true); - $selector = $this->createMock(RoundRobinSelector::class); - $selector->expects($this->exactly(2))->method('select')->willReturnOnConsecutiveCalls( - $firstConnection, - $secondConnection - ); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionFactory->method('create')->willReturnOnConsecutiveCalls($firstConnection, $secondConnection); - - $connectionPool = new SniffingConnectionPool( - [$firstConnection], - $selector, - $connectionFactory, - ['sniffingInterval' => 300] - ); - - $connectionPool->scheduleCheck(); - - $this->assertSame($secondConnection, $connectionPool->nextConnection()); - } - - private function clusterState(int $numberOfNodes): array - { - $clusterState = ['nodes' => []]; - - for ($i = 1; $i <= $numberOfNodes; $i++) { - $clusterState['nodes']["node-$i"] = [ - 'http' => [ - 'publish_address' => "172.17.0.2:920$i", - ], - ]; - } - - return $clusterState; - } -} diff --git a/tests/ConnectionPool/StaticConnectionPoolIntegrationTest.php b/tests/ConnectionPool/StaticConnectionPoolIntegrationTest.php deleted file mode 100644 index 0f3462c3..00000000 --- a/tests/ConnectionPool/StaticConnectionPoolIntegrationTest.php +++ /dev/null @@ -1,66 +0,0 @@ -host = Utility::getHost(); - } - - // Issue #636 - public function test404Liveness() - { - $client = \OpenSearch\ClientBuilder::create() - ->setHosts([$this->host]) - ->setConnectionPool(\OpenSearch\ConnectionPool\StaticConnectionPool::class) - ->setSSLVerification(false) - ->build(); - - $connection = $client->transport->getConnection(); - - // Ensure connection is dead - $connection->markDead(); - - // The index doesn't exist, but the server is up so this will return a 404 - $this->assertFalse($client->indices()->exists(['index' => 'not_existing_index'])); - - // But the node should be marked as alive since the server responded - $this->assertTrue($connection->isAlive()); - } -} diff --git a/tests/ConnectionPool/StaticConnectionPoolTest.php b/tests/ConnectionPool/StaticConnectionPoolTest.php deleted file mode 100644 index c355253f..00000000 --- a/tests/ConnectionPool/StaticConnectionPoolTest.php +++ /dev/null @@ -1,225 +0,0 @@ -expects('isAlive')->andReturns(true); - $mockConnection->expects('markDead'); - - /** - * @var \OpenSearch\Connections\ConnectionInterface[]&\Mockery\MockInterface[] $connections - */ - $connections = [$mockConnection]; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select') - ->andReturns($connections[0]) - ->getMock(); - - $connectionFactory = m::mock(ConnectionFactory::class); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - ]; - $connectionPool = new StaticConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertSame($mockConnection, $retConnection); - } - - public function testAddMultipleHostsThenGetFirst() - { - $connections = []; - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('isAlive')->between(0, 1)->andReturns(true); - $mockConnection->expects('markDead')->once(); - - $connections[] = $mockConnection; - } - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select') - ->andReturns($connections[0]); - - $connectionFactory = m::mock(ConnectionFactory::class); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - ]; - $connectionPool = new StaticConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $retConnection = $connectionPool->nextConnection(); - - $this->assertSame($connections[0], $retConnection); - } - - public function testAllHostsFailPing() - { - $connections = []; - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(false); - $mockConnection->expects('isAlive')->andReturns(false); - $mockConnection->expects('markDead'); - $mockConnection->expects('getPingFailures')->andReturns(0); - $mockConnection->expects('getLastPing')->andReturns(time()); - - $connections[] = $mockConnection; - } - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues($connections); - - $connectionFactory = m::mock(ConnectionFactory::class); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - ]; - $connectionPool = new StaticConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $this->expectException(NoNodesAvailableException::class); - $this->expectExceptionMessage('No alive nodes found in your cluster'); - - $connectionPool->nextConnection(); - } - - public function testAllExceptLastHostFailPingRevivesInSkip() - { - $connections = []; - - foreach (range(1, 9) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(false); - $mockConnection->expects('isAlive')->andReturns(false); - $mockConnection->expects('markDead'); - $mockConnection->expects('getPingFailures')->andReturns(0); - $mockConnection->expects('getLastPing')->andReturns(time()); - - $connections[] = $mockConnection; - } - - $goodConnection = m::mock(Connection::class); - $goodConnection->expects('ping')->andReturns(true); - $goodConnection->expects('isAlive')->andReturns(false); - $goodConnection->expects('markDead'); - $goodConnection->expects('getPingFailures')->andReturns(0); - $goodConnection->expects('getLastPing')->andReturns(time()); - - $connections[] = $goodConnection; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues($connections); - - $connectionFactory = m::mock(ConnectionFactory::class); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - ]; - $connectionPool = new StaticConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $ret = $connectionPool->nextConnection(); - $this->assertSame($goodConnection, $ret); - } - - public function testAllExceptLastHostFailPingRevivesPreSkip() - { - $connections = []; - - foreach (range(1, 9) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->between(0, 1)->andReturns(false); - $mockConnection->expects('isAlive')->andReturns(false); - $mockConnection->expects('markDead'); - $mockConnection->expects('getPingFailures')->andReturns(0); - $mockConnection->expects('getLastPing')->andReturns(time()); - - $connections[] = $mockConnection; - } - - $goodConnection = m::mock(Connection::class); - $goodConnection->expects('ping')->andReturns(true); - $goodConnection->expects('isAlive')->andReturns(false); - $goodConnection->expects('markDead'); - $goodConnection->expects('getPingFailures')->andReturns(0); - $goodConnection->expects('getLastPing')->andReturns(time() - 10000); - - $connections[] = $goodConnection; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues($connections); - - $connectionFactory = m::mock(ConnectionFactory::class); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - ]; - $connectionPool = new StaticConnectionPool($connections, $selector, $connectionFactory, $connectionPoolParams); - - $ret = $connectionPool->nextConnection(); - $this->assertSame($goodConnection, $ret); - } - - public function testCustomConnectionPoolIT() - { - $clientBuilder = ClientBuilder::create(); - $clientBuilder->setHosts(['localhost:1']); - $client = $clientBuilder - ->setRetries(0) - ->setConnectionPool(StaticConnectionPool::class, []) - ->build(); - - $this->expectException(NoNodesAvailableException::class); - $this->expectExceptionMessage('No alive nodes found in your cluster'); - - $client->search([]); - } -} diff --git a/tests/Connections/ConnectionTest.php b/tests/Connections/ConnectionTest.php deleted file mode 100644 index ee04491a..00000000 --- a/tests/Connections/ConnectionTest.php +++ /dev/null @@ -1,504 +0,0 @@ -logger = $this->createMock(LoggerInterface::class); - $this->trace = $this->createMock(LoggerInterface::class); - $this->serializer = $this->createMock(SerializerInterface::class); - } - - public function testConstructor() - { - $host = [ - 'host' => 'localhost' - ]; - - $connection = new Connection( - function () { - }, - $host, - [], - $this->serializer, - $this->logger, - $this->trace - ); - - $this->assertInstanceOf(Connection::class, $connection); - } - - /** - * @depends testConstructor - */ - public function testGetHeadersContainUserAgent() - { - $params = []; - $host = [ - 'host' => 'localhost' - ]; - - $connection = new Connection( - function () { - }, - $host, - $params, - $this->serializer, - $this->logger, - $this->trace - ); - - $headers = $connection->getHeaders(); - - $this->assertArrayHasKey('User-Agent', $headers); - $this->assertStringContainsString('opensearch-php/'. Client::VERSION, $headers['User-Agent'][0]); - } - - /** - * @depends testGetHeadersContainUserAgent - */ - public function testUserAgentHeaderIsSent() - { - $params = []; - $host = [ - 'host' => 'localhost' - ]; - - $connection = new Connection( - ClientBuilder::defaultHandler(), - $host, - $params, - $this->serializer, - $this->logger, - $this->trace - ); - $result = $connection->performRequest('GET', '/'); - $request = $connection->getLastRequestInfo()['request']; - - $this->assertArrayHasKey('User-Agent', $request['headers']); - $this->assertStringContainsString('opensearch-php/'. Client::VERSION, $request['headers']['User-Agent'][0]); - } - - /** - * @depends testConstructor - */ - public function testGetHeadersContainsHostArrayConfig() - { - $host = [ - 'host' => 'localhost', - 'user' => 'foo', - 'pass' => 'bar', - ]; - - $connection = new Connection( - ClientBuilder::defaultHandler(), - $host, - [], - $this->serializer, - $this->logger, - $this->trace - ); - $result = $connection->performRequest('GET', '/'); - $request = $connection->getLastRequestInfo()['request']; - - $this->assertArrayHasKey(CURLOPT_HTTPAUTH, $request['client']['curl']); - $this->assertArrayHasKey(CURLOPT_USERPWD, $request['client']['curl']); - $this->assertArrayNotHasKey('Authorization', $request['headers']); - $this->assertStringContainsString('foo:bar', $request['client']['curl'][CURLOPT_USERPWD]); - } - - /** - * @depends testGetHeadersContainsHostArrayConfig - */ - public function testGetHeadersContainApiKeyAuth() - { - $params = ['client' => ['headers' => [ - 'Authorization' => [ - 'ApiKey ' . base64_encode(random_bytes(10)) - ] - ] ] ]; - $host = [ - 'host' => 'localhost' - ]; - - $connection = new Connection( - ClientBuilder::defaultHandler(), - $host, - $params, - $this->serializer, - $this->logger, - $this->trace - ); - $result = $connection->performRequest('GET', '/'); - $request = $connection->getLastRequestInfo()['request']; - - $this->assertArrayHasKey('Authorization', $request['headers']); - $this->assertArrayNotHasKey(CURLOPT_HTTPAUTH, $request['headers']); - $this->assertStringContainsString($params['client']['headers']['Authorization'][0], $request['headers']['Authorization'][0]); - } - - /** - * @depends testGetHeadersContainApiKeyAuth - */ - public function testGetHeadersContainApiKeyAuthOverHostArrayConfig() - { - $params = ['client' => ['headers' => [ - 'Authorization' => [ - 'ApiKey ' . base64_encode(random_bytes(10)) - ] - ] ] ]; - $host = [ - 'host' => 'localhost', - 'user' => 'foo', - 'pass' => 'bar', - ]; - - $connection = new Connection( - ClientBuilder::defaultHandler(), - $host, - $params, - $this->serializer, - $this->logger, - $this->trace - ); - $result = $connection->performRequest('GET', '/'); - $request = $connection->getLastRequestInfo()['request']; - - $this->assertArrayHasKey('Authorization', $request['headers']); - $this->assertArrayNotHasKey(CURLOPT_HTTPAUTH, $request['headers']); - $this->assertStringContainsString($params['client']['headers']['Authorization'][0], $request['headers']['Authorization'][0]); - } - - /** - * @depends testGetHeadersContainsHostArrayConfig - */ - public function testGetHeadersContainBasicAuth() - { - $params = ['client' => ['curl' => [ - CURLOPT_HTTPAUTH => CURLAUTH_BASIC, - CURLOPT_USERPWD => 'username:password', - ] ] ]; - $host = [ - 'host' => 'localhost' - ]; - - $connection = new Connection( - ClientBuilder::defaultHandler(), - $host, - $params, - $this->serializer, - $this->logger, - $this->trace - ); - $result = $connection->performRequest('GET', '/'); - $request = $connection->getLastRequestInfo()['request']; - - $this->assertArrayHasKey(CURLOPT_HTTPAUTH, $request['client']['curl']); - $this->assertArrayHasKey(CURLOPT_USERPWD, $request['client']['curl']); - $this->assertArrayNotHasKey('Authorization', $request['headers']); - $this->assertStringContainsString($params['client']['curl'][CURLOPT_USERPWD], $request['client']['curl'][CURLOPT_USERPWD]); - } - - /** - * @depends testGetHeadersContainBasicAuth - */ - public function testGetHeadersContainBasicAuthOverHostArrayConfig() - { - $params = ['client' => ['curl' => [ - CURLOPT_HTTPAUTH => CURLAUTH_BASIC, - CURLOPT_USERPWD => 'username:password', - ] ] ]; - $host = [ - 'host' => 'localhost', - 'user' => 'foo', - 'pass' => 'bar', - ]; - - $connection = new Connection( - ClientBuilder::defaultHandler(), - $host, - $params, - $this->serializer, - $this->logger, - $this->trace - ); - $connection->performRequest('GET', '/'); - $request = $connection->getLastRequestInfo()['request']; - - $this->assertArrayHasKey(CURLOPT_HTTPAUTH, $request['client']['curl']); - $this->assertArrayHasKey(CURLOPT_USERPWD, $request['client']['curl']); - $this->assertArrayNotHasKey('Authorization', $request['headers']); - $this->assertStringContainsString('username:password', $request['client']['curl'][CURLOPT_USERPWD]); - } - - /** - * @see https://github.com/elastic/elasticsearch-php/issues/977 - */ - public function testTryDeserializeErrorWithMasterNotDiscoveredException() - { - $host = [ - 'host' => 'localhost' - ]; - - $connection = new Connection( - function () { - }, - $host, - [], - new SmartSerializer(), - $this->logger, - $this->trace - ); - - $reflection = new ReflectionClass(Connection::class); - $tryDeserializeError = $reflection->getMethod('tryDeserializeError'); - $tryDeserializeError->setAccessible(true); - - $body = '{"error":{"root_cause":[{"type":"master_not_discovered_exception","reason":null}],"type":"master_not_discovered_exception","reason":null},"status":503}'; - $response = [ - 'transfer_stats' => [], - 'status' => 503, - 'body' => $body - ]; - - $result = $tryDeserializeError->invoke($connection, $response, ServerErrorResponseException::class); - $this->assertInstanceOf(ServerErrorResponseException::class, $result); - $this->assertStringContainsString('master_not_discovered_exception', $result->getMessage()); - } - - /** - * @see https://github.com/opensearch-project/opensearch-php/issues/167 - */ - public function testTryDeserializeErrorWith403Error() - { - $host = [ - 'host' => 'localhost' - ]; - - $connection = new Connection( - function () { - }, - $host, - [], - new SmartSerializer(), - $this->logger, - $this->trace - ); - - $reflection = new ReflectionClass(Connection::class); - $tryDeserializeError = $reflection->getMethod('tryDeserializeError'); - $tryDeserializeError->setAccessible(true); - - $body = '{"status":403,"error":{"reason":"403 Forbidden","type":"Forbidden"}}'; - $response = [ - 'transfer_stats' => [], - 'status' => 403, - 'body' => $body - ]; - - $result = $tryDeserializeError->invoke($connection, $response, ServerErrorResponseException::class); - $this->assertInstanceOf(ServerErrorResponseException::class, $result); - $this->assertStringContainsString('403 Forbidden', $result->getMessage()); - } - - public function testHeaderClientParamIsResetAfterSent() - { - $host = [ - 'host' => 'localhost' - ]; - - $connection = new Connection( - ClientBuilder::defaultHandler(), - $host, - [], - new SmartSerializer(), - $this->logger, - $this->trace - ); - - $options = [ - 'client' => [ - 'headers' => [ - 'Foo' => [ 'Bar' ] - ] - ] - ]; - - $headersBefore = $connection->getHeaders(); - $connection->performRequest('GET', '/', null, null, $options); - $headersAfter = $connection->getHeaders(); - $this->assertEquals($headersBefore, $headersAfter); - } - - public function testParametersAreSent() - { - $connectionParams = []; - $host = [ - 'host' => 'localhost' - ]; - $requestParams = [ - 'foo' => true, - 'baz' => false, - 'bar' => 'baz' - ]; - - $connection = new Connection( - ClientBuilder::defaultHandler(), - $host, - $connectionParams, - $this->serializer, - $this->logger, - $this->trace - ); - $connection->performRequest('GET', '/', $requestParams); - $request = $connection->getLastRequestInfo()['request']; - - $this->assertEquals('/?foo=true&baz=false&bar=baz', $request['uri']); - } - - public function testPortInUrlWhenLogRequestSuccess() - { - $logger = new ArrayLogger(); - $trace = new ArrayLogger(); - - $connection = new Connection( - ClientBuilder::defaultHandler(), - [ - 'host' => 'localhost', - 'port' => 9200, - 'scheme' => 'http', - 'path' => '/info' - ], - [], - $this->serializer, - $logger, - $trace - ); - $request = [ - 'body' => '{}', - 'http_method' => 'GET', - 'headers' => [ - 'User-Agent: Testing' - ] - ]; - $response = [ - 'effective_url' => 'http://localhost/info', - 'status' => 200, - 'transfer_stats' => [ - 'primary_port' => 9200, - 'total_time' => 1 - ], - 'body' => '{}' - ]; - $connection->logRequestSuccess($request, $response); - // Check for localhost:9200 in trace - foreach ($trace->output as $row) { - $this->assertStringContainsString('localhost:9200', $row); - } - // Check for localhost:9200 in logger - foreach ($logger->output as $row) { - if (false !== strpos('info: Request Success', $row)) { - $this->assertStringContainsString('localhost:9200', $row); - } - } - } - - public function testPortInLogUrlWhenLogRequestFail() - { - $logger = new ArrayLogger(); - $trace = new ArrayLogger(); - - $connection = new Connection( - ClientBuilder::defaultHandler(), - [ - 'host' => 'localhost', - 'port' => 9200, - 'scheme' => 'http', - 'path' => '/info' - ], - [], - $this->serializer, - $logger, - $trace - ); - $request = [ - 'body' => '{}', - 'http_method' => 'GET', - 'headers' => [ - 'User-Agent: Testing' - ] - ]; - $response = [ - 'effective_url' => 'http://localhost/info', - 'status' => 400, - 'transfer_stats' => [ - 'primary_port' => 9200, - 'total_time' => 1 - ], - 'body' => '{}' - ]; - $connection->logRequestFail($request, $response, new Exception()); - - // Check for localhost:9200 in trace - foreach ($trace->output as $row) { - $this->assertStringContainsString('localhost:9200', $row); - } - // Check for localhost:9200 in logger - foreach ($logger->output as $row) { - if (false !== strpos('warning: Request Failure:', $row)) { - $this->assertStringContainsString('localhost:9200', $row); - } - } - } -} diff --git a/tests/Handlers/SigV4HandlerTest.php b/tests/Handlers/SigV4HandlerTest.php deleted file mode 100644 index 1b255e65..00000000 --- a/tests/Handlers/SigV4HandlerTest.php +++ /dev/null @@ -1,241 +0,0 @@ - - */ - private $envTemp = []; - - protected function setUp(): void - { - $this->envTemp = array_combine(self::ENV_KEYS_USED, array_map( - function ($envVarName) { - $current = getenv($envVarName); - putenv($envVarName); - return $current; - }, - self::ENV_KEYS_USED - )); - } - - protected function tearDown(): void - { - foreach ($this->envTemp as $key => $value) { - putenv("$key=$value"); - } - $this->envTemp = []; - } - - public function testSignsRequestsTheSdkDefaultCredentialProviderChain() - { - $key = 'foo'; - $toWrap = function (array $ringRequest) use ($key) { - $this->assertArrayHasKey('X-Amz-Date', $ringRequest['headers']); - $this->assertArrayHasKey('Authorization', $ringRequest['headers']); - $this->assertArrayHasKey('x-amz-content-sha256', $ringRequest['headers']); - $this->assertMatchesRegularExpression( - "~^AWS4-HMAC-SHA256 Credential=$key/\\d{8}/us-west-2/es/aws4_request~", - $ringRequest['headers']['Authorization'][0] - ); - - return $this->getGenericResponse(); - }; - putenv(CredentialProvider::ENV_KEY . "=$key"); - putenv(CredentialProvider::ENV_SECRET . '=bar'); - $client = ClientBuilder::create() - ->setHandler($toWrap) - ->setSigV4Region('us-west-2') - ->setSigV4CredentialProvider(true) - ->build(); - - $client->search([ - 'index' => 'index', - 'body' => [ - 'query' => [ 'match_all' => (object)[] ], - ], - ]); - } - - public function testSignsWithProvidedCredentials() - { - $toWrap = function (array $ringRequest) { - $this->assertArrayHasKey('X-Amz-Security-Token', $ringRequest['headers']); - $this->assertSame('baz', $ringRequest['headers']['X-Amz-Security-Token'][0]); - $this->assertArrayHasKey('x-amz-content-sha256', $ringRequest['headers']); - $this->assertMatchesRegularExpression( - '~^AWS4-HMAC-SHA256 Credential=foo/\d{8}/us-west-2/es/aws4_request~', - $ringRequest['headers']['Authorization'][0] - ); - - return $this->getGenericResponse(); - }; - - $client = ClientBuilder::create() - ->setHandler($toWrap) - ->setSigV4Region('us-west-2') - ->setSigV4CredentialProvider(new Credentials('foo', 'bar', 'baz')) - ->build(); - - $client->search([ - 'index' => 'index', - 'body' => [ - 'query' => [ 'match_all' => (object)[] ], - ], - ]); - } - - public function testSignsWithProvidedCredentialsAndService() - { - $toWrap = function (array $ringRequest) { - $this->assertArrayHasKey('X-Amz-Security-Token', $ringRequest['headers']); - $this->assertSame('baz', $ringRequest['headers']['X-Amz-Security-Token'][0]); - $this->assertArrayHasKey('x-amz-content-sha256', $ringRequest['headers']); - $this->assertMatchesRegularExpression( - '~^AWS4-HMAC-SHA256 Credential=foo/\d{8}/us-west-2/aoss/aws4_request~', - $ringRequest['headers']['Authorization'][0] - ); - - return $this->getGenericResponse(); - }; - - $client = ClientBuilder::create() - ->setHandler($toWrap) - ->setSigV4Region('us-west-2') - ->setSigV4Service('aoss') - ->setSigV4CredentialProvider(new Credentials('foo', 'bar', 'baz')) - ->build(); - - $client->search([ - 'index' => 'index', - 'body' => [ - 'query' => ['match_all' => (object)[]], - ], - ]); - } - - public function testEmptyBodyProducesCorrectSha256() - { - $toWrap = function (array $ringRequest) { - $this->assertArrayHasKey('X-Amz-Security-Token', $ringRequest['headers']); - $this->assertSame('baz', $ringRequest['headers']['X-Amz-Security-Token'][0]); - $this->assertArrayHasKey('x-amz-content-sha256', $ringRequest['headers']); - $this->assertSame($ringRequest['headers']['x-amz-content-sha256'][0], 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'); - $this->assertMatchesRegularExpression( - '~^AWS4-HMAC-SHA256 Credential=foo/\d{8}/us-west-2/aoss/aws4_request~', - $ringRequest['headers']['Authorization'][0] - ); - - return $this->getGenericResponse(); - }; - - $client = ClientBuilder::create() - ->setHandler($toWrap) - ->setSigV4Region('us-west-2') - ->setSigV4Service('aoss') - ->setSigV4CredentialProvider(new Credentials('foo', 'bar', 'baz')) - ->build(); - - $client->indices()->exists(['index' => 'index']); - } - - public function testEmptyRequestBodiesShouldBeNull() - { - $toWrap = function (array $ringRequest) { - $this->assertNull($ringRequest['body']); - - return $this->getGenericResponse(); - }; - - $client = ClientBuilder::create() - ->setHandler($toWrap) - ->setSigV4Region('us-west-2') - ->setSigV4CredentialProvider(new Credentials('foo', 'bar', 'baz')) - ->build(); - - $client->indices()->exists(['index' => 'index']); - } - - public function testNonEmptyRequestBodiesShouldNotBeNull() - { - $toWrap = function (array $ringRequest) { - $this->assertNotNull($ringRequest['body']); - - return $this->getGenericResponse(); - }; - - $client = ClientBuilder::create() - ->setHandler($toWrap) - ->setSigV4Region('us-west-2') - ->setSigV4CredentialProvider(new Credentials('foo', 'bar', 'baz')) - ->build(); - - $client->search([ - 'index' => 'index', - 'body' => [ - 'query' => [ 'match_all' => (object)[] ], - ], - ]); - } - - public function testClientParametersShouldBePassedToHandler() - { - $toWrap = function (array $ringRequest) { - $this->assertArrayHasKey('client', $ringRequest); - $this->assertArrayHasKey('timeout', $ringRequest['client']); - $this->assertArrayHasKey('connect_timeout', $ringRequest['client']); - - return $this->getGenericResponse(); - }; - - $client = ClientBuilder::create() - ->setHandler($toWrap) - ->setSigV4Region('us-west-2') - ->setSigV4CredentialProvider(new Credentials('foo', 'bar', 'baz')) - ->setConnectionParams(['client' => ['timeout' => 5, 'connect_timeout' => 5]]) - ->build(); - - $client->indices()->exists(['index' => 'index']); - } - - public function testClientPortDeterminedByURL() - { - $toWrap = function (array $ringRequest) { - $this->assertArrayNotHasKey(CURLOPT_PORT, $ringRequest['client']['curl']); - - return $this->getGenericResponse(); - }; - - $client = ClientBuilder::create() - ->setHandler($toWrap) - ->setHosts(['https://search--hgkaewb2ytci3t3y6yghh5m5vje.eu-central-1.es.amazonaws.com']) - ->setSigV4Region('us-west-2') - ->setSigV4CredentialProvider(new Credentials('foo', 'bar', 'baz')) - ->build(); - - $client->indices()->exists(['index' => 'index']); - } - - private function getGenericResponse(): CompletedFutureArray - { - return new CompletedFutureArray([ - 'status' => 200, - 'body' => fopen('php://memory', 'r'), - 'transfer_stats' => ['total_time' => 0], - 'effective_url' => 'https://www.example.com', - ]); - } -} diff --git a/tests/Namespaces/SecurityNamespaceTest.php b/tests/Namespaces/SecurityNamespaceTest.php index 9c4a6e2e..cbc41997 100644 --- a/tests/Namespaces/SecurityNamespaceTest.php +++ b/tests/Namespaces/SecurityNamespaceTest.php @@ -41,7 +41,6 @@ protected function setUp(): void { $this->transport = $this->createMock(Transport::class); $this->client = ClientBuilder::create() - ->setTransport($this->transport) ->setSSLVerification(false) ->build(); } diff --git a/tests/RegisteredNamespaceTest.php b/tests/RegisteredNamespaceTest.php index 117b550f..34233e8d 100644 --- a/tests/RegisteredNamespaceTest.php +++ b/tests/RegisteredNamespaceTest.php @@ -23,10 +23,9 @@ use OpenSearch; use OpenSearch\ClientBuilder; -use OpenSearch\Endpoints\AbstractEndpoint; use OpenSearch\Serializers\SerializerInterface; -use OpenSearch\Transport; use Mockery as m; +use OpenSearch\Transport; /** * Class RegisteredNamespaceTest diff --git a/tests/TransportTest.php b/tests/TransportTest.php deleted file mode 100644 index 96469261..00000000 --- a/tests/TransportTest.php +++ /dev/null @@ -1,99 +0,0 @@ -logger = $this->createMock(LoggerInterface::class); - $this->connectionPool = $this->createMock(AbstractConnectionPool::class); - $this->connection = $this->createMock(Connection::class); - } - - public function testPerformRequestWithServerErrorResponseException404Result() - { - $deferred = new Deferred(); - $deferred->reject(new ServerErrorResponseException('foo', 404)); - $future = new FutureArray($deferred->promise()); - - $this->connection->method('performRequest') - ->willReturn($future); - - $this->connectionPool->method('nextConnection') - ->willReturn($this->connection); - - $this->connectionPool->expects($this->never()) - ->method('scheduleCheck'); - - $transport = new Transport(1, $this->connectionPool, $this->logger); - - $result = $transport->performRequest('GET', '/'); - $this->assertInstanceOf(FutureArrayInterface::class, $result); - } - - public function testPerformRequestWithServerErrorResponseException500Result() - { - $deferred = new Deferred(); - $deferred->reject(new ServerErrorResponseException('foo', 500)); - $future = new FutureArray($deferred->promise()); - - $this->connection->method('performRequest') - ->willReturn($future); - - $this->connectionPool->method('nextConnection') - ->willReturn($this->connection); - - $this->connectionPool->expects($this->once()) - ->method('scheduleCheck'); - - $transport = new Transport(1, $this->connectionPool, $this->logger); - - $result = $transport->performRequest('GET', '/'); - $this->assertInstanceOf(FutureArrayInterface::class, $result); - } -} diff --git a/tests/Utility.php b/tests/Utility.php index bf004660..880c898d 100644 --- a/tests/Utility.php +++ b/tests/Utility.php @@ -56,13 +56,6 @@ public static function getClient(): Client $clientBuilder = ClientBuilder::create() ->setHosts([self::getHost()]); - $clientBuilder->setConnectionParams([ - 'client' => [ - 'headers' => [ - 'Accept' => [] - ] - ] - ]); $clientBuilder->setSSLVerification(false); return $clientBuilder->build();