Skip to content

Commit

Permalink
Update registration of transport actions
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Mar 17, 2023
1 parent 42bba17 commit 76671b7
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
Expand All @@ -47,6 +48,7 @@
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.extensions.action.ExtensionTransportActionsHandler;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.rest.RestActionsRequestHandler;
Expand All @@ -58,7 +60,6 @@
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.rest.RestController;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportException;
Expand Down Expand Up @@ -168,20 +169,21 @@ public ExtensionsManager(Settings settings, Path extensionsPath) throws IOExcept
*
* @param restController The RestController on which to register Rest Actions.
* @param settingsModule The module that binds the provided settings to interface.
* @param actionsModule The module that binds transport actions.
* @param transportService The Node's transport service.
* @param clusterService The Node's cluster service.
* @param initialEnvironmentSettings The finalized view of settings for the Environment
* @param client The client used to make transport requests
*/
public void initializeServicesAndRestHandler(
RestController restController,
ActionModule actionModule,
SettingsModule settingsModule,
TransportService transportService,
ClusterService clusterService,
Settings initialEnvironmentSettings,
NodeClient client
) {
this.restActionsRequestHandler = new RestActionsRequestHandler(restController, extensionIdMap, transportService);
this.restActionsRequestHandler = new RestActionsRequestHandler(actionModule.getRestController(), extensionIdMap, transportService);
this.customSettingsRequestHandler = new CustomSettingsRequestHandler(settingsModule);
this.transportService = transportService;
this.clusterService = clusterService;
Expand All @@ -192,7 +194,12 @@ public void initializeServicesAndRestHandler(
REQUEST_EXTENSION_UPDATE_SETTINGS
);
this.client = client;
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(extensionIdMap, transportService, client);
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(
extensionIdMap,
transportService,
client,
actionModule.getExtensionActions()
);
registerRequestHandler();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionModule.DynamicActionRegistry;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.DiscoveryExtensionNode;
import org.opensearch.extensions.AcknowledgedResponse;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.extensions.RegisterTransportActionsRequest;
import org.opensearch.plugins.ActionPlugin.ActionHandler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ActionNotFoundTransportException;
import org.opensearch.transport.TransportException;
Expand Down Expand Up @@ -44,16 +45,19 @@ public class ExtensionTransportActionsHandler {
private final Map<String, DiscoveryExtensionNode> extensionIdMap;
private final TransportService transportService;
private final NodeClient client;
private DynamicActionRegistry dynamicActionRegistry;

public ExtensionTransportActionsHandler(
Map<String, DiscoveryExtensionNode> extensionIdMap,
TransportService transportService,
NodeClient client
NodeClient client,
DynamicActionRegistry dynamicActionRegistry
) {
this.actionsMap = new HashMap<>();
this.extensionIdMap = extensionIdMap;
this.transportService = transportService;
this.client = client;
this.dynamicActionRegistry = dynamicActionRegistry;
}

/**
Expand All @@ -64,10 +68,14 @@ public ExtensionTransportActionsHandler(
* @throws IllegalArgumentException when action being registered already is registered.
*/
void registerAction(String action, DiscoveryExtensionNode extension) throws IllegalArgumentException {
if (actionsMap.containsKey(action)) {
throw new IllegalArgumentException("The " + action + " you are trying to register is already registered");
// Register the action in this handler so it knows which extension owns it
if (actionsMap.putIfAbsent(action, extension) != null) {
throw new IllegalArgumentException("The action [" + action + "] you are trying to register is already registered");
}
actionsMap.putIfAbsent(action, extension);
// Register the action in the action module's extension actions map
dynamicActionRegistry.registerExtensionAction(
new ActionHandler<>(new ExtensionAction(action, extension.getId()), ExtensionTransportAction.class)
);
}

/**
Expand Down Expand Up @@ -111,10 +119,26 @@ public TransportResponse handleRegisterTransportActionsRequest(RegisterTransport
* @throws InterruptedException when message transport fails.
*/
public TransportResponse handleTransportActionRequestFromExtension(TransportActionRequestFromExtension request) throws Exception {
DiscoveryExtensionNode extension = extensionIdMap.get(request.getUniqueId());
final CompletableFuture<ExtensionActionResponse> inProgressFuture = new CompletableFuture<>();
String actionName = request.getAction();
String uniqueId = request.getUniqueId();
final TransportActionResponseToExtension response = new TransportActionResponseToExtension(new byte[0]);
// Validate that this action has been registered
ActionHandler<?, ?> handler = dynamicActionRegistry.get(actionName);
if (handler == null) {
byte[] responseBytes = ("Request failed: action [" + actionName + "] is not registered for extension [" + uniqueId + "].")
.getBytes(StandardCharsets.UTF_8);
response.setResponseBytes(responseBytes);
return response;
}
DiscoveryExtensionNode extension = extensionIdMap.get(uniqueId);
if (extension == null) {
byte[] responseBytes = ("Request failed: extension [" + uniqueId + "] can not be reached.").getBytes(StandardCharsets.UTF_8);
response.setResponseBytes(responseBytes);
return response;
}
final CompletableFuture<ExtensionActionResponse> inProgressFuture = new CompletableFuture<>();
client.execute(
// TODO change this to the registered action type
ExtensionProxyAction.INSTANCE,
new ExtensionActionRequest(request.getAction(), request.getRequestBytes()),
new ActionListener<ExtensionActionResponse>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.extensions;
package org.opensearch.extensions.action;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down
16 changes: 13 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ protected Node(
);
if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
this.extensionsManager.initializeServicesAndRestHandler(
restController,
actionModule,
settingsModule,
transportService,
clusterService,
Expand Down Expand Up @@ -1112,8 +1112,18 @@ protected Node(
resourcesToClose.addAll(pluginLifecycleComponents);
resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {
}), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService(), namedWriteableRegistry);
if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {
}),
actionModule.getExtensionActions(),
() -> clusterService.localNode().getId(),
transportService.getRemoteClusterService(),
namedWriteableRegistry
);
} else {
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {
}), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService(), namedWriteableRegistry);
}
this.namedWriteableRegistry = namedWriteableRegistry;

logger.debug("initializing HTTP handlers ...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.Before;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.extensions.DiscoveryExtensionNode;
import org.opensearch.extensions.AcknowledgedResponse;
import org.opensearch.extensions.RegisterTransportActionsRequest;
import org.opensearch.extensions.rest.RestSendToExtensionActionTests;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.test.OpenSearchTestCase;
Expand Down

0 comments on commit 76671b7

Please sign in to comment.