Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.x] Execute remote actions on another extension #606

Merged
merged 2 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ The `ExtensionsManager` reads a list of extensions present in `extensions.yml`.

(27) The User receives the response.

#### Remote Action Execution on another Extension

Extensions may invoke actions on other extensions using the `ProxyAction` and `ProxyActionRequest`. The code sequence is shown below.

![](Docs/RemoteActionExecution.svg)

#### Extension Point Implementation Walk Through

An example of a more complex extension point, `getNamedXContent()` is shown below. A similar pattern can be followed for most extension points.
Expand Down
1 change: 1 addition & 0 deletions Docs/RemoteActionExecution.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 5 additions & 3 deletions src/main/java/org/opensearch/sdk/BaseExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

import java.io.IOException;

import com.google.inject.Inject;

/**
* An abstract class that simplifies extension initialization and provides an instance of the runner.
*/
Expand All @@ -21,7 +19,6 @@ public abstract class BaseExtension implements Extension {
/**
* The {@link ExtensionsRunner} instance running this extension
*/
@Inject
private ExtensionsRunner extensionsRunner;

/**
Expand Down Expand Up @@ -57,6 +54,11 @@ public ExtensionSettings getExtensionSettings() {
return this.settings;
}

@Override
public void setExtensionsRunner(ExtensionsRunner runner) {
this.extensionsRunner = runner;
}

/**
* Gets the {@link ExtensionsRunner} of this extension.
*
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/opensearch/sdk/Extension.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
*/
public interface Extension {

/**
* Set the instance of {@link ExtensionsRunner} for this extension.
*
* @param runner The ExtensionsRunner instance.
*/
public void setExtensionsRunner(ExtensionsRunner runner);

/**
* Gets the {@link ExtensionSettings} of this extension.
*
Expand Down
45 changes: 41 additions & 4 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.extensions.DiscoveryExtensionNode;
import org.opensearch.extensions.AddSettingsUpdateConsumerRequest;
import org.opensearch.extensions.UpdateSettingsRequest;
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.ExtensionsManager.RequestType;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsManager;
Expand All @@ -34,6 +35,7 @@
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionRequestHandler;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ExtensionDependencyResponseHandler;
Expand Down Expand Up @@ -139,13 +141,15 @@ public class ExtensionsRunner {
private final SDKNamedWriteableRegistry sdkNamedWriteableRegistry;
private final SDKClient sdkClient;
private final SDKClusterService sdkClusterService;
private final SDKTransportService sdkTransportService;
private final SDKActionModule sdkActionModule;

private ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler(this);
private ExtensionsIndicesModuleRequestHandler extensionsIndicesModuleRequestHandler = new ExtensionsIndicesModuleRequestHandler();
private ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
private final ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler(this);
private final ExtensionsIndicesModuleRequestHandler extensionsIndicesModuleRequestHandler = new ExtensionsIndicesModuleRequestHandler();
private final ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
new ExtensionsIndicesModuleNameRequestHandler();
private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler(extensionRestPathRegistry);
private final ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler(extensionRestPathRegistry);
private final ExtensionActionRequestHandler extensionsActionRequestHandler;

/**
* Instantiates a new update settings request handler
Expand All @@ -159,7 +163,10 @@ public class ExtensionsRunner {
* @throws IOException if the runner failed to read settings or API.
*/
protected ExtensionsRunner(Extension extension) throws IOException {
// Link these classes together
this.extension = extension;
extension.setExtensionsRunner(this);

// Initialize concrete classes needed by extensions
// These must have getters from this class to be accessible via createComponents
// If they require later initialization, create a concrete wrapper class and update the internals
Expand Down Expand Up @@ -187,6 +194,8 @@ protected ExtensionsRunner(Extension extension) throws IOException {
this.sdkClient = new SDKClient(extensionSettings);
// initialize SDKClusterService. Must happen after extension field assigned
this.sdkClusterService = new SDKClusterService(this);
// initialize SDKTransportService. Must happen after extension field assigned
this.sdkTransportService = new SDKTransportService();

// Create Guice modules for injection
List<com.google.inject.Module> modules = new ArrayList<>();
Expand All @@ -201,6 +210,7 @@ protected ExtensionsRunner(Extension extension) throws IOException {

b.bind(SDKClient.class).toInstance(getSdkClient());
b.bind(SDKClusterService.class).toInstance(getSdkClusterService());
b.bind(SDKTransportService.class).toInstance(getSdkTransportService());
});
// Bind the return values from create components
modules.add(this::injectComponents);
Expand All @@ -214,6 +224,8 @@ protected ExtensionsRunner(Extension extension) throws IOException {
// initialize SDKClient action map
initializeSdkClient();

extensionsActionRequestHandler = new ExtensionActionRequestHandler(getSdkClient());

if (extension instanceof ActionExtension) {
// store REST handlers in the registry
for (ExtensionRestHandler extensionRestHandler : ((ActionExtension) extension).getExtensionRestHandlers()) {
Expand Down Expand Up @@ -423,6 +435,25 @@ public void startTransportService(TransportService transportService) {
((request, channel, task) -> channel.sendResponse(updateSettingsRequestHandler.handleUpdateSettingsRequest(request)))
);

// This handles a remote extension request from OpenSearch or a plugin, sending an ExtensionActionResponse
transportService.registerRequestHandler(
ExtensionsManager.REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionActionRequest::new,
((request, channel, task) -> channel.sendResponse(extensionsActionRequestHandler.handleExtensionActionRequest(request)))
);

// This handles a remote extension request from another extension, sending a RemoteExtensionActionResponse
transportService.registerRequestHandler(
ExtensionsManager.REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionActionRequest::new,
((request, channel, task) -> channel.sendResponse(extensionsActionRequestHandler.handleRemoteExtensionActionRequest(request)))
);
}

/**
Expand Down Expand Up @@ -670,6 +701,10 @@ public TransportService getExtensionTransportService() {
return extensionTransportService;
}

public SDKTransportService getSdkTransportService() {
return sdkTransportService;
}

/**
* Starts an ActionListener.
*
Expand All @@ -692,6 +727,8 @@ public static void run(Extension extension) throws IOException {
// initialize the transport service
NettyTransport nettyTransport = new NettyTransport(runner);
runner.extensionTransportService = nettyTransport.initializeExtensionTransportService(runner.getSettings(), runner.getThreadPool());
// TODO: merge above line with below line when refactoring out extensionTransportService
runner.getSdkTransportService().setTransportService(runner.extensionTransportService);
runner.startActionListener(0);
}

Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/opensearch/sdk/SDKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
Expand Down Expand Up @@ -91,6 +92,9 @@ public SDKClient(ExtensionSettings extensionSettings) {
// Used by client.execute, populated by initialize method
@SuppressWarnings("rawtypes")
private Map<ActionType, TransportAction> actions = Collections.emptyMap();
// Used by remote client execution where we get a string for the class name
@SuppressWarnings("rawtypes")
private Map<String, ActionType> actionClassToInstanceMap = Collections.emptyMap();

/**
* Initialize this client.
Expand All @@ -100,6 +104,7 @@ public SDKClient(ExtensionSettings extensionSettings) {
@SuppressWarnings("rawtypes")
public void initialize(Map<ActionType, TransportAction> actions) {
this.actions = actions;
this.actionClassToInstanceMap = actions.keySet().stream().collect(Collectors.toMap(a -> a.getClass().getName(), a -> a));
}

/**
Expand Down Expand Up @@ -259,6 +264,17 @@ public void close() throws IOException {
doCloseHighLevelClient();
}

/**
* Gets an instance of {@link ActionType} from its corresponding class name, suitable for using as the first parameter in {@link #execute(ActionType, ActionRequest, ActionListener)}.
*
* @param className The class name of the action type
* @return The instance corresponding to the class name
*/
@SuppressWarnings("unchecked")
public ActionType<? extends ActionResponse> getActionFromClassName(String className) {
return actionClassToInstanceMap.get(className);
}

/**
* Executes a generic action, denoted by an {@link ActionType}.
*
Expand Down
132 changes: 132 additions & 0 deletions src/main/java/org/opensearch/sdk/SDKTransportService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sdk;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.sdk.ActionExtension.ActionHandler;
import org.opensearch.sdk.action.RemoteExtensionActionRequest;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionResponseHandler;
import org.opensearch.transport.TransportService;

/**
* Wrapper class for {@link TransportService} and associated methods.
*
* TODO: Move all the sendFooRequest() methods here
* TODO: Replace usages of getExtensionTransportService with this class
* https://github.com/opensearch-project/opensearch-sdk-java/issues/585
*/
public class SDKTransportService {
private final Logger logger = LogManager.getLogger(SDKTransportService.class);

private TransportService transportService;
private DiscoveryNode opensearchNode;
private String uniqueId;

/**
* Requests that OpenSearch register the Transport Actions for this extension.
*
* @param actions The map of registered actions from {@link SDKActionModule#getActions()}
*/
public void sendRegisterTransportActionsRequest(Map<String, ActionHandler<?, ?>> actions) {
logger.info("Sending Register Transport Actions request to OpenSearch");
Set<String> actionNameSet = actions.values()
.stream()
.filter(h -> !h.getAction().name().startsWith("internal"))
.map(h -> h.getAction().getClass().getName())
.collect(Collectors.toSet());
AcknowledgedResponseHandler registerTransportActionsResponseHandler = new AcknowledgedResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsManager.REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS,
new RegisterTransportActionsRequest(uniqueId, actionNameSet),
registerTransportActionsResponseHandler
);
} catch (Exception e) {
logger.error("Failed to send Register Transport Actions request to OpenSearch", e);
}
}

/**
* Requests that OpenSearch execute a Transport Actions on another extension.
*
* @param request The request to send
* @return A buffer serializing the response from the remote action if successful, otherwise null
*/
public RemoteExtensionActionResponse sendRemoteExtensionActionRequest(RemoteExtensionActionRequest request) {
logger.info("Sending Remote Extension Action request to OpenSearch for [" + request.getAction() + "]");
// Combine class name string and request bytes
byte[] requestClassBytes = request.getRequestClass().getBytes(StandardCharsets.UTF_8);
byte[] proxyRequestBytes = ByteBuffer.allocate(requestClassBytes.length + 1 + request.getRequestBytes().length)
.put(requestClassBytes)
.put(RemoteExtensionActionRequest.UNIT_SEPARATOR)
.put(request.getRequestBytes())
.array();
ExtensionActionResponseHandler extensionActionResponseHandler = new ExtensionActionResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsManager.TRANSPORT_ACTION_REQUEST_FROM_EXTENSION,
new TransportActionRequestFromExtension(request.getAction(), proxyRequestBytes, uniqueId),
extensionActionResponseHandler
);
// Wait on response
extensionActionResponseHandler.awaitResponse();
} catch (TimeoutException e) {
logger.error("Failed to receive Remote Extension Action response from OpenSearch", e);
} catch (Exception e) {
logger.error("Failed to send Remote Extension Action request to OpenSearch", e);
}
// At this point, response handler has read in the response bytes
return new RemoteExtensionActionResponse(
extensionActionResponseHandler.isSuccess(),
extensionActionResponseHandler.getResponseBytes()
);
}

public TransportService getTransportService() {
return transportService;
}

public DiscoveryNode getOpensearchNode() {
return opensearchNode;
}

public String getUniqueId() {
return uniqueId;
}

public void setTransportService(TransportService transportService) {
this.transportService = transportService;
}

public void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

public void setUniqueId(String uniqueId) {
this.uniqueId = uniqueId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sdk.action;

import org.opensearch.action.ActionType;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;

/**
* The {@link ActionType} used as they key for the {@link RemoteExtensionTransportAction}.
*/
public class RemoteExtensionAction extends ActionType<RemoteExtensionActionResponse> {

/**
* The name to look up this action with
*/
public static final String NAME = "internal:remote-extension-action";
/**
* The singleton instance of this class
*/
public static final RemoteExtensionAction INSTANCE = new RemoteExtensionAction();

private RemoteExtensionAction() {
super(NAME, RemoteExtensionActionResponse::new);
}
}
Loading