diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginSetIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginSetIT.java deleted file mode 100644 index 8f7ff2702eb4c..0000000000000 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginSetIT.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.http; - -import org.opensearch.client.Request; -import org.opensearch.client.Response; -import org.opensearch.client.ResponseException; -import org.opensearch.plugins.Plugin; -import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; -import org.opensearch.test.OpenSearchIntegTestCase.Scope; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; - -/** - * Test a rest action that sets special response headers - */ -@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1) -public class ExecutionContextPluginSetIT extends HttpSmokeTestCase { - - @Override - protected boolean addMockHttpTransport() { - return false; // enable http - } - - @Override - protected Collection> nodePlugins() { - ArrayList> plugins = new ArrayList<>(super.nodePlugins()); - plugins.add(TestExecutionContextPlugin.class); - return plugins; - } - - public void testThatPluginCannotOverrideExecutionContext() throws IOException { - ensureGreen(); - try { - Response response = getRestClient().performRequest(new Request("POST", "/_set_execution_context")); - fail("request should have failed"); - } catch(ResponseException e) { - Response response = e.getResponse(); - String responseBody = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); - assertThat(response.getStatusLine().getStatusCode(), equalTo(400)); - assertThat(responseBody, containsString("ExecutionContext already present")); - } - } -} diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java index 0f1f8eac9ff7c..a38e01602c7e4 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java @@ -62,6 +62,6 @@ public Collection createComponents( public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { - return List.of(new TestSetExecutionContextRestAction(threadPool), new TestGetExecutionContextRestAction(threadPool)); + return List.of(new TestGetExecutionContextRestAction(threadPool)); } } diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java index 3e796bf7a14da..a5e394656812c 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java @@ -17,6 +17,7 @@ import org.opensearch.threadpool.ThreadPool; import java.util.List; +import java.util.Stack; import static java.util.Collections.singletonList; import static org.opensearch.rest.RestRequest.Method.GET; @@ -42,8 +43,8 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - String pluginMainClass = threadPool.getThreadContext().getExecutionContext(); - RestResponse response = new BytesRestResponse(RestStatus.OK, pluginMainClass); + Stack pluginExecutionStack = threadPool.getThreadContext().getPluginExecutionStack(); + RestResponse response = new BytesRestResponse(RestStatus.OK, pluginExecutionStack.peek()); return channel -> channel.sendResponse(response); } } diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestSetExecutionContextRestAction.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestSetExecutionContextRestAction.java deleted file mode 100644 index 8c33eb8787720..0000000000000 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestSetExecutionContextRestAction.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.http; - -import org.opensearch.client.node.NodeClient; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.rest.BaseRestHandler; -import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestRequest; -import org.opensearch.rest.RestResponse; -import org.opensearch.threadpool.ThreadPool; - -import java.util.List; - -import static java.util.Collections.singletonList; -import static org.opensearch.rest.RestRequest.Method.POST; - -public class TestSetExecutionContextRestAction extends BaseRestHandler { - - private final ThreadPool threadPool; - - public TestSetExecutionContextRestAction(ThreadPool threadPool) { - this.threadPool = threadPool; - } - - @Override - public List routes() { - return singletonList(new Route(POST, "/_set_execution_context")); - } - - @Override - public String getName() { - return "test_set_execution_context_action"; - } - - @Override - public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - threadPool.getThreadContext().setExecutionContext("should-not-allow-plugin-to-set-execution-context"); - RestResponse response = new BytesRestResponse(RestStatus.OK, "Should not happen"); - return channel -> channel.sendResponse(response); - } -} diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java index b32c7c39ffd27..f708c0abfffaf 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java @@ -8,25 +8,41 @@ package org.opensearch.common.util.concurrent; +import org.opensearch.plugins.Plugin; + +import java.util.Stack; + /** - * An ExecutionContext is a singular header within ThreadLocal that contains the identity of a plugin that is on - * the path of execution. + * An ExecutionContext is a singular header within ThreadLocal that contains the chain of plugins on the execution path */ public class ExecutionContext { - private final ThreadLocal context = new ThreadLocal<>(); + private final ThreadLocal> context = new ThreadLocal<>(); - public void set(String value) { - if (context.get() != null) { - throw new IllegalArgumentException("ExecutionContext already present"); + public void add(Plugin plugin) { + if (context.get() == null) { + context.set(new Stack<>()); } - context.set(value); + context.get().add(plugin.getClass().getCanonicalName()); } - public String get() { + public Stack get() { + if (context.get() == null) { + return null; + } return context.get(); } - public void clear() { - context.remove(); + public String peek() { + if (context.get() == null || context.get().isEmpty()) { + return null; + } + return context.get().peek(); + } + + public String pop() { + if (context.get() == null || context.get().isEmpty()) { + return null; + } + return context.get().pop(); } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index 6127cfa92061e..6a61a41556470 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -45,6 +45,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.http.HttpTransportSettings; +import org.opensearch.plugins.Plugin; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskThreadContextStatePropagator; @@ -60,6 +61,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.Stack; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; @@ -110,6 +112,7 @@ public final class ThreadContext implements Writeable { * Name for the {@link #stashWithOrigin origin} attribute. */ public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin"; + private static final String PLUGIN_EXECUTION_CONTEXT_HEADER = "_plugin_execution_context"; private static final Logger logger = LogManager.getLogger(ThreadContext.class); private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(); @@ -141,16 +144,16 @@ public void unregisterThreadContextStatePropagator(final ThreadContextStatePropa propagators.remove(Objects.requireNonNull(propagator)); } - public void setExecutionContext(String pluginName) { - this.executionContext.set(pluginName); + void delegateExecutionToPlugin(Plugin plugin) { + this.executionContext.add(plugin); } - public String getExecutionContext() { + public Stack getPluginExecutionStack() { return this.executionContext.get(); } - public void clearExecutionContext() { - this.executionContext.clear(); + void returnToCore() { + this.executionContext.pop(); } /** @@ -190,6 +193,25 @@ public StoredContext stashContext() { }; } + /** + * Keeps the current context and also adds an entry into the plugin execution stack with the + * main class name of the plugin being delegated to + */ + public StoredContext switchContext(Plugin plugin) { + final ThreadContextStruct context = threadLocal.get(); + + delegateExecutionToPlugin(plugin); + threadLocal.set(context); + + return () -> { + // If the node and thus the threadLocal get closed while this task + // is still executing, we don't want this runnable to fail with an + // uncaught exception + returnToCore(); + threadLocal.set(context); + }; + } + /** * Captures the current thread context as writeable, allowing it to be serialized out later */ diff --git a/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java b/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java index cda1cdfc8c4d3..74859ff51a978 100644 --- a/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java +++ b/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java @@ -8,7 +8,9 @@ package org.opensearch.rest; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; import org.opensearch.threadpool.ThreadPool; import java.lang.reflect.InvocationHandler; @@ -42,15 +44,12 @@ private RestHandlerProxy(RestHandler restHandler, ThreadPool threadPool, ActionP @Override public Object invoke(Object proxy, Method m, Object[] args) throws Throwable { Object result; - try { - threadPool.getThreadContext().setExecutionContext(plugin.getClass().getName()); + try (ThreadContext.StoredContext threadContext = threadPool.getThreadContext().switchContext((Plugin) plugin)) { result = m.invoke(restHandler, args); } catch (InvocationTargetException e) { throw e.getTargetException(); } catch (Exception e) { throw new RuntimeException("unexpected invocation exception: " + e.getMessage()); - } finally { - threadPool.getThreadContext().clearExecutionContext(); } return result; }