Skip to content

Commit

Permalink
Create concept of pluginNodeClient that can be used for executing tra…
Browse files Browse the repository at this point in the history
…nsport actions as the plugin

Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Aug 7, 2024
1 parent 8fbf024 commit 76e8b04
Show file tree
Hide file tree
Showing 15 changed files with 311 additions and 41 deletions.
1 change: 1 addition & 0 deletions qa/smoke-test-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
testImplementation project(path: ':plugins:transport-reactor-netty4') // for http
testImplementation project(path: ':plugins:transport-nio')
testImplementation project(path: ':plugins:identity-shiro') // for http
testImplementation project(path: ':server')
}

integTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.http.executioncontextplugin.TestExecutionContextPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
* compatible open source license.
*/

package org.opensearch.http;
package org.opensearch.http.executioncontextplugin;

import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand All @@ -16,7 +17,7 @@
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
Expand Down Expand Up @@ -59,6 +60,11 @@ public Collection<Object> createComponents(
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
return List.of(new TestGetExecutionContextRestAction(contextSwitcher, threadPool));
return List.of(new TestGetExecutionContextRestAction(pluginNodeClient, threadPool));
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(new ActionHandler<>(TestGetExecutionContextAction.INSTANCE, TestGetExecutionContextTransportAction.class));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.executioncontextplugin;

import org.opensearch.action.ActionType;

/**
* Test action to get the name of the plugin executing transport actions
*/
public class TestGetExecutionContextAction extends ActionType<TestGetExecutionContextResponse> {
/**
* Get execution context action instance
*/
public static final TestGetExecutionContextAction INSTANCE = new TestGetExecutionContextAction();
/**
* Get execution context action name
*/
public static final String NAME = "cluster:admin/executioncontext";

private TestGetExecutionContextAction() {
super(NAME, TestGetExecutionContextResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.executioncontextplugin;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Request object for GetExecutionContext transport action
*/
public class TestGetExecutionContextRequest extends ActionRequest {

/**
* Default constructor
*/
public TestGetExecutionContextRequest() {}

/**
* Constructor with stream input
* @param in the stream input
* @throws IOException IOException
*/
public TestGetExecutionContextRequest(final StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.executioncontextplugin;

import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Response object containing the name of the plugin executing the transport action
*/
public class TestGetExecutionContextResponse extends ActionResponse implements ToXContentObject {
private final String pluginClassName;

/**
* Default constructor
*
* @param pluginClassName the canonical class name of the plugin executing the transport action
*/
public TestGetExecutionContextResponse(String pluginClassName) {
this.pluginClassName = pluginClassName;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(pluginClassName);
}

/**
* Constructor with StreamInput
*
* @param in the stream input
*/
public TestGetExecutionContextResponse(final StreamInput in) throws IOException {
pluginClassName = in.readString();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("plugin_execution_context", pluginClassName);
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
* compatible open source license.
*/

package org.opensearch.http;
package org.opensearch.http.executioncontextplugin;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.client.node.PluginNodeClient;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.threadpool.ThreadPool;

import java.util.List;
Expand All @@ -25,11 +22,11 @@

public class TestGetExecutionContextRestAction extends BaseRestHandler {

private final ContextSwitcher contextSwitcher;
private final PluginNodeClient pluginNodeClient;
private final ThreadPool threadPool;

public TestGetExecutionContextRestAction(ContextSwitcher contextSwitcher, ThreadPool threadPool) {
this.contextSwitcher = contextSwitcher;
public TestGetExecutionContextRestAction(PluginNodeClient pluginNodeClient, ThreadPool threadPool) {
this.pluginNodeClient = pluginNodeClient;
this.threadPool = threadPool;
}

Expand All @@ -45,11 +42,7 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
String stashedContext;
try (ThreadContext.StoredContext storedContext = contextSwitcher.switchContext()) {
stashedContext = threadPool.getThreadContext().getHeader(ThreadContext.PLUGIN_EXECUTION_CONTEXT);
}
RestResponse response = new BytesRestResponse(RestStatus.OK, stashedContext);
return channel -> channel.sendResponse(response);
final TestGetExecutionContextRequest getExecutionContextRequest = new TestGetExecutionContextRequest();
return channel -> pluginNodeClient.executeLocally(TestGetExecutionContextAction.INSTANCE, getExecutionContextRequest, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.executioncontextplugin;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* Transport action for GetExecutionContext.
*
* Returns the canonical class name of the plugin that is currently executing the transport action.
*/
public class TestGetExecutionContextTransportAction extends HandledTransportAction<TestGetExecutionContextRequest, TestGetExecutionContextResponse> {
private final TransportService transportService;

@Inject
public TestGetExecutionContextTransportAction(TransportService transportService, ActionFilters actionFilters) {
super(TestGetExecutionContextAction.NAME, transportService, actionFilters, TestGetExecutionContextRequest::new);
this.transportService = transportService;
}

@Override
protected void doExecute(Task task, TestGetExecutionContextRequest request, ActionListener<TestGetExecutionContextResponse> listener) {
String pluginClassName = transportService.getThreadPool().getThreadContext().getHeader(ThreadContext.PLUGIN_EXECUTION_CONTEXT);
listener.onResponse(new TestGetExecutionContextResponse(pluginClassName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public String getLocalNodeId() {
* Get the {@link TransportAction} for an {@link ActionType}, throwing exceptions if the action isn't available.
*/
@SuppressWarnings("unchecked")
private <Request extends ActionRequest, Response extends ActionResponse> TransportAction<Request, Response> transportAction(
protected <Request extends ActionRequest, Response extends ActionResponse> TransportAction<Request, Response> transportAction(
ActionType<Response> action
) {
if (actionRegistry == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.node;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.PluginContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.plugins.Plugin;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskListener;
import org.opensearch.threadpool.ThreadPool;

/**
* Client that executes actions on the local node with contextual information about the plugin executing
* transport actions.
*
* @opensearch.api
*/
public class PluginNodeClient extends NodeClient {
private final PluginContextSwitcher contextSwitcher;

public PluginNodeClient(Settings settings, ThreadPool threadPool, Plugin plugin) {
super(settings, threadPool);
contextSwitcher = new PluginContextSwitcher(threadPool, plugin);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
try (ThreadContext.StoredContext storedContext = contextSwitcher.switchContext()) {
// Discard the task because the Client interface doesn't use it.
executeLocally(action, request, listener);
}
}

/**
* Execute an {@link ActionType} locally, returning that {@link Task} used to track it, and linking an {@link ActionListener}.
* Prefer this method if you don't need access to the task when listening for the response. This is the method used to implement
* the {@link Client} interface.
*
* This client will execute the transport action in the context of the plugin executing this action
*/
@Override
public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
Task task;
try (ThreadContext.StoredContext storedContext = contextSwitcher.switchContext()) {
// Discard the task because the Client interface doesn't use it.
task = transportAction(action).execute(request, listener);
}
return task;
}

/**
* Execute an {@link ActionType} locally, returning that {@link Task} used to track it, and linking an {@link TaskListener}. Prefer this
* method if you need access to the task when listening for the response.
*
* This client will execute the transport action in the context of the plugin executing this action
*/
@Override
public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
ActionType<Response> action,
Request request,
TaskListener<Response> listener
) {
Task task;
try (ThreadContext.StoredContext storedContext = contextSwitcher.switchContext()) {
task = transportAction(action).execute(request, listener);
}
return task;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@

package org.opensearch.common.util.concurrent;

import org.opensearch.common.annotation.PublicApi;

/**
* ContextSwitcher interface
*
* @opensearch.api
* @opensearch.internal
*/
@PublicApi(since = "2.17.0")
public interface ContextSwitcher {
ThreadContext.StoredContext switchContext();
}
Loading

0 comments on commit 76e8b04

Please sign in to comment.