From 28a8591f7eadeb60a3250928573be96b9b339064 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Fri, 26 May 2023 19:50:28 +0000 Subject: [PATCH] pulling http.port settings from environment settings response rather than from extension intialization request Signed-off-by: Joshua Palis --- .../java/org/opensearch/sdk/SDKClient.java | 12 ++--- .../ExtensionsInitRequestHandler.java | 12 ++++- .../opensearch/sdk/TestExtensionsRunner.java | 4 +- .../helloworld/TestHelloWorldExtension.java | 48 +++++++++++++------ 4 files changed, 50 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/opensearch/sdk/SDKClient.java b/src/main/java/org/opensearch/sdk/SDKClient.java index 53e70091..b8f412dc 100644 --- a/src/main/java/org/opensearch/sdk/SDKClient.java +++ b/src/main/java/org/opensearch/sdk/SDKClient.java @@ -87,7 +87,6 @@ import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.rest_client.RestClientTransport; -import org.opensearch.common.transport.TransportAddress; import org.opensearch.index.reindex.BulkByScrollResponse; import org.opensearch.index.reindex.DeleteByQueryRequest; @@ -132,26 +131,25 @@ public void initialize(Map actions) { /** * Update the ExtensionSettings with a new OpenSearch address and port - * @param address the TransportAddress associated with the OpenSearchNode + * @param address the host address associated with the OpenSearchNode * @param httpPort the http port associated with the OpenSearchNOde */ - public void updateOpenSearchNodeSettings(TransportAddress address, String httpPort) { + public void updateOpenSearchNodeSettings(String address, String httpPort) { // Update the settings for future initialization of new clients - this.extensionSettings.setOpensearchAddress(address.getAddress()); + this.extensionSettings.setOpensearchAddress(address); this.extensionSettings.setOpensearchPort(httpPort); // Update the settings on the already-initialized SDKRestClient (Deprecated -- for migration use) if (this.sdkRestClient != null) { this.sdkRestClient.getRestHighLevelClient() .getLowLevelClient() - .setNodes(List.of(new Node(new HttpHost(address.getAddress(), Integer.parseInt(httpPort))))); + .setNodes(List.of(new Node(new HttpHost(address, Integer.parseInt(httpPort))))); } // Update the settings on the already-initialized OpenSearchAsyncClient if (this.javaAsyncClient != null) { OpenSearchTransport javaAsyncClientTransport = this.javaAsyncClient._transport(); if (javaAsyncClientTransport instanceof RestClientTransport) { RestClientTransport restClientTransport = (RestClientTransport) javaAsyncClientTransport; - restClientTransport.restClient() - .setNodes(List.of(new Node(new HttpHost(address.getAddress(), Integer.parseInt(httpPort))))); + restClientTransport.restClient().setNodes(List.of(new Node(new HttpHost(address, Integer.parseInt(httpPort))))); } } } diff --git a/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java b/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java index ffbd7f6d..539d8bcf 100644 --- a/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java +++ b/src/main/java/org/opensearch/sdk/handlers/ExtensionsInitRequestHandler.java @@ -28,6 +28,12 @@ public class ExtensionsInitRequestHandler { private static final Logger logger = LogManager.getLogger(ExtensionsInitRequestHandler.class); + // The default http port setting of OpenSearch + private static final String DEFAULT_HTTP_PORT = "9200"; + + // The configured http port setting of opensearch.yml + private static final String HTTP_PORT_SETTING = "http.port"; + private final ExtensionsRunner extensionsRunner; /** @@ -64,8 +70,6 @@ public InitializeExtensionResponse handleExtensionInitRequest(InitializeExtensio // After sending successful response to initialization, send the REST API and Settings extensionsRunner.setOpensearchNode(extensionInitRequest.getSourceNode()); extensionsRunner.setExtensionNode(extensionInitRequest.getExtension()); - extensionsRunner.getSdkClient() - .updateOpenSearchNodeSettings(extensionInitRequest.getSourceNode().getAddress(), extensionInitRequest.getHttpPort()); // TODO: replace with sdkTransportService.getTransportService() TransportService extensionTransportService = extensionsRunner.getExtensionTransportService(); @@ -81,6 +85,10 @@ public InitializeExtensionResponse handleExtensionInitRequest(InitializeExtensio extensionsRunner.setEnvironmentSettings(settings); extensionsRunner.updateNamedXContentRegistry(); extensionsRunner.updateSdkClusterService(); + // Use OpenSearch Settings to update client REST Connections + String openSearchNodeAddress = extensionInitRequest.getSourceNode().getAddress().getAddress(); + String openSearchNodeHttpPort = settings.get(HTTP_PORT_SETTING) != null ? settings.get(HTTP_PORT_SETTING) : DEFAULT_HTTP_PORT; + extensionsRunner.getSdkClient().updateOpenSearchNodeSettings(openSearchNodeAddress, openSearchNodeHttpPort); // Last step of initialization // TODO: make sure all the other sendX methods have completed diff --git a/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java index 2e5e1759..28ef2b91 100644 --- a/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java +++ b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java @@ -129,7 +129,7 @@ public void testHandleExtensionInitRequest() throws UnknownHostException { extensionsRunner.setExtensionTransportService(this.transportService); doNothing().when(this.transportService).connectToNodeAsExtension(sourceNode, "opensearch-sdk-1"); - InitializeExtensionRequest extensionInitRequest = new InitializeExtensionRequest(sourceNode, extension, "9204"); + InitializeExtensionRequest extensionInitRequest = new InitializeExtensionRequest(sourceNode, extension); InitializeExtensionResponse response = extensionsInitRequestHandler.handleExtensionInitRequest(extensionInitRequest); // Test if name and unique ID are set @@ -137,8 +137,6 @@ public void testHandleExtensionInitRequest() throws UnknownHostException { assertEquals("opensearch-sdk-1", extensionsRunner.getUniqueId()); // Test if the source node is set after handleExtensionInitRequest() is called during OpenSearch bootstrap assertEquals(sourceNode, extensionsRunner.getOpensearchNode()); - // Verify opensearch http port is updated - assertEquals("9204", extensionsRunner.getExtension().getExtensionSettings().getOpensearchPort()); } @Test diff --git a/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java b/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java index 2bacb263..451de7be 100644 --- a/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java +++ b/src/test/java/org/opensearch/sdk/sample/helloworld/TestHelloWorldExtension.java @@ -9,7 +9,6 @@ package org.opensearch.sdk.sample.helloworld; -import java.net.InetSocketAddress; import java.util.Collections; import java.util.List; import java.util.Map; @@ -27,8 +26,9 @@ import org.opensearch.action.ActionType; import org.opensearch.action.support.TransportAction; import org.opensearch.client.Node; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; +import org.opensearch.client.transport.rest_client.RestClientTransport; import org.opensearch.common.settings.Settings; -import org.opensearch.common.transport.TransportAddress; import org.opensearch.sdk.api.ActionExtension.ActionHandler; import org.opensearch.sdk.rest.ExtensionRestHandler; import org.opensearch.sdk.sample.helloworld.transport.SampleAction; @@ -56,6 +56,7 @@ public class TestHelloWorldExtension extends OpenSearchTestCase { private Injector injector; private SDKClient sdkClient; private SDKRestClient sdkRestClient; + private OpenSearchAsyncClient javaAsyncClient; private final ExtensionSettings extensionSettings = new ExtensionSettings("", "", "", "localhost", "9200"); static class UnregisteredAction extends ActionType { @@ -85,6 +86,7 @@ public void setUp() throws Exception { }); initializeSdkClient(); this.sdkRestClient = sdkClient.initializeRestClient("localhost", 9200); + this.javaAsyncClient = sdkClient.initalizeJavaAsyncClient("localhost", 9200); } @SuppressWarnings("rawtypes") @@ -98,6 +100,7 @@ private void initializeSdkClient() { public void tearDown() throws Exception { super.tearDown(); this.sdkRestClient.close(); + this.sdkClient.doCloseJavaClients(); this.injector = null; } @@ -113,18 +116,35 @@ public void testExtensionSettings() { @Test public void testExtensionSettingsUpdate() { - List nodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes(); - assertEquals(1, nodes.size()); - HttpHost host = nodes.get(0).getHost(); - assertEquals("localhost", host.getHostName()); - assertEquals(9200, host.getPort()); - - this.sdkClient.updateOpenSearchNodeSettings(new TransportAddress(new InetSocketAddress("10.10.10.10", 9300)), "9204"); - nodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes(); - assertEquals(1, nodes.size()); - host = nodes.get(0).getHost(); - assertEquals("10.10.10.10", host.getHostName()); - assertEquals(9204, host.getPort()); + List sdkRestClientNodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes(); + List javaAsyncClientNodes = ((RestClientTransport) this.javaAsyncClient._transport()).restClient().getNodes(); + + // Test rest client nodes + assertEquals(1, sdkRestClientNodes.size()); + assertEquals(1, javaAsyncClientNodes.size()); + + // Test client http hosts + HttpHost sdkRestClientHost = sdkRestClientNodes.get(0).getHost(); + assertEquals("localhost", sdkRestClientHost.getHostName()); + assertEquals(9200, sdkRestClientHost.getPort()); + HttpHost javaAsyncClientHost = javaAsyncClientNodes.get(0).getHost(); + assertEquals("localhost", javaAsyncClientHost.getHostName()); + assertEquals(9200, javaAsyncClientHost.getPort()); + + // Test updated hosts + this.sdkClient.updateOpenSearchNodeSettings("10.10.10.10", "9204"); + + sdkRestClientNodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes(); + assertEquals(1, sdkRestClientNodes.size()); + sdkRestClientHost = sdkRestClientNodes.get(0).getHost(); + assertEquals("10.10.10.10", sdkRestClientHost.getHostName()); + assertEquals(9204, sdkRestClientHost.getPort()); + + javaAsyncClientNodes = ((RestClientTransport) this.javaAsyncClient._transport()).restClient().getNodes(); + assertEquals(1, javaAsyncClientNodes.size()); + javaAsyncClientHost = javaAsyncClientNodes.get(0).getHost(); + assertEquals("10.10.10.10", javaAsyncClientHost.getHostName()); + assertEquals(9204, javaAsyncClientHost.getPort()); } @Test