Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor handler method #158

Merged
merged 21 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/sdk/ExtensionRestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected ExtensionRestRequest(RestExecuteOnExtensionRequest request) throws Ill
/**
* Object generated from input stream
* @param in Input stream
* @throws IOException
* @throws IOException if there a error generating object from input stream
mloufra marked this conversation as resolved.
Show resolved Hide resolved
*/
public ExtensionRestRequest(StreamInput in) throws IOException {
super(in);
Expand All @@ -68,7 +68,7 @@ public ExtensionRestRequest(StreamInput in) throws IOException {
/**
* Write this object to output stream
* @param out the writeable output stream
* @throws IOException
* @throws IOException if there is an error writing object to output stream
mloufra marked this conversation as resolved.
Show resolved Hide resolved
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
Expand Down
173 changes: 52 additions & 121 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,18 @@
import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.NamedWriteableRegistryParseRequest;
import org.opensearch.extensions.OpenSearchRequest;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.rest.RestExecuteOnExtensionRequest;
import org.opensearch.extensions.rest.RestExecuteOnExtensionResponse;
import org.opensearch.extensions.settings.RegisterCustomSettingsRequest;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.ExtensionActionListenerOnFailureRequest;
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.extensions.EnvironmentSettingsRequest;
Expand All @@ -39,31 +35,32 @@
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.indices.IndicesModule;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.rest.RestStatus;
import org.opensearch.rest.RestHandler.Route;
import org.opensearch.rest.RestResponse;
import org.opensearch.transport.netty4.Netty4Transport;
import org.opensearch.transport.SharedGroupFactory;
import org.opensearch.sdk.handlers.ActionListenerOnFailureResponseHandler;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionBooleanResponseHandler;
import org.opensearch.sdk.handlers.ExtensionsIndicesModuleNameRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsIndicesModuleRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsInitRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsRestRequestHandler;
import org.opensearch.sdk.handlers.LocalNodeResponseHandler;
import org.opensearch.sdk.handlers.UpdateSettingsRequestHandler;
import org.opensearch.sdk.handlers.ExtensionStringResponseHandler;
import org.opensearch.sdk.handlers.OpensearchRequestHandler;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ClusterConnectionManager;
import org.opensearch.transport.ConnectionManager;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
import org.opensearch.transport.TransportResponse;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -91,23 +88,46 @@ public class ExtensionsRunner {
private static final String NODE_NAME_SETTING = "node.name";

private String uniqueId;
private DiscoveryNode opensearchNode;
/**
* This method is call from {@link ExtensionsInitRequestHandler}.
mloufra marked this conversation as resolved.
Show resolved Hide resolved
*/
public DiscoveryNode opensearchNode;
private DiscoveryExtension extensionNode;
private TransportService extensionTransportService = null;
/**
* This method is call from {@link ExtensionsInitRequestHandler}.
mloufra marked this conversation as resolved.
Show resolved Hide resolved
*/
public TransportService extensionTransportService = null;
// The routes and classes which handle the REST requests
private final ExtensionRestPathRegistry extensionRestPathRegistry = new ExtensionRestPathRegistry();
// Custom settings from the extension's getSettings
/**
* This method is call from {@link ExtensionsInitRequestHandler}.
mloufra marked this conversation as resolved.
Show resolved Hide resolved
*/
private final List<Setting<?>> customSettings;
// Node name, host, and port
private final Settings settings;
/**
* This method is call from {@link ExtensionsInitRequestHandler}.
mloufra marked this conversation as resolved.
Show resolved Hide resolved
*/
public final Settings settings;
private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
};
private NamedWriteableRegistryAPI namedWriteableRegistryApi = new NamedWriteableRegistryAPI();
private ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler();
private OpensearchRequestHandler opensearchRequestHandler = new OpensearchRequestHandler();
private ExtensionsIndicesModuleRequestHandler extensionsIndicesModuleRequestHandler = new ExtensionsIndicesModuleRequestHandler();
private ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
new ExtensionsIndicesModuleNameRequestHandler();
private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler();

/*
* TODO: expose an interface for extension to register actions
* https://github.com/opensearch-project/opensearch-sdk-java/issues/119
*/
private TransportActions transportActions = new TransportActions(new HashMap<>());
/**
* This method is call from {@link ExtensionsInitRequestHandler}.
mloufra marked this conversation as resolved.
Show resolved Hide resolved
*/
public TransportActions transportActions = new TransportActions(new HashMap<>());

/**
* Instantiates a new update settings request handler
*/
Expand Down Expand Up @@ -161,131 +181,38 @@ private static ExtensionSettings readExtensionSettings() throws IOException {
return objectMapper.readValue(file, ExtensionSettings.class);
}

/**
* This method is call from {@link ExtensionsInitRequestHandler}.
* @param extensionTransportService assign value for extensionTransportService
*/
void setExtensionTransportService(TransportService extensionTransportService) {
this.extensionTransportService = extensionTransportService;
}

private void setUniqueId(String id) {
/**
* This method is call from {@link ExtensionsInitRequestHandler}.
mloufra marked this conversation as resolved.
Show resolved Hide resolved
* @param id assign value for id
*/
public void setUniqueId(String id) {
this.uniqueId = id;
}

String getUniqueId() {
return uniqueId;
}

private void setOpensearchNode(DiscoveryNode opensearchNode) {
public void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

private void setExtensionNode(DiscoveryExtension extensionNode) {
public void setExtensionNode(DiscoveryExtension extensionNode) {
this.extensionNode = extensionNode;
}

DiscoveryNode getOpensearchNode() {
return opensearchNode;
}

/**
* Handles a extension request from OpenSearch. This is the first request for the transport communication and will initialize the extension and will be a part of OpenSearch bootstrap.
*
* @param extensionInitRequest The request to handle.
* @return A response to OpenSearch validating that this is an extension.
*/
InitializeExtensionsResponse handleExtensionInitRequest(InitializeExtensionsRequest extensionInitRequest) {
logger.info("Registering Extension Request received from OpenSearch");
opensearchNode = extensionInitRequest.getSourceNode();
setUniqueId(extensionInitRequest.getExtension().getId());
// Successfully initialized. Send the response.
try {
return new InitializeExtensionsResponse(settings.get(NODE_NAME_SETTING));
} finally {
// After sending successful response to initialization, send the REST API and Settings
setOpensearchNode(opensearchNode);
setExtensionNode(extensionInitRequest.getExtension());
extensionTransportService.connectToNode(opensearchNode);
sendRegisterRestActionsRequest(extensionTransportService);
sendRegisterCustomSettingsRequest(extensionTransportService);
transportActions.sendRegisterTransportActionsRequest(extensionTransportService, opensearchNode);
}
}

/**
* Handles a request from OpenSearch and invokes the extension point API corresponding with the request type
*
* @param request The request to handle.
* @return A response to OpenSearch for the corresponding API
* @throws Exception if the corresponding handler for the request is not present
*/
TransportResponse handleOpenSearchRequest(OpenSearchRequest request) throws Exception {
// Read enum
switch (request.getRequestType()) {
case REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY:
return namedWriteableRegistryApi.handleNamedWriteableRegistryRequest(request);
// Add additional request handlers here
default:
throw new Exception("Handler not present for the provided request");
}
}

/**
* Handles a request for extension point indices from OpenSearch. The {@link #handleExtensionInitRequest(InitializeExtensionsRequest)} method must have been called first to initialize the extension.
*
* @param indicesModuleRequest The request to handle.
* @param transportService The transport service communicating with OpenSearch.
* @return A response to OpenSearch with this extension's index and search listeners.
*/
IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) {
logger.info("Registering Indices Module Request received from OpenSearch");
IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true);
return indicesModuleResponse;
}

/**
* Handles a request for extension name from OpenSearch. The {@link #handleExtensionInitRequest(InitializeExtensionsRequest)} method must have been called first to initialize the extension.
*
* @param indicesModuleRequest The request to handle.
* @return A response acknowledging the request.
*/
ExtensionBooleanResponse handleIndicesModuleNameRequest(IndicesModuleRequest indicesModuleRequest) {
// Works as beforeIndexRemoved
logger.info("Registering Indices Module Name Request received from OpenSearch");
ExtensionBooleanResponse indicesModuleNameResponse = new ExtensionBooleanResponse(true);
return indicesModuleNameResponse;
}

/**
* Handles a request from OpenSearch to execute a REST request on the extension.
*
* @param request The REST request to execute.
* @return A response acknowledging the request.
*/
RestExecuteOnExtensionResponse handleRestExecuteOnExtensionRequest(RestExecuteOnExtensionRequest request) {

ExtensionRestHandler restHandler = extensionRestPathRegistry.getHandler(request.getMethod(), request.getUri());
if (restHandler == null) {
return new RestExecuteOnExtensionResponse(
RestStatus.NOT_FOUND,
"No handler for " + ExtensionRestPathRegistry.restPathToString(request.getMethod(), request.getUri())
);
}
// ExtensionRestRequest restRequest = new ExtensionRestRequest(request);
ExtensionRestRequest restRequest = new ExtensionRestRequest(
request.getMethod(),
request.getUri(),
request.getRequestIssuerIdentity()
);

// Get response from extension
RestResponse response = restHandler.handleRequest(restRequest);
logger.info("Sending extension response to OpenSearch: " + response.status());
return new RestExecuteOnExtensionResponse(
response.status(),
response.contentType(),
BytesReference.toBytes(response.content()),
response.getHeaders()
);
}

/**
* Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object.
*
Expand Down Expand Up @@ -381,7 +308,7 @@ public void startTransportService(TransportService transportService) {
false,
false,
InitializeExtensionsRequest::new,
(request, channel, task) -> channel.sendResponse(handleExtensionInitRequest(request))
(request, channel, task) -> channel.sendResponse(extensionsInitRequestHandler.handleExtensionInitRequest(request, this))
mloufra marked this conversation as resolved.
Show resolved Hide resolved
);

transportService.registerRequestHandler(
Expand All @@ -390,7 +317,7 @@ public void startTransportService(TransportService transportService) {
false,
false,
OpenSearchRequest::new,
(request, channel, task) -> channel.sendResponse(handleOpenSearchRequest(request))
(request, channel, task) -> channel.sendResponse(opensearchRequestHandler.handleOpenSearchRequest(request))
);

transportService.registerRequestHandler(
Expand All @@ -408,7 +335,9 @@ public void startTransportService(TransportService transportService) {
false,
false,
IndicesModuleRequest::new,
((request, channel, task) -> channel.sendResponse(handleIndicesModuleRequest(request, transportService)))
((request, channel, task) -> channel.sendResponse(
extensionsIndicesModuleRequestHandler.handleIndicesModuleRequest(request, transportService)
))

);

Expand All @@ -418,7 +347,9 @@ public void startTransportService(TransportService transportService) {
false,
false,
IndicesModuleRequest::new,
((request, channel, task) -> channel.sendResponse(handleIndicesModuleNameRequest(request)))
((request, channel, task) -> channel.sendResponse(
extensionsIndicesModuleNameRequestHandler.handleIndicesModuleNameRequest(request)
))
);

transportService.registerRequestHandler(
Expand All @@ -427,7 +358,7 @@ public void startTransportService(TransportService transportService) {
false,
false,
RestExecuteOnExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleRestExecuteOnExtensionRequest(request)))
((request, channel, task) -> channel.sendResponse(extensionsRestRequestHandler.handleRestExecuteOnExtensionRequest(request)))
);

transportService.registerRequestHandler(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk.handlers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.index.IndicesModuleRequest;

/**
* This class handles the request from OpenSearch to a {@link ExtensionsRunner#startTransportService(TransportService transportService)} call.
*/

public class ExtensionsIndicesModuleNameRequestHandler {
private static final Logger logger = LogManager.getLogger(ExtensionsIndicesModuleNameRequestHandler.class);

/**
* Handles a request for extension name from OpenSearch. The {@link ExtensionsInitRequestHandler} method must have been called first to initialize the extension.
*
* @param indicesModuleRequest The request to handle.
* @return A response acknowledging the request.
*/
public ExtensionBooleanResponse handleIndicesModuleNameRequest(IndicesModuleRequest indicesModuleRequest) {
// Works as beforeIndexRemoved
logger.info("Registering Indices Module Name Request received from OpenSearch");
ExtensionBooleanResponse indicesModuleNameResponse = new ExtensionBooleanResponse(true);
return indicesModuleNameResponse;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk.handlers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.transport.TransportService;

/**
* This class handles the request from OpenSearch to a {@link ExtensionsRunner#startTransportService(TransportService transportService)} call.
*/

public class ExtensionsIndicesModuleRequestHandler {
private static final Logger logger = LogManager.getLogger(ExtensionsIndicesModuleRequestHandler.class);

/**
* Handles a request for extension point indices from OpenSearch. The {@link ExtensionsInitRequestHandler} class must have been called first to initialize the extension.
*
* @param indicesModuleRequest The request to handle.
* @param transportService The transport service communicating with OpenSearch.
* @return A response to OpenSearch with this extension's index and search listeners.
*/
public IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) {
logger.info("Registering Indices Module Request received from OpenSearch");
IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true);
return indicesModuleResponse;
}

}
Loading