From 431e8c875a6b44b01d08654aec6296bcab3a5c8b Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Fri, 26 May 2023 01:30:39 +0000 Subject: [PATCH 1/3] Modifies extension initialization request handling, updates sdkclient opensearch node settings with the source node address and HTTP port Signed-off-by: Joshua Palis --- src/main/java/org/opensearch/sdk/SDKClient.java | 16 +++++++++++++--- .../handlers/ExtensionsInitRequestHandler.java | 2 ++ .../org/opensearch/sdk/TestExtensionsRunner.java | 4 +++- .../helloworld/TestHelloWorldExtension.java | 4 ++-- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/opensearch/sdk/SDKClient.java b/src/main/java/org/opensearch/sdk/SDKClient.java index 017b682b..53e70091 100644 --- a/src/main/java/org/opensearch/sdk/SDKClient.java +++ b/src/main/java/org/opensearch/sdk/SDKClient.java @@ -133,16 +133,26 @@ public void initialize(Map actions) { /** * Update the ExtensionSettings with a new OpenSearch address and port * @param address the TransportAddress associated with the OpenSearchNode + * @param httpPort the http port associated with the OpenSearchNOde */ - public void updateOpenSearchNodeSettings(TransportAddress address) { + public void updateOpenSearchNodeSettings(TransportAddress address, String httpPort) { // Update the settings for future initialization of new clients this.extensionSettings.setOpensearchAddress(address.getAddress()); - this.extensionSettings.setOpensearchPort(Integer.toString(address.getPort())); + this.extensionSettings.setOpensearchPort(httpPort); // Update the settings on the already-initialized SDKRestClient (Deprecated -- for migration use) if (this.sdkRestClient != null) { this.sdkRestClient.getRestHighLevelClient() .getLowLevelClient() - .setNodes(List.of(new Node(new HttpHost(address.getAddress(), address.getPort())))); + .setNodes(List.of(new Node(new HttpHost(address.getAddress(), Integer.parseInt(httpPort))))); + } + // Update the settings on the already-initialized OpenSearchAsyncClient + if (this.javaAsyncClient != null) { + OpenSearchTransport javaAsyncClientTransport = this.javaAsyncClient._transport(); + if (javaAsyncClientTransport instanceof RestClientTransport) { + RestClientTransport restClientTransport = (RestClientTransport) javaAsyncClientTransport; + restClientTransport.restClient() + .setNodes(List.of(new Node(new HttpHost(address.getAddress(), Integer.parseInt(httpPort))))); + } } } diff --git a/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java b/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java index 0fede3e0..ffbd7f6d 100644 --- a/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java +++ b/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java @@ -64,6 +64,8 @@ public InitializeExtensionResponse handleExtensionInitRequest(InitializeExtensio // After sending successful response to initialization, send the REST API and Settings extensionsRunner.setOpensearchNode(extensionInitRequest.getSourceNode()); extensionsRunner.setExtensionNode(extensionInitRequest.getExtension()); + extensionsRunner.getSdkClient() + .updateOpenSearchNodeSettings(extensionInitRequest.getSourceNode().getAddress(), extensionInitRequest.getHttpPort()); // TODO: replace with sdkTransportService.getTransportService() TransportService extensionTransportService = extensionsRunner.getExtensionTransportService(); diff --git a/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java index 28ef2b91..2e5e1759 100644 --- a/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java +++ b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java @@ -129,7 +129,7 @@ public void testHandleExtensionInitRequest() throws UnknownHostException { extensionsRunner.setExtensionTransportService(this.transportService); doNothing().when(this.transportService).connectToNodeAsExtension(sourceNode, "opensearch-sdk-1"); - InitializeExtensionRequest extensionInitRequest = new InitializeExtensionRequest(sourceNode, extension); + InitializeExtensionRequest extensionInitRequest = new InitializeExtensionRequest(sourceNode, extension, "9204"); InitializeExtensionResponse response = extensionsInitRequestHandler.handleExtensionInitRequest(extensionInitRequest); // Test if name and unique ID are set @@ -137,6 +137,8 @@ public void testHandleExtensionInitRequest() throws UnknownHostException { assertEquals("opensearch-sdk-1", extensionsRunner.getUniqueId()); // Test if the source node is set after handleExtensionInitRequest() is called during OpenSearch bootstrap assertEquals(sourceNode, extensionsRunner.getOpensearchNode()); + // Verify opensearch http port is updated + assertEquals("9204", extensionsRunner.getExtension().getExtensionSettings().getOpensearchPort()); } @Test diff --git a/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java b/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java index 81e7811c..c6754d64 100644 --- a/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java +++ b/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java @@ -119,12 +119,12 @@ public void testExtensionSettingsUpdate() { assertEquals("localhost", host.getHostName()); assertEquals(9200, host.getPort()); - this.sdkClient.updateOpenSearchNodeSettings(new TransportAddress(new InetSocketAddress("10.10.10.10", 9300))); + this.sdkClient.updateOpenSearchNodeSettings(new TransportAddress(new InetSocketAddress("10.10.10.10", 9204)), "9204"); nodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes(); assertEquals(1, nodes.size()); host = nodes.get(0).getHost(); assertEquals("10.10.10.10", host.getHostName()); - assertEquals(9300, host.getPort()); + assertEquals(9204, host.getPort()); } @Test From 52ba43a4b4958ee916ec8bb0b262e1c942f7a37f Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Fri, 26 May 2023 02:03:56 +0000 Subject: [PATCH 2/3] Resetting test TransportAddress port to 9300 Signed-off-by: Joshua Palis --- .../sdk/sample/helloworld/TestHelloWorldExtension.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java b/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java index c6754d64..2bacb263 100644 --- a/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java +++ b/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java @@ -119,7 +119,7 @@ public void testExtensionSettingsUpdate() { assertEquals("localhost", host.getHostName()); assertEquals(9200, host.getPort()); - this.sdkClient.updateOpenSearchNodeSettings(new TransportAddress(new InetSocketAddress("10.10.10.10", 9204)), "9204"); + this.sdkClient.updateOpenSearchNodeSettings(new TransportAddress(new InetSocketAddress("10.10.10.10", 9300)), "9204"); nodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes(); assertEquals(1, nodes.size()); host = nodes.get(0).getHost(); From 28a8591f7eadeb60a3250928573be96b9b339064 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Fri, 26 May 2023 19:50:28 +0000 Subject: [PATCH 3/3] pulling http.port settings from environment settings response rather than from extension intialization request Signed-off-by: Joshua Palis --- .../java/org/opensearch/sdk/SDKClient.java | 12 ++--- .../ExtensionsInitRequestHandler.java | 12 ++++- .../opensearch/sdk/TestExtensionsRunner.java | 4 +- .../helloworld/TestHelloWorldExtension.java | 48 +++++++++++++------ 4 files changed, 50 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/opensearch/sdk/SDKClient.java b/src/main/java/org/opensearch/sdk/SDKClient.java index 53e70091..b8f412dc 100644 --- a/src/main/java/org/opensearch/sdk/SDKClient.java +++ b/src/main/java/org/opensearch/sdk/SDKClient.java @@ -87,7 +87,6 @@ import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.rest_client.RestClientTransport; -import org.opensearch.common.transport.TransportAddress; import org.opensearch.index.reindex.BulkByScrollResponse; import org.opensearch.index.reindex.DeleteByQueryRequest; @@ -132,26 +131,25 @@ public void initialize(Map actions) { /** * Update the ExtensionSettings with a new OpenSearch address and port - * @param address the TransportAddress associated with the OpenSearchNode + * @param address the host address associated with the OpenSearchNode * @param httpPort the http port associated with the OpenSearchNOde */ - public void updateOpenSearchNodeSettings(TransportAddress address, String httpPort) { + public void updateOpenSearchNodeSettings(String address, String httpPort) { // Update the settings for future initialization of new clients - this.extensionSettings.setOpensearchAddress(address.getAddress()); + this.extensionSettings.setOpensearchAddress(address); this.extensionSettings.setOpensearchPort(httpPort); // Update the settings on the already-initialized SDKRestClient (Deprecated -- for migration use) if (this.sdkRestClient != null) { this.sdkRestClient.getRestHighLevelClient() .getLowLevelClient() - .setNodes(List.of(new Node(new HttpHost(address.getAddress(), Integer.parseInt(httpPort))))); + .setNodes(List.of(new Node(new HttpHost(address, Integer.parseInt(httpPort))))); } // Update the settings on the already-initialized OpenSearchAsyncClient if (this.javaAsyncClient != null) { OpenSearchTransport javaAsyncClientTransport = this.javaAsyncClient._transport(); if (javaAsyncClientTransport instanceof RestClientTransport) { RestClientTransport restClientTransport = (RestClientTransport) javaAsyncClientTransport; - restClientTransport.restClient() - .setNodes(List.of(new Node(new HttpHost(address.getAddress(), Integer.parseInt(httpPort))))); + restClientTransport.restClient().setNodes(List.of(new Node(new HttpHost(address, Integer.parseInt(httpPort))))); } } } diff --git a/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java b/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java index ffbd7f6d..539d8bcf 100644 --- a/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java +++ b/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java @@ -28,6 +28,12 @@ public class ExtensionsInitRequestHandler { private static final Logger logger = LogManager.getLogger(ExtensionsInitRequestHandler.class); + // The default http port setting of OpenSearch + private static final String DEFAULT_HTTP_PORT = "9200"; + + // The configured http port setting of opensearch.yml + private static final String HTTP_PORT_SETTING = "http.port"; + private final ExtensionsRunner extensionsRunner; /** @@ -64,8 +70,6 @@ public InitializeExtensionResponse handleExtensionInitRequest(InitializeExtensio // After sending successful response to initialization, send the REST API and Settings extensionsRunner.setOpensearchNode(extensionInitRequest.getSourceNode()); extensionsRunner.setExtensionNode(extensionInitRequest.getExtension()); - extensionsRunner.getSdkClient() - .updateOpenSearchNodeSettings(extensionInitRequest.getSourceNode().getAddress(), extensionInitRequest.getHttpPort()); // TODO: replace with sdkTransportService.getTransportService() TransportService extensionTransportService = extensionsRunner.getExtensionTransportService(); @@ -81,6 +85,10 @@ public InitializeExtensionResponse handleExtensionInitRequest(InitializeExtensio extensionsRunner.setEnvironmentSettings(settings); extensionsRunner.updateNamedXContentRegistry(); extensionsRunner.updateSdkClusterService(); + // Use OpenSearch Settings to update client REST Connections + String openSearchNodeAddress = extensionInitRequest.getSourceNode().getAddress().getAddress(); + String openSearchNodeHttpPort = settings.get(HTTP_PORT_SETTING) != null ? settings.get(HTTP_PORT_SETTING) : DEFAULT_HTTP_PORT; + extensionsRunner.getSdkClient().updateOpenSearchNodeSettings(openSearchNodeAddress, openSearchNodeHttpPort); // Last step of initialization // TODO: make sure all the other sendX methods have completed diff --git a/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java index 2e5e1759..28ef2b91 100644 --- a/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java +++ b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java @@ -129,7 +129,7 @@ public void testHandleExtensionInitRequest() throws UnknownHostException { extensionsRunner.setExtensionTransportService(this.transportService); doNothing().when(this.transportService).connectToNodeAsExtension(sourceNode, "opensearch-sdk-1"); - InitializeExtensionRequest extensionInitRequest = new InitializeExtensionRequest(sourceNode, extension, "9204"); + InitializeExtensionRequest extensionInitRequest = new InitializeExtensionRequest(sourceNode, extension); InitializeExtensionResponse response = extensionsInitRequestHandler.handleExtensionInitRequest(extensionInitRequest); // Test if name and unique ID are set @@ -137,8 +137,6 @@ public void testHandleExtensionInitRequest() throws UnknownHostException { assertEquals("opensearch-sdk-1", extensionsRunner.getUniqueId()); // Test if the source node is set after handleExtensionInitRequest() is called during OpenSearch bootstrap assertEquals(sourceNode, extensionsRunner.getOpensearchNode()); - // Verify opensearch http port is updated - assertEquals("9204", extensionsRunner.getExtension().getExtensionSettings().getOpensearchPort()); } @Test diff --git a/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java b/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java index 2bacb263..451de7be 100644 --- a/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java +++ b/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java @@ -9,7 +9,6 @@ package org.opensearch.sdk.sample.helloworld; -import java.net.InetSocketAddress; import java.util.Collections; import java.util.List; import java.util.Map; @@ -27,8 +26,9 @@ import org.opensearch.action.ActionType; import org.opensearch.action.support.TransportAction; import org.opensearch.client.Node; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; +import org.opensearch.client.transport.rest_client.RestClientTransport; import org.opensearch.common.settings.Settings; -import org.opensearch.common.transport.TransportAddress; import org.opensearch.sdk.api.ActionExtension.ActionHandler; import org.opensearch.sdk.rest.ExtensionRestHandler; import org.opensearch.sdk.sample.helloworld.transport.SampleAction; @@ -56,6 +56,7 @@ public class TestHelloWorldExtension extends OpenSearchTestCase { private Injector injector; private SDKClient sdkClient; private SDKRestClient sdkRestClient; + private OpenSearchAsyncClient javaAsyncClient; private final ExtensionSettings extensionSettings = new ExtensionSettings("", "", "", "localhost", "9200"); static class UnregisteredAction extends ActionType { @@ -85,6 +86,7 @@ public void setUp() throws Exception { }); initializeSdkClient(); this.sdkRestClient = sdkClient.initializeRestClient("localhost", 9200); + this.javaAsyncClient = sdkClient.initalizeJavaAsyncClient("localhost", 9200); } @SuppressWarnings("rawtypes") @@ -98,6 +100,7 @@ private void initializeSdkClient() { public void tearDown() throws Exception { super.tearDown(); this.sdkRestClient.close(); + this.sdkClient.doCloseJavaClients(); this.injector = null; } @@ -113,18 +116,35 @@ public void testExtensionSettings() { @Test public void testExtensionSettingsUpdate() { - List nodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes(); - assertEquals(1, nodes.size()); - HttpHost host = nodes.get(0).getHost(); - assertEquals("localhost", host.getHostName()); - assertEquals(9200, host.getPort()); - - this.sdkClient.updateOpenSearchNodeSettings(new TransportAddress(new InetSocketAddress("10.10.10.10", 9300)), "9204"); - nodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes(); - assertEquals(1, nodes.size()); - host = nodes.get(0).getHost(); - assertEquals("10.10.10.10", host.getHostName()); - assertEquals(9204, host.getPort()); + List sdkRestClientNodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes(); + List javaAsyncClientNodes = ((RestClientTransport) this.javaAsyncClient._transport()).restClient().getNodes(); + + // Test rest client nodes + assertEquals(1, sdkRestClientNodes.size()); + assertEquals(1, javaAsyncClientNodes.size()); + + // Test client http hosts + HttpHost sdkRestClientHost = sdkRestClientNodes.get(0).getHost(); + assertEquals("localhost", sdkRestClientHost.getHostName()); + assertEquals(9200, sdkRestClientHost.getPort()); + HttpHost javaAsyncClientHost = javaAsyncClientNodes.get(0).getHost(); + assertEquals("localhost", javaAsyncClientHost.getHostName()); + assertEquals(9200, javaAsyncClientHost.getPort()); + + // Test updated hosts + this.sdkClient.updateOpenSearchNodeSettings("10.10.10.10", "9204"); + + sdkRestClientNodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes(); + assertEquals(1, sdkRestClientNodes.size()); + sdkRestClientHost = sdkRestClientNodes.get(0).getHost(); + assertEquals("10.10.10.10", sdkRestClientHost.getHostName()); + assertEquals(9204, sdkRestClientHost.getPort()); + + javaAsyncClientNodes = ((RestClientTransport) this.javaAsyncClient._transport()).restClient().getNodes(); + assertEquals(1, javaAsyncClientNodes.size()); + javaAsyncClientHost = javaAsyncClientNodes.get(0).getHost(); + assertEquals("10.10.10.10", javaAsyncClientHost.getHostName()); + assertEquals(9204, javaAsyncClientHost.getPort()); } @Test