Skip to content

Commit

Permalink
Create ExecutionContext and show example with ActionPluginProxy
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed May 30, 2024
1 parent 9832972 commit 3506973
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util.concurrent;

public class ExecutionContext {
private final ThreadLocal<String> context = new ThreadLocal<>();

public void set(String value) {
context.set(value);
}

public String get() {
return context.get();
}

public void clear() {
context.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public final class ThreadContext implements Writeable {
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
private final Map<String, String> defaultHeader;
private final ThreadLocal<ThreadContextStruct> threadLocal;
private final ExecutionContext executionContext;
private final int maxWarningHeaderCount;
private final long maxWarningHeaderSize;
private final List<ThreadContextStatePropagator> propagators;
Expand All @@ -126,6 +127,7 @@ public final class ThreadContext implements Writeable {
public ThreadContext(Settings settings) {
this.defaultHeader = buildDefaultHeaders(settings);
this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT);
this.executionContext = new ExecutionContext();
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator()));
Expand All @@ -139,6 +141,18 @@ public void unregisterThreadContextStatePropagator(final ThreadContextStatePropa
propagators.remove(Objects.requireNonNull(propagator));
}

public void setExecutionContext(String pluginName) {
this.executionContext.set(pluginName);
}

public String getExecutionContext() {
return this.executionContext.get();
}

public void clearExecutionContext() {
this.executionContext.clear();
}

/**
* Removes the current context and resets a default context. The removed context can be
* restored by closing the returned {@link StoredContext}.
Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
import org.opensearch.persistent.PersistentTasksExecutorRegistry;
import org.opensearch.persistent.PersistentTasksService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ActionPluginProxy;
import org.opensearch.plugins.AnalysisPlugin;
import org.opensearch.plugins.CachePlugin;
import org.opensearch.plugins.CircuitBreakerPlugin;
Expand Down Expand Up @@ -979,7 +980,10 @@ protected Node(
settingsModule.getClusterSettings(),
settingsModule.getSettingsFilter(),
threadPool,
pluginsService.filterPlugins(ActionPlugin.class),
pluginsService.filterPlugins(ActionPlugin.class)
.stream()
.map(p -> ActionPluginProxy.newInstance(p, threadPool))
.collect(Collectors.toList()),
client,
circuitBreakerService,
usageService,
Expand Down
49 changes: 49 additions & 0 deletions server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugins;

import org.opensearch.threadpool.ThreadPool;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ActionPluginProxy implements InvocationHandler {
private final ActionPlugin actionPlugin;
private final ThreadPool threadPool;

public static ActionPlugin newInstance(ActionPlugin obj, ThreadPool threadPool) {
return (ActionPlugin) Proxy.newProxyInstance(
obj.getClass().getClassLoader(),
new Class<?>[] { ActionPlugin.class },
new ActionPluginProxy(obj, threadPool)
);
}

private ActionPluginProxy(ActionPlugin actionPlugin, ThreadPool threadPool) {
this.actionPlugin = actionPlugin;
this.threadPool = threadPool;
}

@Override
public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
Object result;
try {
threadPool.getThreadContext().setExecutionContext(((Plugin) actionPlugin).getClass().getName());
result = m.invoke(actionPlugin, args);
threadPool.getThreadContext().clearExecutionContext();
} catch (InvocationTargetException e) {
throw e.getTargetException();
} catch (Exception e) {
throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
}
return result;
}
}

0 comments on commit 3506973

Please sign in to comment.