Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: imdhemy <[email protected]>
  • Loading branch information
imdhemy committed Feb 25, 2024
1 parent cb14688 commit 74c1a80
Show file tree
Hide file tree
Showing 3 changed files with 2,890 additions and 143 deletions.
38 changes: 21 additions & 17 deletions src/OpenSearch/ConnectionPool/SniffingConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

namespace OpenSearch\ConnectionPool;

use Exception;
use InvalidArgumentException;
use OpenSearch\Common\Exceptions\Curl\OperationTimeoutException;
use OpenSearch\Common\Exceptions\NoNodesAvailableException;
use OpenSearch\ConnectionPool\Selectors\SelectorInterface;
use OpenSearch\Connections\Connection;
use OpenSearch\Connections\ConnectionInterface;
use OpenSearch\Connections\ConnectionFactoryInterface;
use OpenSearch\Connections\ConnectionInterface;

class SniffingConnectionPool extends AbstractConnectionPool
{
Expand All @@ -44,8 +44,12 @@ class SniffingConnectionPool extends AbstractConnectionPool
/**
* {@inheritdoc}
*/
public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams)
{
public function __construct(
$connections,
SelectorInterface $selector,
ConnectionFactoryInterface $factory,
$connectionPoolParams
) {
parent::__construct($connections, $selector, $factory, $connectionPoolParams);

$this->setConnectionPoolParams($connectionPoolParams);
Expand Down Expand Up @@ -120,23 +124,23 @@ private function sniffConnection(Connection $connection): bool
{
try {
$response = $connection->sniff();
} catch (OperationTimeoutException $exception) {
} catch (Exception $exception) {
return false;
}

$nodes = $this->parseClusterState($connection->getTransportSchema(), $response);
$nodes = $this->parseClusterState($response);

if (count($nodes) === 0) {
return false;
}

$this->connections = array();
$this->connections = [];

foreach ($nodes as $node) {
$nodeDetails = array(
$nodeDetails = [
'host' => $node['host'],
'port' => $node['port']
);
'port' => $node['port'],
];
$this->connections[] = $this->connectionFactory->create($nodeDetails);
}

Expand All @@ -145,18 +149,18 @@ private function sniffConnection(Connection $connection): bool
return true;
}

private function parseClusterState(string $transportSchema, $nodeInfo): array
private function parseClusterState($nodeInfo): array
{
$pattern = '/([^:]*):(\d+)/';
$hosts = [];
$pattern = '/([^:]*):(\d+)/';
$hosts = [];

foreach ($nodeInfo['nodes'] as $node) {
if (isset($node['http']) === true && isset($node['http']['publish_address']) === true) {
if (preg_match($pattern, $node['http']['publish_address'], $match) === 1) {
$hosts[] = array(
$hosts[] = [
'host' => $match[1],
'port' => (int) $match[2],
);
'port' => (int)$match[2],
];
}
}
}
Expand All @@ -168,7 +172,7 @@ private function setConnectionPoolParams(array $connectionPoolParams): void
{
$this->sniffingInterval = (int)($connectionPoolParams['sniffingInterval'] ?? 300);

if($this->sniffingInterval < 0) {
if ($this->sniffingInterval < 0) {
throw new InvalidArgumentException('sniffingInterval must be greater than or equal to 0');
}
}
Expand Down
Loading

0 comments on commit 74c1a80

Please sign in to comment.