From 3506973761972df391cabef40b9858e6c05239f0 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Thu, 30 May 2024 16:33:24 -0400 Subject: [PATCH] Create ExecutionContext and show example with ActionPluginProxy Signed-off-by: Craig Perkins --- .../util/concurrent/ExecutionContext.java | 25 ++++++++++ .../common/util/concurrent/ThreadContext.java | 14 ++++++ .../main/java/org/opensearch/node/Node.java | 6 ++- .../opensearch/plugins/ActionPluginProxy.java | 49 +++++++++++++++++++ 4 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java create mode 100644 server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java new file mode 100644 index 0000000000000..bff824173df65 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +public class ExecutionContext { + private final ThreadLocal context = new ThreadLocal<>(); + + public void set(String value) { + context.set(value); + } + + public String get() { + return context.get(); + } + + public void clear() { + context.remove(); + } +} diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index 6580b0e0085ef..1505be8d8f891 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -115,6 +115,7 @@ public final class ThreadContext implements Writeable { private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(); private final Map defaultHeader; private final ThreadLocal threadLocal; + private final ExecutionContext executionContext; private final int maxWarningHeaderCount; private final long maxWarningHeaderSize; private final List propagators; @@ -126,6 +127,7 @@ public final class ThreadContext implements Writeable { public ThreadContext(Settings settings) { this.defaultHeader = buildDefaultHeaders(settings); this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT); + this.executionContext = new ExecutionContext(); this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings); this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes(); this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator())); @@ -139,6 +141,18 @@ public void unregisterThreadContextStatePropagator(final ThreadContextStatePropa propagators.remove(Objects.requireNonNull(propagator)); } + public void setExecutionContext(String pluginName) { + this.executionContext.set(pluginName); + } + + public String getExecutionContext() { + return this.executionContext.get(); + } + + public void clearExecutionContext() { + this.executionContext.clear(); + } + /** * Removes the current context and resets a default context. The removed context can be * restored by closing the returned {@link StoredContext}. diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 04bd31e6a5809..d10b4d38720fd 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -185,6 +185,7 @@ import org.opensearch.persistent.PersistentTasksExecutorRegistry; import org.opensearch.persistent.PersistentTasksService; import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.ActionPluginProxy; import org.opensearch.plugins.AnalysisPlugin; import org.opensearch.plugins.CachePlugin; import org.opensearch.plugins.CircuitBreakerPlugin; @@ -979,7 +980,10 @@ protected Node( settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, - pluginsService.filterPlugins(ActionPlugin.class), + pluginsService.filterPlugins(ActionPlugin.class) + .stream() + .map(p -> ActionPluginProxy.newInstance(p, threadPool)) + .collect(Collectors.toList()), client, circuitBreakerService, usageService, diff --git a/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java b/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java new file mode 100644 index 0000000000000..93027af54d66c --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.threadpool.ThreadPool; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +public class ActionPluginProxy implements InvocationHandler { + private final ActionPlugin actionPlugin; + private final ThreadPool threadPool; + + public static ActionPlugin newInstance(ActionPlugin obj, ThreadPool threadPool) { + return (ActionPlugin) Proxy.newProxyInstance( + obj.getClass().getClassLoader(), + new Class[] { ActionPlugin.class }, + new ActionPluginProxy(obj, threadPool) + ); + } + + private ActionPluginProxy(ActionPlugin actionPlugin, ThreadPool threadPool) { + this.actionPlugin = actionPlugin; + this.threadPool = threadPool; + } + + @Override + public Object invoke(Object proxy, Method m, Object[] args) throws Throwable { + Object result; + try { + threadPool.getThreadContext().setExecutionContext(((Plugin) actionPlugin).getClass().getName()); + result = m.invoke(actionPlugin, args); + threadPool.getThreadContext().clearExecutionContext(); + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } catch (Exception e) { + throw new RuntimeException("unexpected invocation exception: " + e.getMessage()); + } + return result; + } +}