Skip to content

Commit

Permalink
Remove http client and replace with PSR interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
kimpepper committed Oct 22, 2024
1 parent 919caf2 commit 82721bd
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 213 deletions.
19 changes: 14 additions & 5 deletions samples/index.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

use OpenSearch\Client;

require_once __DIR__ . '/vendor/autoload.php';

// Guzzle example
Expand All @@ -14,16 +16,19 @@
'auth' => ['admin', getenv('OPENSEARCH_PASSWORD')],
'verify' => false,
'retries' => 2,
'headers' => [
'Accept' => 'application/json',
'Content-Type' => 'application/json',
'User-Agent' => sprintf('opensearch-php/%s (%s; PHP %s)', Client::VERSION, PHP_OS, PHP_VERSION),
]
]);
$guzzlePsrFactory = new \GuzzleHttp\Psr7\HttpFactory();
$httpFactory = new \GuzzleHttp\Psr7\HttpFactory();
$requestFactory = new \OpenSearch\RequestFactory();
$transport = new OpenSearch\Transport(
$guzzleClient,
$guzzlePsrFactory,
$guzzlePsrFactory,
new \Psr\Log\NullLogger(),
);

$client = new OpenSearch\Client($transport);
$client = new OpenSearch\Client($transport, $requestFactory, $httpFactory);
$info = $client->info();

echo "{$info['version']['distribution']}: {$info['version']['number']}\n";
Expand All @@ -35,6 +40,10 @@
'auth_basic' => ['admin', getenv('OPENSEARCH_PASSWORD')],
'verify_peer' => false,
'max_retries' => 2,
'headers' => [
'Accept' => 'application/json',
'Content-Type' => 'application/json',
],
]);

$client = new OpenSearch\Client($symfonyClient);
Expand Down
69 changes: 41 additions & 28 deletions src/OpenSearch/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

namespace OpenSearch;

use Http\Promise\Promise;
use OpenSearch\Common\Exceptions\BadMethodCallException;
use OpenSearch\Common\Exceptions\NoNodesAvailableException;
use OpenSearch\Endpoints\AbstractEndpoint;
use OpenSearch\Namespaces\AsyncSearchNamespace;
use OpenSearch\Namespaces\BooleanRequestWrapper;
Expand Down Expand Up @@ -51,8 +51,7 @@
use OpenSearch\Namespaces\SslNamespace;
use OpenSearch\Namespaces\TasksNamespace;
use OpenSearch\Namespaces\TransformsNamespace;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\ResponseInterface;

/**
* Class Client
Expand All @@ -68,6 +67,11 @@ class Client
*/
public $transport;

/**
* Whether the client is async mode.
*/
protected bool $isAsync = false;

/**
* @var array
*/
Expand Down Expand Up @@ -1884,50 +1888,59 @@ public function extractArgument(array &$params, string $arg)
}
}

/**
* Check if the client is running in async mode.
*/
public function isAsync(): bool
{
return $this->isAsync;
}

/**
* Set the client to run in async mode.
*/
public function setAsync(bool $isAsync): Client
{
$this->isAsync = $isAsync;
return $this;
}

/**
* Sends a raw request to the cluster
* @return callable|array
* @throws NoNodesAvailableException
* @return \Http\Promise\Promise|\Psr\Http\Message\ResponseInterface
* @throws \Psr\Http\Client\ClientExceptionInterface
* @throws \Exception
*/
public function request(string $method, string $uri, array $attributes = [])
public function request(string $method, string $uri, array $attributes = []): Promise|ResponseInterface
{
$params = $attributes['params'] ?? [];
$body = $attributes['body'] ?? null;
$options = $attributes['options'] ?? [];

$async = isset($options['client']['future']) ? $options['client']['future'] : null;

$request = $this->createRequest($method, $uri, $params, $body, $options);
$request = $this->transport->createRequest($method, $uri, $params, $body);

if ($async) {
$promise = $this->transport->sendAsyncRequest($request);
if ($this->isAsync) {
return $this->transport->sendAsyncRequest($request);
}

$promise = $this->transport->performRequest($method, $uri, $params, $body, $options);

return $this->transport->resultOrFuture($promise, $options);
return $this->transport->sendRequest($request);
}

/**
* @return callable|array
* @return \Http\Promise\Promise|\Psr\Http\Message\ResponseInterface
* @throws \Psr\Http\Client\ClientExceptionInterface
* @throws \Exception
*/
private function performRequest(AbstractEndpoint $endpoint)
private function performRequest(AbstractEndpoint $endpoint): Promise|ResponseInterface
{
$promise = $this->transport->performRequest(
$request = $this->transport->createRequest(
$endpoint->getMethod(),
$endpoint->getURI(),
$endpoint->getParams(),
$endpoint->getBody(),
$endpoint->getOptions()
);

return $this->transport->resultOrFuture($promise, $endpoint->getOptions());
}

private function createRequest(string $method, string $uri, mixed $params, mixed $body, mixed $options)
{
$request = $this->requestFactory->createRequest($method, $uri);
$request->
if ($this->isAsync) {
return $this->transport->sendAsyncRequest($request);
}
return $this->transport->sendRequest($request);
}

}
39 changes: 10 additions & 29 deletions src/OpenSearch/Connections/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

namespace OpenSearch\Connections;

use Http\Client\HttpAsyncClient;
use OpenSearch\Client;
use OpenSearch\Common\Exceptions\BadRequest400Exception;
use OpenSearch\Common\Exceptions\Conflict409Exception;
Expand All @@ -43,12 +42,9 @@
use OpenSearch\Serializers\SerializerInterface;
use OpenSearch\Transport;
use Exception;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Psr\Http\Message\UriFactoryInterface;
use Psr\Http\Message\UriInterface;
use GuzzleHttp\Ring\Core;
use GuzzleHttp\Ring\Exception\ConnectException;
use GuzzleHttp\Ring\Exception\RingException;
use Psr\Log\LoggerInterface;

class Connection implements ConnectionInterface
Expand Down Expand Up @@ -143,12 +139,7 @@ public function __construct(
array $connectionParams,
SerializerInterface $serializer,
LoggerInterface $log,
LoggerInterface $trace,
protected ClientInterface $httpClient,
protected HttpAsyncClient $httpAsyncClient,
protected RequestFactoryInterface $requestFactory,
protected StreamFactoryInterface $streamFactory,
protected UriFactoryInterface $uriFactory,
LoggerInterface $trace
) {
if (isset($hostDetails['port']) !== true) {
$hostDetails['port'] = 9200;
Expand Down Expand Up @@ -209,11 +200,10 @@ public function __construct(
* @param mixed $body
* @param array $options
* @param Transport|null $transport
* @return RequestInterface
* @return mixed
*/
public function performRequest(string $method, string $uri, ?array $params = [], $body = null, array $options = [], ?Transport $transport = null): RequestInterface
public function performRequest(string $method, string $uri, ?array $params = [], $body = null, array $options = [], ?Transport $transport = null)
{

if ($body !== null) {
$body = $this->serializer->serialize($body);
}
Expand All @@ -228,7 +218,7 @@ public function performRequest(string $method, string $uri, ?array $params = [],
$host .= ':' . $this->port;
}

$request_params = [
$request = [
'http_method' => $method,
'scheme' => $this->transportSchema,
'uri' => $this->getURI($uri, $params),
Expand All @@ -241,22 +231,13 @@ public function performRequest(string $method, string $uri, ?array $params = [],
)
];

$request_params = array_replace_recursive($request_params, $this->connectionParams, $options);

$request = $this->requestFactory->createRequest($request_params['http_method'], $request_params['uri'])
->withBody($this->streamFactory->createStream($request_params['body']));

foreach ($this->headers as $name => $value) {
$request = $request->withHeader($name, $value);
}
$request = array_replace_recursive($request, $this->connectionParams, $options);

// RingPHP does not like if client is empty
if (empty($request['client'])) {
unset($request['client']);
}

$this->httpClient->sendRequest($request);

$handler = $this->handler;
$future = $handler($request, $this, $transport, $options);

Expand Down Expand Up @@ -370,7 +351,7 @@ function ($response) use ($connection, $transport, $request, $options) {
/**
* @param array<string, string|int|bool>|null $params
*/
private function getURI(string $uri, ?array $params): UriInterface
private function getURI(string $uri, ?array $params): string
{
if (isset($params) === true && !empty($params)) {
$params = array_map(
Expand All @@ -393,7 +374,7 @@ function ($value) {
$uri = $this->path . $uri;
}

return $this->uriFactory->createUri($uri);
return $uri;
}

/**
Expand Down
7 changes: 7 additions & 0 deletions src/OpenSearch/Endpoints/AbstractEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ private function checkUserParams(array $params)
}
}

/**
* Get the headers for the endpoint request.
*/
public function getHeaders(): array {
return $this->options['client']['headers'] ?? [];
}

/**
* @param array<string, mixed> $params Note: this is passed by-reference!
*/
Expand Down
Loading

0 comments on commit 82721bd

Please sign in to comment.