diff --git a/tests/ConnectionPool/SniffingConnectionPoolTest.php b/tests/ConnectionPool/SniffingConnectionPoolTest.php index 7ce4f481..59a63bab 100644 --- a/tests/ConnectionPool/SniffingConnectionPoolTest.php +++ b/tests/ConnectionPool/SniffingConnectionPoolTest.php @@ -6,7 +6,7 @@ * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a + * this file to be licensed under the Apache-2.0 license or a * compatible open source license. * * Modifications Copyright OpenSearch Contributors. See @@ -15,10 +15,8 @@ namespace OpenSearch\Tests\ConnectionPool; -use Mockery as m; -use OpenSearch\Common\Exceptions\Curl\OperationTimeoutException; use OpenSearch\Common\Exceptions\NoNodesAvailableException; -use OpenSearch\ConnectionPool\Selectors\SelectorInterface; +use OpenSearch\ConnectionPool\Selectors\RoundRobinSelector; use OpenSearch\ConnectionPool\SniffingConnectionPool; use OpenSearch\Connections\Connection; use OpenSearch\Connections\ConnectionFactoryInterface; @@ -31,445 +29,168 @@ */ class SniffingConnectionPoolTest extends TestCase { - public function tearDown(): void + /** @test */ + public function itShouldReturnTheSingleLiveConnectionAvailable(): void { - m::close(); - } - - public function testAddOneHostThenGetConnection(): void - { - $mockConnection = m::mock(Connection::class); - $mockConnection->allows('ping')->andReturns(true); - $mockConnection->allows('isAlive')->andReturns(true); - - $connections = [$mockConnection]; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturns($connections[0]); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); + $clusterState = $this->clusterState(1); + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(true); + $connection->method('sniff')->willReturn($clusterState); + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + $connectionFactory->method('create')->willReturn($connection); - $connectionPoolParams = ['randomizeHosts' => false]; $connectionPool = new SniffingConnectionPool( - $connections, + [$connection], $selector, $connectionFactory, - $connectionPoolParams + ['sniffingInterval' => 0] ); - $retConnection = $connectionPool->nextConnection(); - - $this->assertSame($mockConnection, $retConnection); + $this->assertSame($connection, $connectionPool->nextConnection()); } - public function testItShouldDiscoverLiveConnections(): void + /** @test */ + public function itShouldSniffNewConnectionsWhenPossible(): void { - $downConnection = $this->createMock(Connection::class); - $downConnection->method('isAlive')->willReturn(false); - $downConnection->method('ping')->willReturn(false); - $liveConnection = $this->createMock(Connection::class); - $liveConnection->method('isAlive')->willReturn(true); - $connections = [$downConnection, $liveConnection]; - $selector = $this->createMock(SelectorInterface::class); - $selector->method('select')->willReturnOnConsecutiveCalls($downConnection, $liveConnection); + $clusterState = $this->clusterState(2); + $originalConnection = $this->createMock(Connection::class); + $originalConnection->method('isAlive')->willReturn(false); + $originalConnection->method('sniff')->willReturn($clusterState); + $discoveredConnection = $this->createMock(Connection::class); + $discoveredConnection->method('isAlive')->willReturn(true); + $selector = new RoundRobinSelector(); $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionFactory->method('create')->willReturnOnConsecutiveCalls($downConnection, $liveConnection); - - $sut = new SniffingConnectionPool($connections, $selector, $connectionFactory, []); - - $this->assertSame($liveConnection, $sut->nextConnection()); - } + $connectionFactory->method('create')->willReturnOnConsecutiveCalls($originalConnection, $discoveredConnection); - public function testItShouldReturnTheFirstLiveConnection(): void - { - $firstConnection = $this->createMock(Connection::class); - $firstConnection->method('isAlive')->willReturn(true); - $secondConnection = $this->createMock(Connection::class); - $secondConnection->method('isAlive')->willReturn(true); - $connections = [$firstConnection, $secondConnection]; - $selector = $this->createMock(SelectorInterface::class); - $selector->method('select')->willReturnOnConsecutiveCalls($firstConnection, $secondConnection); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionFactory->method('create')->willReturnOnConsecutiveCalls($firstConnection, $secondConnection); + $connectionPool = new SniffingConnectionPool( + [$originalConnection], + $selector, + $connectionFactory, + ['sniffingInterval' => 0] + ); - $sut = new SniffingConnectionPool($connections, $selector, $connectionFactory, []); + $actualConnection = $connectionPool->nextConnection(); - $this->assertSame($firstConnection, $sut->nextConnection()); + $this->assertSame($discoveredConnection, $actualConnection); } - public function testForcingNextConnectionSelection(): void + /** @test */ + public function forceNextConnection(): void { - $clusterState = json_decode( - file_get_contents(__DIR__.'/../fixtures/cluster-state.json'), - true, - 512, - JSON_THROW_ON_ERROR - ); + $clusterState = $this->clusterState(2); $firstConnection = $this->createMock(Connection::class); $firstConnection->method('isAlive')->willReturn(true); $firstConnection->method('sniff')->willReturn($clusterState); $secondConnection = $this->createMock(Connection::class); $secondConnection->method('isAlive')->willReturn(true); - $connections = [$firstConnection, $secondConnection]; - $selector = $this->createMock(SelectorInterface::class); - $selector->method('select')->willReturnOnConsecutiveCalls($firstConnection, $secondConnection); + $selector = new RoundRobinSelector(); $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); $connectionFactory->method('create')->willReturnOnConsecutiveCalls($firstConnection, $secondConnection); - $sut = new SniffingConnectionPool($connections, $selector, $connectionFactory, []); - - $this->assertSame($secondConnection, $sut->nextConnection(true)); - } - - public function testAddOneHostAndForceNext(): void - { - $firstConnection = $this->createMock(Connection::class); - $secondConnection = m::mock(Connection::class); - $secondConnection->allows('isAlive')->andReturns(true); - $selector = $this->createMock(SelectorInterface::class); - $selector->method('select')->willReturnOnConsecutiveCalls($firstConnection, $secondConnection); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionFactory->method('create')->willReturn($secondConnection); $connectionPool = new SniffingConnectionPool( - [$firstConnection], + [$firstConnection, $secondConnection], $selector, $connectionFactory, - ['randomizeHosts' => false] + ['sniffingInterval' => 0] ); - $actualConnection = $connectionPool->nextConnection(true); - - $this->assertSame($secondConnection, $actualConnection); + $this->assertSame($secondConnection, $connectionPool->nextConnection(true)); } - public function testAddMultipleNodesThenGetConnection(): void - { - $connections = []; - foreach (range(1, 10) as $ignored) { - $mockConnection = $this->createMock(Connection::class); - $mockConnection->method('ping')->willReturn(true); - $mockConnection->method('isAlive')->willReturn(true); - $connections[] = $mockConnection; - } - $selector = $this->createMock(SelectorInterface::class); - $selector->method('select')->willReturn($connections[0]); - $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionPoolParams = ['randomizeHosts' => false]; - $connectionPool = new SniffingConnectionPool( - $connections, - $selector, - $connectionFactory, - $connectionPoolParams - ); - $retConnection = $connectionPool->nextConnection(); - - $this->assertSame($connections[0], $retConnection); - } - - public function testAddMultipleNodesTimeoutAllButLast(): void + /** @test */ + public function itShouldReturnFirstSeededConnectionIfAlive(): void { + $clusterState = $this->clusterState(10); $connections = []; - foreach (range(1, 10) as $ignored) { - $mockConnection = $this->createMock(Connection::class); - $mockConnection->method('ping')->willReturn(false); - $mockConnection->method('isAlive')->willReturn(false); - $connections[] = $mockConnection; + for ($i = 1; $i <= 10; $i++) { + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(true); + $connection->method('sniff')->willReturn($clusterState); + $connections[] = $connection; } - $liveConnection = $this->createMock(Connection::class); - $liveConnection->method('ping')->willReturn(true); - $liveConnection->method('isAlive')->willReturn(true); - $connections[] = $liveConnection; - $selector = $this->createMock(SelectorInterface::class); - $selector->method('select')->willReturnOnConsecutiveCalls(...$connections); + $selector = new RoundRobinSelector(); $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + $connectionFactory->method('create')->willReturnOnConsecutiveCalls(...$connections); $connectionPool = new SniffingConnectionPool( $connections, $selector, $connectionFactory, - ['randomizeHosts' => false] + ['sniffingInterval' => 0] ); - $retConnection = $connectionPool->nextConnection(); - - $this->assertSame($liveConnection, $retConnection); + $this->assertSame($connections[0], $connectionPool->nextConnection()); } - public function testAddTenNodesAllTimeout(): void + /** @test */ + public function itShouldReturnTheFirstAvailableConnection(): void { + $clusterState = $this->clusterState(10); $connections = []; - foreach (range(1, 10) as $ignored) { - $mockConnection = $this->createMock(Connection::class); - $mockConnection->method('ping')->willReturn(false); - $mockConnection->method('isAlive')->willReturn(false); - $connections[] = $mockConnection; + for ($i = 1; $i <= 10; $i++) { + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(false); + $connection->method('sniff')->willReturn($clusterState); + $connections[] = $connection; } - - $selector = $this->createMock(SelectorInterface::class); - $selector->method('select')->willReturnCallback( - function () use ($connections) { - return array_shift($connections); - } - ); - + $randomLiveConnectionIndex = random_int(0, 9); + $connections[$randomLiveConnectionIndex] = $this->createMock(Connection::class); + $connections[$randomLiveConnectionIndex]->method('isAlive')->willReturn(true); + $selector = new RoundRobinSelector(); $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); - $connectionPool = new SniffingConnectionPool( - $connections, - $selector, - $connectionFactory, - ['randomizeHosts' => false] - ); - - $this->expectException(NoNodesAvailableException::class); - $this->expectExceptionMessage('No alive nodes found in your cluster'); - - $connectionPool->nextConnection(); - } - - public function testAddOneHostSniffTwo(): void - { - $clusterState = json_decode( - file_get_contents(__DIR__.'/../fixtures/cluster-state.json'), - true, - 512, - JSON_THROW_ON_ERROR - ); - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(true); - $mockConnection->expects('isAlive')->andReturns(true); - $mockConnection->expects('getTransportSchema')->twice()->andReturns('http'); - $mockConnection->expects('sniff')->twice()->andReturns($clusterState); - - $connections = [$mockConnection]; - - $newConnections = []; - $newConnection = m::mock(Connection::class); - $newConnection->allows('isAlive')->andReturns(true); - - $newConnections[] = $newConnection; - $newConnections[] = $newConnection; + $connectionFactory->method('create')->willReturnOnConsecutiveCalls(...$connections); - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues([ - //selects provided node first, then the new cluster list - $mockConnection, - $newConnections[0], - $newConnections[1], - ]); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturns( - $newConnections[0] - ); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturns( - $newConnections[1] - ); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - 'sniffingInterval' => 0, - ]; - $connectionPool = new SniffingConnectionPool( - $connections, - $selector, - $connectionFactory, - $connectionPoolParams - ); - - $retConnection = $connectionPool->nextConnection(); - $this->assertSame($newConnections[0], $retConnection); - - $retConnection = $connectionPool->nextConnection(); - $this->assertSame($newConnections[1], $retConnection); - } - - public function testAddSeedSniffTwoTimeoutTwo(): void - { - $clusterState = json_decode( - '{"ok":true,"cluster_name":"opensearch","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', - true, - 512, - JSON_THROW_ON_ERROR - ); - - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(true); - $mockConnection->expects('isAlive')->andReturns(true); - $mockConnection->expects('getTransportSchema')->andReturns('http'); - $mockConnection->expects('sniff')->andReturns($clusterState); - - $connections = [$mockConnection]; - - $newConnections = []; - $newConnection = m::mock(Connection::class); - $newConnection->allows('isAlive')->andReturns(false); - $newConnection->allows('ping')->andReturns(false); - - $newConnections[] = $newConnection; - $newConnections[] = $newConnection; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues([ //selects provided node first, then the new cluster list - $mockConnection, - $newConnections[0], - $newConnections[1], - ]); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturns( - $newConnections[0] - ); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturns( - $newConnections[1] - ); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - 'sniffingInterval' => -1, - ]; $connectionPool = new SniffingConnectionPool( $connections, $selector, $connectionFactory, - $connectionPoolParams + ['sniffingInterval' => 0] ); - $this->expectException(NoNodesAvailableException::class); - $this->expectExceptionMessage('No alive nodes found in your cluster'); - - $retConnection = $connectionPool->nextConnection(); + $this->assertSame($connections[$randomLiveConnectionIndex], $connectionPool->nextConnection()); } - public function testTenTimeoutNineSniffTenthAddTwoAlive(): void + /** @test */ + public function itShouldFailIfAllNodesAreDown(): void { - $clusterState = json_decode( - '{"ok":true,"cluster_name":"opensearch","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', - true, - 512, - JSON_THROW_ON_ERROR - ); - + $clusterState = $this->clusterState(10); $connections = []; - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->allows('ping')->andReturns(false); - $mockConnection->allows('isAlive')->andReturns(true); - $mockConnection->allows('sniff')->andThrow(OperationTimeoutException::class); - - $connections[] = $mockConnection; + for ($i = 1; $i <= 10; $i++) { + $connection = $this->createMock(Connection::class); + $connection->method('isAlive')->willReturn(false); + $connection->method('sniff')->willReturn($clusterState); + $connections[] = $connection; } + $selector = new RoundRobinSelector(); + $connectionFactory = $this->createMock(ConnectionFactoryInterface::class); + $connectionFactory->method('create')->willReturnOnConsecutiveCalls(...$connections); - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(true); - $mockConnection->expects('isAlive')->andReturns(true); - $mockConnection->expects('sniff')->andReturns($clusterState); - $mockConnection->expects('getTransportSchema')->twice()->andReturns('http'); - - $connections[] = $mockConnection; - - $newConnections = $connections; - $newConnection = m::mock(Connection::class); - $newConnection->allows('isAlive')->andReturns(true); - $newConnection->allows('ping')->andReturns(true); - - $newConnections[] = $newConnection; - $newConnections[] = $newConnection; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues($newConnections); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturns( - $newConnections[10] - ); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturns( - $newConnections[11] - ); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - 'sniffingInterval' => -1, - ]; $connectionPool = new SniffingConnectionPool( $connections, $selector, $connectionFactory, - $connectionPoolParams + ['sniffingInterval' => 0] ); - $retConnection = $connectionPool->nextConnection(); - $this->assertSame($newConnections[11], $retConnection); + $this->expectException(NoNodesAvailableException::class); - $retConnection = $connectionPool->nextConnection(); - $this->assertSame($newConnections[12], $retConnection); + $connectionPool->nextConnection(); } - public function testTenTimeoutNineSniffTenthAddTwoDeadTimeoutEveryone(): void + private function clusterState(int $numberOfNodes): array { - $clusterState = json_decode( - '{"ok":true,"cluster_name":"opensearch","nodes":{"node1":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9300]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9200]"}, "node2":{"name":"Vesta","transport_address":"inet[/192.168.1.119:9301]","hostname":"zach-ThinkPad-W530","version":"0.90.5","http_address":"inet[/192.168.1.119:9201]"}}}', - true, - 512, - JSON_THROW_ON_ERROR - ); - - $connections = []; - - foreach (range(1, 10) as $index) { - $mockConnection = m::mock(Connection::class); - $mockConnection->allows('ping')->andReturns(false); - $mockConnection->allows('isAlive')->andReturns(true); - $mockConnection->allows('sniff')->andThrow(OperationTimeoutException::class); - - $connections[] = $mockConnection; + $clusterState = ['nodes' => []]; + + for ($i = 1; $i <= $numberOfNodes; $i++) { + $clusterState['nodes']["node-$i"] = [ + 'http' => [ + 'publish_address' => "172.17.0.2:920$i", + ], + ]; } - $mockConnection = m::mock(Connection::class); - $mockConnection->expects('ping')->andReturns(true); - $mockConnection->expects('isAlive')->andReturns(true); - $mockConnection->expects('sniff')->andReturns($clusterState); - $mockConnection->expects('getTransportSchema')->andReturns('http'); - $mockConnection->expects('sniff')->andThrow(OperationTimeoutException::class); - - $connections[] = $mockConnection; - - $newConnections = $connections; - - $newConnection = m::mock(Connection::class); - $newConnection->allows('isAlive')->andReturns(false); - $newConnection->allows('ping')->andReturns(false); - $newConnection->allows('sniff')->andThrow(OperationTimeoutException::class); - - $newConnections[] = $newConnection; - $newConnections[] = $newConnection; - - $selector = m::mock(SelectorInterface::class); - $selector->allows('select')->andReturnValues($newConnections); - - $connectionFactory = m::mock(ConnectionFactoryInterface::class); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9200])->andReturns( - $newConnections[10] - ); - $connectionFactory->allows('create')->with(['host' => '192.168.1.119', 'port' => 9201])->andReturns( - $newConnections[11] - ); - - $connectionPoolParams = [ - 'randomizeHosts' => false, - 'sniffingInterval' => -1, - ]; - $connectionPool = new SniffingConnectionPool( - $connections, - $selector, - $connectionFactory, - $connectionPoolParams - ); - - $this->expectException(NoNodesAvailableException::class); - $this->expectExceptionMessage('No alive nodes found in your cluster'); - - $retConnection = $connectionPool->nextConnection(); + return $clusterState; } }