diff --git a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java index 8e4d2711..ec3fb01c 100644 --- a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java +++ b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java @@ -15,13 +15,11 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.NamedWriteableRegistryParseRequest; import org.opensearch.extensions.OpenSearchRequest; import org.opensearch.extensions.rest.ExtensionRestRequest; import org.opensearch.extensions.rest.RegisterRestActionsRequest; -import org.opensearch.extensions.rest.RestExecuteOnExtensionResponse; import org.opensearch.extensions.settings.RegisterCustomSettingsRequest; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.network.NetworkService; @@ -29,9 +27,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.discovery.InitializeExtensionsRequest; -import org.opensearch.discovery.InitializeExtensionsResponse; import org.opensearch.extensions.ExtensionActionListenerOnFailureRequest; -import org.opensearch.extensions.ExtensionBooleanResponse; import org.opensearch.extensions.DiscoveryExtension; import org.opensearch.extensions.EnvironmentSettingsRequest; import org.opensearch.extensions.AddSettingsUpdateConsumerRequest; @@ -39,13 +35,10 @@ import org.opensearch.extensions.ExtensionRequest; import org.opensearch.extensions.ExtensionsOrchestrator; import org.opensearch.index.IndicesModuleRequest; -import org.opensearch.index.IndicesModuleResponse; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.rest.RestHandler.Route; -import org.opensearch.rest.RestResponse; -import org.opensearch.rest.RestStatus; import org.opensearch.transport.netty4.Netty4Transport; import org.opensearch.transport.SharedGroupFactory; import org.opensearch.sdk.handlers.ActionListenerOnFailureResponseHandler; @@ -66,7 +59,6 @@ import org.opensearch.transport.ClusterConnectionManager; import org.opensearch.transport.ConnectionManager; import org.opensearch.transport.TransportInterceptor; -import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; import org.opensearch.transport.TransportSettings; @@ -145,8 +137,7 @@ public class ExtensionsRunner { /** * Instantiates a new Extensions Runner using test settings. * - * @throws IOException - * if the runner failed to read settings or API. + * @throws IOException if the runner failed to read settings or API. */ public ExtensionsRunner() throws IOException { ExtensionSettings extensionSettings = readExtensionSettings(); @@ -161,10 +152,8 @@ public ExtensionsRunner() throws IOException { /** * Instantiates a new Extensions Runner using the specified extension. * - * @param extension - * The settings with which to start the runner. - * @throws IOException - * if the runner failed to read settings or API. + * @param extension The settings with which to start the runner. + * @throws IOException if the runner failed to read settings or API. */ private ExtensionsRunner(Extension extension) throws IOException { ExtensionSettings extensionSettings = extension.getExtensionSettings(); @@ -225,121 +214,9 @@ DiscoveryNode getOpensearchNode() { return opensearchNode; } - /** - * Handles a extension request from OpenSearch. This is the first request for - * the transport communication and will initialize the extension and will be a - * part of OpenSearch bootstrap. - * - * @param extensionInitRequest - * The request to handle. - * @return A response to OpenSearch validating that this is an extension. - */ - InitializeExtensionsResponse handleExtensionInitRequest(InitializeExtensionsRequest extensionInitRequest) { - logger.info("Registering Extension Request received from OpenSearch"); - opensearchNode = extensionInitRequest.getSourceNode(); - setUniqueId(extensionInitRequest.getExtension().getId()); - // Successfully initialized. Send the response. - try { - return new InitializeExtensionsResponse(settings.get(NODE_NAME_SETTING)); - } finally { - // After sending successful response to initialization, send the REST API and - // Settings - setOpensearchNode(opensearchNode); - setExtensionNode(extensionInitRequest.getExtension()); - extensionTransportService.connectToNode(opensearchNode); - sendRegisterRestActionsRequest(extensionTransportService); - sendRegisterCustomSettingsRequest(extensionTransportService); - transportActions.sendRegisterTransportActionsRequest(extensionTransportService, opensearchNode); - } - } - - /** - * Handles a request from OpenSearch and invokes the extension point API - * corresponding with the request type - * - * @param request - * The request to handle. - * @return A response to OpenSearch for the corresponding API - * @throws Exception - * if the corresponding handler for the request is not present - */ - TransportResponse handleOpenSearchRequest(OpenSearchRequest request) throws Exception { - // Read enum - switch (request.getRequestType()) { - case REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY: - return namedWriteableRegistryApi.handleNamedWriteableRegistryRequest(request); - // Add additional request handlers here - default: - throw new Exception("Handler not present for the provided request"); - } - } - - /** - * Handles a request for extension point indices from OpenSearch. The - * {@link #handleExtensionInitRequest(InitializeExtensionsRequest)} method must - * have been called first to initialize the extension. - * - * @param indicesModuleRequest - * The request to handle. - * @param transportService - * The transport service communicating with OpenSearch. - * @return A response to OpenSearch with this extension's index and search - * listeners. - */ - IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) { - logger.info("Registering Indices Module Request received from OpenSearch"); - IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true); - return indicesModuleResponse; - } - - /** - * Handles a request for extension name from OpenSearch. The - * {@link #handleExtensionInitRequest(InitializeExtensionsRequest)} method must - * have been called first to initialize the extension. - * - * @param indicesModuleRequest - * The request to handle. - * @return A response acknowledging the request. - */ - ExtensionBooleanResponse handleIndicesModuleNameRequest(IndicesModuleRequest indicesModuleRequest) { - // Works as beforeIndexRemoved - logger.info("Registering Indices Module Name Request received from OpenSearch"); - ExtensionBooleanResponse indicesModuleNameResponse = new ExtensionBooleanResponse(true); - return indicesModuleNameResponse; - } - - /** - * Handles a request from OpenSearch to execute a REST request on the extension. - * - * @param request - * The REST request to execute. - * @return A response acknowledging the request. - */ - RestExecuteOnExtensionResponse handleRestExecuteOnExtensionRequest(ExtensionRestRequest request) { - - ExtensionRestHandler restHandler = extensionRestPathRegistry.getHandler(request.method(), request.uri()); - if (restHandler == null) { - return new RestExecuteOnExtensionResponse( - RestStatus.NOT_FOUND, - "No handler for " + ExtensionRestPathRegistry.restPathToString(request.method(), request.uri()) - ); - } - logger.trace("Handling " + request.toString()); - // Get response from extension - RestResponse response = restHandler.handleRequest(request); - logger.info("Sending extension response to OpenSearch: " + response.status()); - return new RestExecuteOnExtensionResponse( - response.status(), - response.contentType(), - BytesReference.toBytes(response.content()), - response.getHeaders() - ); - } - /** * Initializes a Netty4Transport object. This object will be wrapped in a * {@link TransportService} object. - >>>>>>> 60694f1 (Rename/merge duplicate ExtensionRestRequest implementations) * * @param settings * The transport settings to configure. @@ -383,8 +260,7 @@ public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPo * Initializes the TransportService object for this extension. This object will * control communication between the extension and OpenSearch. * - * @param settings - * The transport settings to configure. + * @param settings The transport settings to configure. * @return The initialized TransportService object. */ public TransportService initializeExtensionTransportService(Settings settings) { @@ -422,8 +298,7 @@ public TransportService initializeExtensionTransportService(Settings settings) { /** * Starts a TransportService. * - * @param transportService - * The TransportService to start. + * @param transportService The TransportService to start. */ public void startTransportService(TransportService transportService) { // start transport service and accept incoming requests @@ -506,8 +381,7 @@ public void startTransportService(TransportService transportService) { /** * Requests that OpenSearch register the REST Actions for this extension. * - * @param transportService - * The TransportService defining the connection to OpenSearch. + * @param transportService The TransportService defining the connection to OpenSearch. */ public void sendRegisterRestActionsRequest(TransportService transportService) { List extensionRestPaths = extensionRestPathRegistry.getRegisteredPaths(); @@ -528,8 +402,7 @@ public void sendRegisterRestActionsRequest(TransportService transportService) { /** * Requests that OpenSearch register the custom settings for this extension. * - * @param transportService - * The TransportService defining the connection to OpenSearch. + * @param transportService The TransportService defining the connection to OpenSearch. */ public void sendRegisterCustomSettingsRequest(TransportService transportService) { logger.info("Sending Settings request to OpenSearch"); @@ -550,8 +423,7 @@ public void sendRegisterCustomSettingsRequest(TransportService transportService) * Requests the cluster state from OpenSearch. The result will be handled by a * {@link ClusterStateResponseHandler}. * - * @param transportService - * The TransportService defining the connection to OpenSearch. + * @param transportService The TransportService defining the connection to OpenSearch. */ public void sendClusterStateRequest(TransportService transportService) { logger.info("Sending Cluster State request to OpenSearch"); @@ -572,8 +444,7 @@ public void sendClusterStateRequest(TransportService transportService) { * Requests the cluster settings from OpenSearch. The result will be handled by * a {@link ClusterSettingsResponseHandler}. * - * @param transportService - * The TransportService defining the connection to OpenSearch. + * @param transportService The TransportService defining the connection to OpenSearch. */ public void sendClusterSettingsRequest(TransportService transportService) { logger.info("Sending Cluster Settings request to OpenSearch"); @@ -594,8 +465,7 @@ public void sendClusterSettingsRequest(TransportService transportService) { * Requests the local node from OpenSearch. The result will be handled by a * {@link LocalNodeResponseHandler}. * - * @param transportService - * The TransportService defining the connection to OpenSearch. + * @param transportService The TransportService defining the connection to OpenSearch. */ public void sendLocalNodeRequest(TransportService transportService) { logger.info("Sending Local Node request to OpenSearch"); @@ -616,10 +486,8 @@ public void sendLocalNodeRequest(TransportService transportService) { * Requests the ActionListener onFailure method to be run by OpenSearch. The * result will be handled by a {@link ActionListenerOnFailureResponseHandler}. * - * @param transportService - * The TransportService defining the connection to OpenSearch. - * @param failureException - * The exception to be sent to OpenSearch + * @param transportService The TransportService defining the connection to OpenSearch. + * @param failureException The exception to be sent to OpenSearch */ public void sendActionListenerOnFailureRequest(TransportService transportService, Exception failureException) { logger.info("Sending ActionListener onFailure request to OpenSearch"); @@ -641,11 +509,8 @@ public void sendActionListenerOnFailureRequest(TransportService transportService * component settings. The result will be handled by a * {@link EnvironmentSettingsResponseHandler}. * - * @param componentSettings - * The component setting that correspond to the values provided by - * the environment settings - * @param transportService - * The TransportService defining the connection to OpenSearch. + * @param componentSettings The component setting that correspond to the values provided by the environment settings + * @param transportService The TransportService defining the connection to OpenSearch. */ public void sendEnvironmentSettingsRequest(TransportService transportService, List> componentSettings) { logger.info("Sending Environment Settings request to OpenSearch"); @@ -668,13 +533,9 @@ public void sendEnvironmentSettingsRequest(TransportService transportService, Li * to register these Setting objects with a callback to this extension. The * result will be handled by a {@link ExtensionBooleanResponseHandler}. * - * @param transportService - * The TransportService defining the connection to OpenSearch. - * @param settingUpdateConsumers - * A map of setting objects and their corresponding consumers - * @throws Exception - * if there are no setting update consumers within the - * settingUpdateConsumers map + * @param transportService The TransportService defining the connection to OpenSearch. + * @param settingUpdateConsumers A map of setting objects and their corresponding consumers + * @throws Exception if there are no setting update consumers within the settingUpdateConsumers map */ public void sendAddSettingsUpdateConsumerRequest(TransportService transportService, Map, Consumer> settingUpdateConsumers) throws Exception { @@ -714,9 +575,7 @@ private Settings getSettings() { /** * Starts an ActionListener. * - * @param timeout - * The timeout for the listener in milliseconds. A timeout of 0 means - * no timeout. + * @param timeout The timeout for the listener in milliseconds. A timeout of 0 means no timeout. */ public void startActionListener(int timeout) { final ActionListener actionListener = new ActionListener(); @@ -726,10 +585,8 @@ public void startActionListener(int timeout) { /** * Runs the specified extension. * - * @param extension - * The extension to run. - * @throws IOException - * on failure to bind ports. + * @param extension The extension to run. + * @throws IOException on failure to bind ports. */ public static void run(Extension extension) throws IOException { logger.info("Starting extension " + extension.getExtensionSettings().getExtensionName()); @@ -741,10 +598,8 @@ public static void run(Extension extension) throws IOException { * Run the Extension. For internal/testing purposes only. Imports settings and * sets up Transport Service listening for incoming connections. * - * @param args - * Unused - * @throws IOException - * if the runner failed to connect to the OpenSearch cluster. + * @param args Unused + * @throws IOException if the runner failed to connect to the OpenSearch cluster. */ public static void main(String[] args) throws IOException { diff --git a/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java index cfa9de14..30d0ab80 100644 --- a/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java +++ b/src/test/java/org/opensearch/sdk/TestExtensionsRunner.java @@ -59,6 +59,7 @@ import org.opensearch.sdk.handlers.ClusterStateResponseHandler; import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler; import org.opensearch.sdk.handlers.ExtensionsInitRequestHandler; +import org.opensearch.sdk.handlers.ExtensionsRestRequestHandler; import org.opensearch.sdk.handlers.LocalNodeResponseHandler; import org.opensearch.sdk.handlers.ExtensionStringResponseHandler; import org.opensearch.sdk.handlers.OpensearchRequestHandler; @@ -72,6 +73,7 @@ public class TestExtensionsRunner extends OpenSearchTestCase { private static final String EXTENSION_NAME = "sample-extension"; private ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler(); private OpensearchRequestHandler opensearchRequestHandler = new OpensearchRequestHandler(); + private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler(); private ExtensionsRunner extensionsRunner; private TransportService transportService; @@ -178,7 +180,7 @@ public void testHandleExtensionRestRequest() throws Exception { new BytesArray("bar"), ext.generateToken(userPrincipal) ); - RestExecuteOnExtensionResponse response = extensionsRunner.handleRestExecuteOnExtensionRequest(request); + RestExecuteOnExtensionResponse response = extensionsRestRequestHandler.handleRestExecuteOnExtensionRequest(request); // this will fail in test environment with no registered actions assertEquals(RestStatus.NOT_FOUND, response.getStatus()); assertEquals(BytesRestResponse.TEXT_CONTENT_TYPE, response.getContentType());