Skip to content

Commit

Permalink
pulling http.port settings from environment settings response rather …
Browse files Browse the repository at this point in the history
…than from extension intialization request

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed May 26, 2023
1 parent 52ba43a commit 28a8591
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 26 deletions.
12 changes: 5 additions & 7 deletions src/main/java/org/opensearch/sdk/SDKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryRequest;

Expand Down Expand Up @@ -132,26 +131,25 @@ public void initialize(Map<ActionType, TransportAction> actions) {

/**
* Update the ExtensionSettings with a new OpenSearch address and port
* @param address the TransportAddress associated with the OpenSearchNode
* @param address the host address associated with the OpenSearchNode
* @param httpPort the http port associated with the OpenSearchNOde
*/
public void updateOpenSearchNodeSettings(TransportAddress address, String httpPort) {
public void updateOpenSearchNodeSettings(String address, String httpPort) {
// Update the settings for future initialization of new clients
this.extensionSettings.setOpensearchAddress(address.getAddress());
this.extensionSettings.setOpensearchAddress(address);
this.extensionSettings.setOpensearchPort(httpPort);
// Update the settings on the already-initialized SDKRestClient (Deprecated -- for migration use)
if (this.sdkRestClient != null) {
this.sdkRestClient.getRestHighLevelClient()
.getLowLevelClient()
.setNodes(List.of(new Node(new HttpHost(address.getAddress(), Integer.parseInt(httpPort)))));
.setNodes(List.of(new Node(new HttpHost(address, Integer.parseInt(httpPort)))));
}
// Update the settings on the already-initialized OpenSearchAsyncClient
if (this.javaAsyncClient != null) {
OpenSearchTransport javaAsyncClientTransport = this.javaAsyncClient._transport();
if (javaAsyncClientTransport instanceof RestClientTransport) {
RestClientTransport restClientTransport = (RestClientTransport) javaAsyncClientTransport;
restClientTransport.restClient()
.setNodes(List.of(new Node(new HttpHost(address.getAddress(), Integer.parseInt(httpPort)))));
restClientTransport.restClient().setNodes(List.of(new Node(new HttpHost(address, Integer.parseInt(httpPort)))));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
public class ExtensionsInitRequestHandler {
private static final Logger logger = LogManager.getLogger(ExtensionsInitRequestHandler.class);

// The default http port setting of OpenSearch
private static final String DEFAULT_HTTP_PORT = "9200";

// The configured http port setting of opensearch.yml
private static final String HTTP_PORT_SETTING = "http.port";

private final ExtensionsRunner extensionsRunner;

/**
Expand Down Expand Up @@ -64,8 +70,6 @@ public InitializeExtensionResponse handleExtensionInitRequest(InitializeExtensio
// After sending successful response to initialization, send the REST API and Settings
extensionsRunner.setOpensearchNode(extensionInitRequest.getSourceNode());
extensionsRunner.setExtensionNode(extensionInitRequest.getExtension());
extensionsRunner.getSdkClient()
.updateOpenSearchNodeSettings(extensionInitRequest.getSourceNode().getAddress(), extensionInitRequest.getHttpPort());

// TODO: replace with sdkTransportService.getTransportService()
TransportService extensionTransportService = extensionsRunner.getExtensionTransportService();
Expand All @@ -81,6 +85,10 @@ public InitializeExtensionResponse handleExtensionInitRequest(InitializeExtensio
extensionsRunner.setEnvironmentSettings(settings);
extensionsRunner.updateNamedXContentRegistry();
extensionsRunner.updateSdkClusterService();
// Use OpenSearch Settings to update client REST Connections
String openSearchNodeAddress = extensionInitRequest.getSourceNode().getAddress().getAddress();
String openSearchNodeHttpPort = settings.get(HTTP_PORT_SETTING) != null ? settings.get(HTTP_PORT_SETTING) : DEFAULT_HTTP_PORT;
extensionsRunner.getSdkClient().updateOpenSearchNodeSettings(openSearchNodeAddress, openSearchNodeHttpPort);

// Last step of initialization
// TODO: make sure all the other sendX methods have completed
Expand Down
4 changes: 1 addition & 3 deletions src/test/java/org/opensearch/sdk/TestExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,14 @@ public void testHandleExtensionInitRequest() throws UnknownHostException {
extensionsRunner.setExtensionTransportService(this.transportService);
doNothing().when(this.transportService).connectToNodeAsExtension(sourceNode, "opensearch-sdk-1");

InitializeExtensionRequest extensionInitRequest = new InitializeExtensionRequest(sourceNode, extension, "9204");
InitializeExtensionRequest extensionInitRequest = new InitializeExtensionRequest(sourceNode, extension);

InitializeExtensionResponse response = extensionsInitRequestHandler.handleExtensionInitRequest(extensionInitRequest);
// Test if name and unique ID are set
assertEquals(EXTENSION_NAME, response.getName());
assertEquals("opensearch-sdk-1", extensionsRunner.getUniqueId());
// Test if the source node is set after handleExtensionInitRequest() is called during OpenSearch bootstrap
assertEquals(sourceNode, extensionsRunner.getOpensearchNode());
// Verify opensearch http port is updated
assertEquals("9204", extensionsRunner.getExtension().getExtensionSettings().getOpensearchPort());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.opensearch.sdk.sample.helloworld;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -27,8 +26,9 @@
import org.opensearch.action.ActionType;
import org.opensearch.action.support.TransportAction;
import org.opensearch.client.Node;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.sdk.api.ActionExtension.ActionHandler;
import org.opensearch.sdk.rest.ExtensionRestHandler;
import org.opensearch.sdk.sample.helloworld.transport.SampleAction;
Expand Down Expand Up @@ -56,6 +56,7 @@ public class TestHelloWorldExtension extends OpenSearchTestCase {
private Injector injector;
private SDKClient sdkClient;
private SDKRestClient sdkRestClient;
private OpenSearchAsyncClient javaAsyncClient;
private final ExtensionSettings extensionSettings = new ExtensionSettings("", "", "", "localhost", "9200");

static class UnregisteredAction extends ActionType<SampleResponse> {
Expand Down Expand Up @@ -85,6 +86,7 @@ public void setUp() throws Exception {
});
initializeSdkClient();
this.sdkRestClient = sdkClient.initializeRestClient("localhost", 9200);
this.javaAsyncClient = sdkClient.initalizeJavaAsyncClient("localhost", 9200);
}

@SuppressWarnings("rawtypes")
Expand All @@ -98,6 +100,7 @@ private void initializeSdkClient() {
public void tearDown() throws Exception {
super.tearDown();
this.sdkRestClient.close();
this.sdkClient.doCloseJavaClients();
this.injector = null;
}

Expand All @@ -113,18 +116,35 @@ public void testExtensionSettings() {

@Test
public void testExtensionSettingsUpdate() {
List<Node> nodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes();
assertEquals(1, nodes.size());
HttpHost host = nodes.get(0).getHost();
assertEquals("localhost", host.getHostName());
assertEquals(9200, host.getPort());

this.sdkClient.updateOpenSearchNodeSettings(new TransportAddress(new InetSocketAddress("10.10.10.10", 9300)), "9204");
nodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes();
assertEquals(1, nodes.size());
host = nodes.get(0).getHost();
assertEquals("10.10.10.10", host.getHostName());
assertEquals(9204, host.getPort());
List<Node> sdkRestClientNodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes();
List<Node> javaAsyncClientNodes = ((RestClientTransport) this.javaAsyncClient._transport()).restClient().getNodes();

// Test rest client nodes
assertEquals(1, sdkRestClientNodes.size());
assertEquals(1, javaAsyncClientNodes.size());

// Test client http hosts
HttpHost sdkRestClientHost = sdkRestClientNodes.get(0).getHost();
assertEquals("localhost", sdkRestClientHost.getHostName());
assertEquals(9200, sdkRestClientHost.getPort());
HttpHost javaAsyncClientHost = javaAsyncClientNodes.get(0).getHost();
assertEquals("localhost", javaAsyncClientHost.getHostName());
assertEquals(9200, javaAsyncClientHost.getPort());

// Test updated hosts
this.sdkClient.updateOpenSearchNodeSettings("10.10.10.10", "9204");

sdkRestClientNodes = this.sdkClient.getSdkRestClient().getRestHighLevelClient().getLowLevelClient().getNodes();
assertEquals(1, sdkRestClientNodes.size());
sdkRestClientHost = sdkRestClientNodes.get(0).getHost();
assertEquals("10.10.10.10", sdkRestClientHost.getHostName());
assertEquals(9204, sdkRestClientHost.getPort());

javaAsyncClientNodes = ((RestClientTransport) this.javaAsyncClient._transport()).restClient().getNodes();
assertEquals(1, javaAsyncClientNodes.size());
javaAsyncClientHost = javaAsyncClientNodes.get(0).getHost();
assertEquals("10.10.10.10", javaAsyncClientHost.getHostName());
assertEquals(9204, javaAsyncClientHost.getPort());
}

@Test
Expand Down

0 comments on commit 28a8591

Please sign in to comment.