From 3506973761972df391cabef40b9858e6c05239f0 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Thu, 30 May 2024 16:33:24 -0400 Subject: [PATCH 01/12] Create ExecutionContext and show example with ActionPluginProxy Signed-off-by: Craig Perkins --- .../util/concurrent/ExecutionContext.java | 25 ++++++++++ .../common/util/concurrent/ThreadContext.java | 14 ++++++ .../main/java/org/opensearch/node/Node.java | 6 ++- .../opensearch/plugins/ActionPluginProxy.java | 49 +++++++++++++++++++ 4 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java create mode 100644 server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java new file mode 100644 index 0000000000000..bff824173df65 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +public class ExecutionContext { + private final ThreadLocal context = new ThreadLocal<>(); + + public void set(String value) { + context.set(value); + } + + public String get() { + return context.get(); + } + + public void clear() { + context.remove(); + } +} diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index 6580b0e0085ef..1505be8d8f891 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -115,6 +115,7 @@ public final class ThreadContext implements Writeable { private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(); private final Map defaultHeader; private final ThreadLocal threadLocal; + private final ExecutionContext executionContext; private final int maxWarningHeaderCount; private final long maxWarningHeaderSize; private final List propagators; @@ -126,6 +127,7 @@ public final class ThreadContext implements Writeable { public ThreadContext(Settings settings) { this.defaultHeader = buildDefaultHeaders(settings); this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT); + this.executionContext = new ExecutionContext(); this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings); this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes(); this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator())); @@ -139,6 +141,18 @@ public void unregisterThreadContextStatePropagator(final ThreadContextStatePropa propagators.remove(Objects.requireNonNull(propagator)); } + public void setExecutionContext(String pluginName) { + this.executionContext.set(pluginName); + } + + public String getExecutionContext() { + return this.executionContext.get(); + } + + public void clearExecutionContext() { + this.executionContext.clear(); + } + /** * Removes the current context and resets a default context. The removed context can be * restored by closing the returned {@link StoredContext}. diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 04bd31e6a5809..d10b4d38720fd 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -185,6 +185,7 @@ import org.opensearch.persistent.PersistentTasksExecutorRegistry; import org.opensearch.persistent.PersistentTasksService; import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.ActionPluginProxy; import org.opensearch.plugins.AnalysisPlugin; import org.opensearch.plugins.CachePlugin; import org.opensearch.plugins.CircuitBreakerPlugin; @@ -979,7 +980,10 @@ protected Node( settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, - pluginsService.filterPlugins(ActionPlugin.class), + pluginsService.filterPlugins(ActionPlugin.class) + .stream() + .map(p -> ActionPluginProxy.newInstance(p, threadPool)) + .collect(Collectors.toList()), client, circuitBreakerService, usageService, diff --git a/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java b/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java new file mode 100644 index 0000000000000..93027af54d66c --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.threadpool.ThreadPool; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +public class ActionPluginProxy implements InvocationHandler { + private final ActionPlugin actionPlugin; + private final ThreadPool threadPool; + + public static ActionPlugin newInstance(ActionPlugin obj, ThreadPool threadPool) { + return (ActionPlugin) Proxy.newProxyInstance( + obj.getClass().getClassLoader(), + new Class[] { ActionPlugin.class }, + new ActionPluginProxy(obj, threadPool) + ); + } + + private ActionPluginProxy(ActionPlugin actionPlugin, ThreadPool threadPool) { + this.actionPlugin = actionPlugin; + this.threadPool = threadPool; + } + + @Override + public Object invoke(Object proxy, Method m, Object[] args) throws Throwable { + Object result; + try { + threadPool.getThreadContext().setExecutionContext(((Plugin) actionPlugin).getClass().getName()); + result = m.invoke(actionPlugin, args); + threadPool.getThreadContext().clearExecutionContext(); + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } catch (Exception e) { + throw new RuntimeException("unexpected invocation exception: " + e.getMessage()); + } + return result; + } +} From f8cf238079ba58969c01cdfdd6a9c5c977f6ec2c Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Thu, 30 May 2024 16:46:45 -0400 Subject: [PATCH 02/12] Only allow core to set the ExecutionContext Signed-off-by: Craig Perkins --- .../opensearch/common/util/concurrent/ExecutionContext.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java index bff824173df65..251454d8ec868 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java @@ -12,6 +12,9 @@ public class ExecutionContext { private final ThreadLocal context = new ThreadLocal<>(); public void set(String value) { + if (context.get() != null) { + throw new IllegalArgumentException("ExecutionContext already present"); + } context.set(value); } From b6b2a1939427dfe248d1540907fc261703235673 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Fri, 31 May 2024 12:35:55 -0400 Subject: [PATCH 03/12] WIP on plugin aware thread context Signed-off-by: Craig Perkins --- .../http/ExecutionContextPluginIT.java | 79 ++++++++++++++++ .../http/TestExecutionContextPlugin.java | 91 +++++++++++++++++++ .../http/TestExecutionContextRestAction.java | 72 +++++++++++++++ .../org/opensearch/action/ActionModule.java | 4 +- .../org/opensearch/rest/RestHandlerProxy.java | 53 +++++++++++ 5 files changed, 298 insertions(+), 1 deletion(-) create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java create mode 100644 server/src/main/java/org/opensearch/rest/RestHandlerProxy.java diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java new file mode 100644 index 0000000000000..aa777a2f005d5 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.http; + +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.test.OpenSearchIntegTestCase.Scope; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Test a rest action that sets special response headers + */ +@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1) +public class ExecutionContextPluginIT extends HttpSmokeTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + @Override + protected Collection> nodePlugins() { + ArrayList> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(TestExecutionContextPlugin.class); + return plugins; + } + + public void testThatSettingHeadersWorks() throws IOException { + ensureGreen(); + try { + Response response = getRestClient().performRequest(new Request("GET", "/_execution_context")); + System.out.println("Response body: " + new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8)); +// fail("request should have failed"); + } catch(ResponseException e) { + Response response = e.getResponse(); + assertThat(response.getStatusLine().getStatusCode(), equalTo(401)); + assertThat(response.getHeader("Secret"), equalTo("required")); + } + } +} diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java new file mode 100644 index 0000000000000..5fa301a271308 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.http; + +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import static java.util.Collections.singletonList; + +public class TestExecutionContextPlugin extends Plugin implements ActionPlugin { + + private ThreadPool threadPool; + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver expressionResolver, + Supplier repositoriesServiceSupplier + ) { + this.threadPool = threadPool; + return Collections.emptyList(); + } + + @Override + public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster) { + return singletonList(new TestExecutionContextRestAction(threadPool)); + } +} diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java new file mode 100644 index 0000000000000..a2562c8efa027 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.http; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.GET; + +public class TestExecutionContextRestAction extends BaseRestHandler { + + private final ThreadPool threadPool; + + public TestExecutionContextRestAction(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + @Override + public List routes() { + return singletonList(new Route(GET, "/_execution_context")); + } + + @Override + public String getName() { + return "test_execution_context_action"; + } + + @Override + public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + System.out.println("Plugin execution context: " + threadPool.getThreadContext().getExecutionContext()); + threadPool.getThreadContext().setExecutionContext("should-not-allow-plugin-to-set-execution-context"); + RestResponse response = new BytesRestResponse(RestStatus.OK, "Should not happen"); + return channel -> channel.sendResponse(response); + } +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 5e2b62614fc47..893dadd432f8f 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -322,6 +322,7 @@ import org.opensearch.rest.NamedRoute; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestHandlerProxy; import org.opensearch.rest.RestHeaderDefinition; import org.opensearch.rest.action.RestFieldCapabilitiesAction; import org.opensearch.rest.action.RestMainAction; @@ -993,7 +994,8 @@ public void initRestHandlers(Supplier nodesInCluster) { indexNameExpressionResolver, nodesInCluster )) { - registerHandler.accept(handler); + RestHandler handlerProxy = RestHandlerProxy.newInstance(handler, threadPool, plugin); + registerHandler.accept(handlerProxy); } } registerHandler.accept(new RestCatAction(catActions)); diff --git a/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java b/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java new file mode 100644 index 0000000000000..f1c3081214e6e --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest; + +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.threadpool.ThreadPool; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +public class RestHandlerProxy implements InvocationHandler { + private final RestHandler restHandler; + private final ThreadPool threadPool; + private final ActionPlugin plugin; + + public static RestHandler newInstance(RestHandler obj, ThreadPool threadPool, ActionPlugin plugin) { + return (RestHandler) Proxy.newProxyInstance( + obj.getClass().getClassLoader(), + new Class[] { RestHandler.class }, + new RestHandlerProxy(obj, threadPool, plugin) + ); + } + + private RestHandlerProxy(RestHandler restHandler, ThreadPool threadPool, ActionPlugin plugin) { + this.restHandler = restHandler; + this.threadPool = threadPool; + this.plugin = plugin; + } + + @Override + public Object invoke(Object proxy, Method m, Object[] args) throws Throwable { + Object result; + try { + threadPool.getThreadContext().setExecutionContext(Proxy.getInvocationHandler(plugin).getClass().getName()); + result = m.invoke(restHandler, args); + threadPool.getThreadContext().clearExecutionContext(); + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } catch (Exception e) { + throw new RuntimeException("unexpected invocation exception: " + e.getMessage()); + } + return result; + } +} From 8165f05d401ba0b8ec7acd824c6bdd03c714e673 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Fri, 31 May 2024 15:09:32 -0400 Subject: [PATCH 04/12] Plugin Aware API Handling Signed-off-by: Craig Perkins --- .../opensearch/http/ExecutionContextPluginIT.java | 12 +++++++----- server/src/main/java/org/opensearch/node/Node.java | 6 +----- .../java/org/opensearch/rest/RestHandlerProxy.java | 3 +-- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java index aa777a2f005d5..f6c6e82888bff 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java @@ -44,6 +44,8 @@ import java.util.ArrayList; import java.util.Collection; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; /** @@ -64,16 +66,16 @@ protected Collection> nodePlugins() { return plugins; } - public void testThatSettingHeadersWorks() throws IOException { + public void testThatPluginCannotOverrideExecutionContext() throws IOException { ensureGreen(); try { Response response = getRestClient().performRequest(new Request("GET", "/_execution_context")); - System.out.println("Response body: " + new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8)); -// fail("request should have failed"); + fail("request should have failed"); } catch(ResponseException e) { Response response = e.getResponse(); - assertThat(response.getStatusLine().getStatusCode(), equalTo(401)); - assertThat(response.getHeader("Secret"), equalTo("required")); + String responseBody = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); + assertThat(response.getStatusLine().getStatusCode(), equalTo(400)); + assertThat(responseBody, containsString("ExecutionContext already present")); } } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d10b4d38720fd..04bd31e6a5809 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -185,7 +185,6 @@ import org.opensearch.persistent.PersistentTasksExecutorRegistry; import org.opensearch.persistent.PersistentTasksService; import org.opensearch.plugins.ActionPlugin; -import org.opensearch.plugins.ActionPluginProxy; import org.opensearch.plugins.AnalysisPlugin; import org.opensearch.plugins.CachePlugin; import org.opensearch.plugins.CircuitBreakerPlugin; @@ -980,10 +979,7 @@ protected Node( settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, - pluginsService.filterPlugins(ActionPlugin.class) - .stream() - .map(p -> ActionPluginProxy.newInstance(p, threadPool)) - .collect(Collectors.toList()), + pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, diff --git a/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java b/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java index f1c3081214e6e..945e927c40732 100644 --- a/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java +++ b/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java @@ -9,7 +9,6 @@ package org.opensearch.rest; import org.opensearch.plugins.ActionPlugin; -import org.opensearch.plugins.Plugin; import org.opensearch.threadpool.ThreadPool; import java.lang.reflect.InvocationHandler; @@ -40,7 +39,7 @@ private RestHandlerProxy(RestHandler restHandler, ThreadPool threadPool, ActionP public Object invoke(Object proxy, Method m, Object[] args) throws Throwable { Object result; try { - threadPool.getThreadContext().setExecutionContext(Proxy.getInvocationHandler(plugin).getClass().getName()); + threadPool.getThreadContext().setExecutionContext(plugin.getClass().getName()); result = m.invoke(restHandler, args); threadPool.getThreadContext().clearExecutionContext(); } catch (InvocationTargetException e) { From 5b989d6cd8bdcafee7923d9d467ebf0fcc685f6c Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Tue, 11 Jun 2024 16:08:40 -0400 Subject: [PATCH 05/12] Add test to verify that ExecutionContext is being populated during RestHandling Signed-off-by: Craig Perkins --- .../http/ExecutionContextPluginGetIT.java | 51 +++++++++++++ ....java => ExecutionContextPluginSetIT.java} | 28 +------- .../http/TestExecutionContextPlugin.java | 26 +------ .../http/TestExecutionContextRestAction.java | 72 ------------------- .../TestGetExecutionContextRestAction.java | 49 +++++++++++++ .../TestSetExecutionContextRestAction.java | 48 +++++++++++++ .../util/concurrent/ExecutionContext.java | 4 ++ .../opensearch/plugins/ActionPluginProxy.java | 49 ------------- .../org/opensearch/rest/RestHandlerProxy.java | 4 ++ 9 files changed, 159 insertions(+), 172 deletions(-) create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginGetIT.java rename qa/smoke-test-http/src/test/java/org/opensearch/http/{ExecutionContextPluginIT.java => ExecutionContextPluginSetIT.java} (65%) delete mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/TestSetExecutionContextRestAction.java delete mode 100644 server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginGetIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginGetIT.java new file mode 100644 index 0000000000000..95f8898862357 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginGetIT.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.test.OpenSearchIntegTestCase.Scope; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +/** + * Test a rest action that sets special response headers + */ +@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1) +public class ExecutionContextPluginGetIT extends HttpSmokeTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + @Override + protected Collection> nodePlugins() { + ArrayList> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(TestExecutionContextPlugin.class); + return plugins; + } + + public void testGetExecutionContext() throws IOException { + ensureGreen(); + Response response = getRestClient().performRequest(new Request("GET", "/_get_execution_context")); + String responseBody = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(responseBody, containsString(TestExecutionContextPlugin.class.getName())); + } +} diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginSetIT.java similarity index 65% rename from qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java rename to qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginSetIT.java index f6c6e82888bff..8f7ff2702eb4c 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginSetIT.java @@ -6,33 +6,9 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - package org.opensearch.http; import org.opensearch.client.Request; -import org.opensearch.client.RequestOptions; import org.opensearch.client.Response; import org.opensearch.client.ResponseException; import org.opensearch.plugins.Plugin; @@ -52,7 +28,7 @@ * Test a rest action that sets special response headers */ @ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1) -public class ExecutionContextPluginIT extends HttpSmokeTestCase { +public class ExecutionContextPluginSetIT extends HttpSmokeTestCase { @Override protected boolean addMockHttpTransport() { @@ -69,7 +45,7 @@ protected Collection> nodePlugins() { public void testThatPluginCannotOverrideExecutionContext() throws IOException { ensureGreen(); try { - Response response = getRestClient().performRequest(new Request("GET", "/_execution_context")); + Response response = getRestClient().performRequest(new Request("POST", "/_set_execution_context")); fail("request should have failed"); } catch(ResponseException e) { Response response = e.getResponse(); diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java index 5fa301a271308..0f1f8eac9ff7c 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java @@ -6,30 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - package org.opensearch.http; import org.opensearch.client.Client; @@ -86,6 +62,6 @@ public Collection createComponents( public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { - return singletonList(new TestExecutionContextRestAction(threadPool)); + return List.of(new TestSetExecutionContextRestAction(threadPool), new TestGetExecutionContextRestAction(threadPool)); } } diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java deleted file mode 100644 index a2562c8efa027..0000000000000 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.http; - -import org.opensearch.client.node.NodeClient; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.rest.BaseRestHandler; -import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestRequest; -import org.opensearch.rest.RestResponse; -import org.opensearch.threadpool.ThreadPool; - -import java.util.List; - -import static java.util.Collections.singletonList; -import static org.opensearch.rest.RestRequest.Method.GET; - -public class TestExecutionContextRestAction extends BaseRestHandler { - - private final ThreadPool threadPool; - - public TestExecutionContextRestAction(ThreadPool threadPool) { - this.threadPool = threadPool; - } - - @Override - public List routes() { - return singletonList(new Route(GET, "/_execution_context")); - } - - @Override - public String getName() { - return "test_execution_context_action"; - } - - @Override - public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - System.out.println("Plugin execution context: " + threadPool.getThreadContext().getExecutionContext()); - threadPool.getThreadContext().setExecutionContext("should-not-allow-plugin-to-set-execution-context"); - RestResponse response = new BytesRestResponse(RestStatus.OK, "Should not happen"); - return channel -> channel.sendResponse(response); - } -} diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java new file mode 100644 index 0000000000000..3e796bf7a14da --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.GET; +import static org.opensearch.rest.RestRequest.Method.POST; + +public class TestGetExecutionContextRestAction extends BaseRestHandler { + + private final ThreadPool threadPool; + + public TestGetExecutionContextRestAction(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + @Override + public List routes() { + return singletonList(new Route(GET, "/_get_execution_context")); + } + + @Override + public String getName() { + return "test_get_execution_context_action"; + } + + @Override + public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + String pluginMainClass = threadPool.getThreadContext().getExecutionContext(); + RestResponse response = new BytesRestResponse(RestStatus.OK, pluginMainClass); + return channel -> channel.sendResponse(response); + } +} diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestSetExecutionContextRestAction.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestSetExecutionContextRestAction.java new file mode 100644 index 0000000000000..8c33eb8787720 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestSetExecutionContextRestAction.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.POST; + +public class TestSetExecutionContextRestAction extends BaseRestHandler { + + private final ThreadPool threadPool; + + public TestSetExecutionContextRestAction(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + @Override + public List routes() { + return singletonList(new Route(POST, "/_set_execution_context")); + } + + @Override + public String getName() { + return "test_set_execution_context_action"; + } + + @Override + public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + threadPool.getThreadContext().setExecutionContext("should-not-allow-plugin-to-set-execution-context"); + RestResponse response = new BytesRestResponse(RestStatus.OK, "Should not happen"); + return channel -> channel.sendResponse(response); + } +} diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java index 251454d8ec868..b32c7c39ffd27 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java @@ -8,6 +8,10 @@ package org.opensearch.common.util.concurrent; +/** + * An ExecutionContext is a singular header within ThreadLocal that contains the identity of a plugin that is on + * the path of execution. + */ public class ExecutionContext { private final ThreadLocal context = new ThreadLocal<>(); diff --git a/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java b/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java deleted file mode 100644 index 93027af54d66c..0000000000000 --- a/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.plugins; - -import org.opensearch.threadpool.ThreadPool; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; - -public class ActionPluginProxy implements InvocationHandler { - private final ActionPlugin actionPlugin; - private final ThreadPool threadPool; - - public static ActionPlugin newInstance(ActionPlugin obj, ThreadPool threadPool) { - return (ActionPlugin) Proxy.newProxyInstance( - obj.getClass().getClassLoader(), - new Class[] { ActionPlugin.class }, - new ActionPluginProxy(obj, threadPool) - ); - } - - private ActionPluginProxy(ActionPlugin actionPlugin, ThreadPool threadPool) { - this.actionPlugin = actionPlugin; - this.threadPool = threadPool; - } - - @Override - public Object invoke(Object proxy, Method m, Object[] args) throws Throwable { - Object result; - try { - threadPool.getThreadContext().setExecutionContext(((Plugin) actionPlugin).getClass().getName()); - result = m.invoke(actionPlugin, args); - threadPool.getThreadContext().clearExecutionContext(); - } catch (InvocationTargetException e) { - throw e.getTargetException(); - } catch (Exception e) { - throw new RuntimeException("unexpected invocation exception: " + e.getMessage()); - } - return result; - } -} diff --git a/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java b/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java index 945e927c40732..be01b000b683d 100644 --- a/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java +++ b/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java @@ -16,6 +16,10 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; +/** + * RestHandlerProxy is a wrapper around {@link RestHandler} that populates the ExecutionContext prior + * to delegating execution to a plugin for handling a REST Request + */ public class RestHandlerProxy implements InvocationHandler { private final RestHandler restHandler; private final ThreadPool threadPool; From 312c3b75a97ddc25e27ba0a728f8af1bfb91150e Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Tue, 11 Jun 2024 16:09:17 -0400 Subject: [PATCH 06/12] Create extension point for a single plugin to receive the registry of system indices Signed-off-by: Craig Perkins --- .../main/java/org/opensearch/node/Node.java | 33 +++++++++++++++++-- .../opensearch/plugins/SystemIndexPlugin.java | 12 +++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index afefb2f390636..ade2f368864c0 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -813,14 +813,43 @@ protected Node( .flatMap(m -> m.entrySet().stream()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + List systemIndexPlugins = pluginsService.filterPlugins(SystemIndexPlugin.class); final Map> systemIndexDescriptorMap = Collections.unmodifiableMap( - pluginsService.filterPlugins(SystemIndexPlugin.class) - .stream() + systemIndexPlugins.stream() .collect( Collectors.toMap(plugin -> plugin.getClass().getSimpleName(), plugin -> plugin.getSystemIndexDescriptors(settings)) ) ); + + final Map> systemIndexMap = Collections.unmodifiableMap( + systemIndexPlugins.stream() + .collect( + Collectors.toMap( + plugin -> plugin.getClass().getSimpleName(), + plugin -> plugin.getSystemIndexDescriptors(settings) + .stream() + .map(SystemIndexDescriptor::getIndexPattern) + .collect(Collectors.toSet()) + ) + ) + ); + final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap); + Function>, Void> onSystemIndex = null; + for (SystemIndexPlugin plugin : systemIndexPlugins) { + Function>, Void> newOnSystemIndex = plugin.onSystemIndices(Collections.emptyMap()); + if (newOnSystemIndex != null) { + logger.debug("Using onSystemIndex from plugin " + plugin.getClass().getName()); + if (onSystemIndex != null) { + throw new IllegalArgumentException("Cannot have more than one plugin implementing onSystemIndex"); + } + onSystemIndex = newOnSystemIndex; + } + } + + if (onSystemIndex != null) { + onSystemIndex.apply(systemIndexMap); + } final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute); rerouteServiceReference.set(rerouteService); diff --git a/server/src/main/java/org/opensearch/plugins/SystemIndexPlugin.java b/server/src/main/java/org/opensearch/plugins/SystemIndexPlugin.java index 4937a5ed091dc..5feb33e91091d 100644 --- a/server/src/main/java/org/opensearch/plugins/SystemIndexPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SystemIndexPlugin.java @@ -37,6 +37,9 @@ import java.util.Collection; import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; /** * Plugin for defining system indices. Extends {@link ActionPlugin} because system indices must be accessed via APIs @@ -55,4 +58,13 @@ public interface SystemIndexPlugin extends ActionPlugin { default Collection getSystemIndexDescriptors(Settings settings) { return Collections.emptyList(); } + + /** + * This function passes the registered system indices to a plugin. + * + * Note: Only one installed plugin may implement onSystemIndices. + */ + default Function>, Void> onSystemIndices(Map> systemIndices) { + return null; + } } From ed9fd9052a5b9693ca5b52720ddbbac9d75feaf7 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Tue, 11 Jun 2024 16:15:27 -0400 Subject: [PATCH 07/12] Move to after createComponents Signed-off-by: Craig Perkins --- .../main/java/org/opensearch/node/Node.java | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ade2f368864c0..1f2fc5ae94e22 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -821,36 +821,6 @@ protected Node( ) ); - final Map> systemIndexMap = Collections.unmodifiableMap( - systemIndexPlugins.stream() - .collect( - Collectors.toMap( - plugin -> plugin.getClass().getSimpleName(), - plugin -> plugin.getSystemIndexDescriptors(settings) - .stream() - .map(SystemIndexDescriptor::getIndexPattern) - .collect(Collectors.toSet()) - ) - ) - ); - - final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap); - Function>, Void> onSystemIndex = null; - for (SystemIndexPlugin plugin : systemIndexPlugins) { - Function>, Void> newOnSystemIndex = plugin.onSystemIndices(Collections.emptyMap()); - if (newOnSystemIndex != null) { - logger.debug("Using onSystemIndex from plugin " + plugin.getClass().getName()); - if (onSystemIndex != null) { - throw new IllegalArgumentException("Cannot have more than one plugin implementing onSystemIndex"); - } - onSystemIndex = newOnSystemIndex; - } - } - - if (onSystemIndex != null) { - onSystemIndex.apply(systemIndexMap); - } - final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute); rerouteServiceReference.set(rerouteService); clusterService.setRerouteService(rerouteService); @@ -991,6 +961,36 @@ protected Node( // Add the telemetryAwarePlugin components to the existing pluginComponents collection. pluginComponents.addAll(telemetryAwarePluginComponents); + final Map> systemIndexMap = Collections.unmodifiableMap( + systemIndexPlugins.stream() + .collect( + Collectors.toMap( + plugin -> plugin.getClass().getSimpleName(), + plugin -> plugin.getSystemIndexDescriptors(settings) + .stream() + .map(SystemIndexDescriptor::getIndexPattern) + .collect(Collectors.toSet()) + ) + ) + ); + + final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap); + Function>, Void> onSystemIndex = null; + for (SystemIndexPlugin plugin : systemIndexPlugins) { + Function>, Void> newOnSystemIndex = plugin.onSystemIndices(Collections.emptyMap()); + if (newOnSystemIndex != null) { + logger.debug("Using onSystemIndex from plugin " + plugin.getClass().getName()); + if (onSystemIndex != null) { + throw new IllegalArgumentException("Cannot have more than one plugin implementing onSystemIndex"); + } + onSystemIndex = newOnSystemIndex; + } + } + + if (onSystemIndex != null) { + onSystemIndex.apply(systemIndexMap); + } + // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = new SearchRequestOperationsCompositeListenerFactory( From 3cc88d12b957eaefdc91f048c9aa8298ba329879 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Tue, 11 Jun 2024 16:18:16 -0400 Subject: [PATCH 08/12] Small change Signed-off-by: Craig Perkins --- server/src/main/java/org/opensearch/node/Node.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1f2fc5ae94e22..acc5fb0cb5d85 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -820,6 +820,7 @@ protected Node( Collectors.toMap(plugin -> plugin.getClass().getSimpleName(), plugin -> plugin.getSystemIndexDescriptors(settings)) ) ); + final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap); final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute); rerouteServiceReference.set(rerouteService); @@ -974,7 +975,6 @@ protected Node( ) ); - final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap); Function>, Void> onSystemIndex = null; for (SystemIndexPlugin plugin : systemIndexPlugins) { Function>, Void> newOnSystemIndex = plugin.onSystemIndices(Collections.emptyMap()); From 4f90099bfb884de274b682e60331d0d7f3ff9d8b Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Tue, 11 Jun 2024 17:03:05 -0400 Subject: [PATCH 09/12] Change to Consumer Signed-off-by: Craig Perkins --- server/src/main/java/org/opensearch/node/Node.java | 7 ++++--- .../java/org/opensearch/plugins/SystemIndexPlugin.java | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index acc5fb0cb5d85..3ea14e4d2614d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -286,6 +286,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -975,9 +976,9 @@ protected Node( ) ); - Function>, Void> onSystemIndex = null; + Consumer>> onSystemIndex = null; for (SystemIndexPlugin plugin : systemIndexPlugins) { - Function>, Void> newOnSystemIndex = plugin.onSystemIndices(Collections.emptyMap()); + Consumer>> newOnSystemIndex = plugin.onSystemIndices(); if (newOnSystemIndex != null) { logger.debug("Using onSystemIndex from plugin " + plugin.getClass().getName()); if (onSystemIndex != null) { @@ -988,7 +989,7 @@ protected Node( } if (onSystemIndex != null) { - onSystemIndex.apply(systemIndexMap); + onSystemIndex.accept(systemIndexMap); } // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory diff --git a/server/src/main/java/org/opensearch/plugins/SystemIndexPlugin.java b/server/src/main/java/org/opensearch/plugins/SystemIndexPlugin.java index 5feb33e91091d..b80b206c9f436 100644 --- a/server/src/main/java/org/opensearch/plugins/SystemIndexPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SystemIndexPlugin.java @@ -39,7 +39,7 @@ import java.util.Collections; import java.util.Map; import java.util.Set; -import java.util.function.Function; +import java.util.function.Consumer; /** * Plugin for defining system indices. Extends {@link ActionPlugin} because system indices must be accessed via APIs @@ -64,7 +64,7 @@ default Collection getSystemIndexDescriptors(Settings set * * Note: Only one installed plugin may implement onSystemIndices. */ - default Function>, Void> onSystemIndices(Map> systemIndices) { + default Consumer>> onSystemIndices() { return null; } } From a4974833254527dc80343038d886602525ba30cd Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Wed, 12 Jun 2024 16:51:33 -0400 Subject: [PATCH 10/12] Use getCanonicalName Signed-off-by: Craig Perkins --- server/src/main/java/org/opensearch/node/Node.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 3ea14e4d2614d..c11721a1ea4c3 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -967,7 +967,7 @@ protected Node( systemIndexPlugins.stream() .collect( Collectors.toMap( - plugin -> plugin.getClass().getSimpleName(), + plugin -> plugin.getClass().getCanonicalName(), plugin -> plugin.getSystemIndexDescriptors(settings) .stream() .map(SystemIndexDescriptor::getIndexPattern) @@ -980,7 +980,7 @@ protected Node( for (SystemIndexPlugin plugin : systemIndexPlugins) { Consumer>> newOnSystemIndex = plugin.onSystemIndices(); if (newOnSystemIndex != null) { - logger.debug("Using onSystemIndex from plugin " + plugin.getClass().getName()); + logger.debug("Using onSystemIndex from plugin " + plugin.getClass().getCanonicalName()); if (onSystemIndex != null) { throw new IllegalArgumentException("Cannot have more than one plugin implementing onSystemIndex"); } From 5c2a1aeeb6bbb8529df3e179d42edf636995f1fa Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Mon, 17 Jun 2024 15:38:39 -0400 Subject: [PATCH 11/12] Move before createComponents Signed-off-by: Craig Perkins --- .../main/java/org/opensearch/node/Node.java | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index c11721a1ea4c3..fd6621fb8f24d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -823,6 +823,35 @@ protected Node( ); final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap); + final Map> systemIndexMap = Collections.unmodifiableMap( + systemIndexPlugins.stream() + .collect( + Collectors.toMap( + plugin -> plugin.getClass().getCanonicalName(), + plugin -> plugin.getSystemIndexDescriptors(settings) + .stream() + .map(SystemIndexDescriptor::getIndexPattern) + .collect(Collectors.toSet()) + ) + ) + ); + + Consumer>> onSystemIndex = null; + for (SystemIndexPlugin plugin : systemIndexPlugins) { + Consumer>> newOnSystemIndex = plugin.onSystemIndices(); + if (newOnSystemIndex != null) { + logger.debug("Using onSystemIndex from plugin " + plugin.getClass().getCanonicalName()); + if (onSystemIndex != null) { + throw new IllegalArgumentException("Cannot have more than one plugin implementing onSystemIndex"); + } + onSystemIndex = newOnSystemIndex; + } + } + + if (onSystemIndex != null) { + onSystemIndex.accept(systemIndexMap); + } + final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute); rerouteServiceReference.set(rerouteService); clusterService.setRerouteService(rerouteService); @@ -963,35 +992,6 @@ protected Node( // Add the telemetryAwarePlugin components to the existing pluginComponents collection. pluginComponents.addAll(telemetryAwarePluginComponents); - final Map> systemIndexMap = Collections.unmodifiableMap( - systemIndexPlugins.stream() - .collect( - Collectors.toMap( - plugin -> plugin.getClass().getCanonicalName(), - plugin -> plugin.getSystemIndexDescriptors(settings) - .stream() - .map(SystemIndexDescriptor::getIndexPattern) - .collect(Collectors.toSet()) - ) - ) - ); - - Consumer>> onSystemIndex = null; - for (SystemIndexPlugin plugin : systemIndexPlugins) { - Consumer>> newOnSystemIndex = plugin.onSystemIndices(); - if (newOnSystemIndex != null) { - logger.debug("Using onSystemIndex from plugin " + plugin.getClass().getCanonicalName()); - if (onSystemIndex != null) { - throw new IllegalArgumentException("Cannot have more than one plugin implementing onSystemIndex"); - } - onSystemIndex = newOnSystemIndex; - } - } - - if (onSystemIndex != null) { - onSystemIndex.accept(systemIndexMap); - } - // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = new SearchRequestOperationsCompositeListenerFactory( From 61d4ab896a7b8e2105e66b4869c32dc0112fb8b4 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Mon, 17 Jun 2024 16:12:45 -0400 Subject: [PATCH 12/12] Add test to ensure IllegalArgumentException is thrown Signed-off-by: Craig Perkins --- .../plugins/SystemIndexPluginIT.java | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 server/src/internalClusterTest/java/org/opensearch/plugins/SystemIndexPluginIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/plugins/SystemIndexPluginIT.java b/server/src/internalClusterTest/java/org/opensearch/plugins/SystemIndexPluginIT.java new file mode 100644 index 0000000000000..7e7aa6fbe84a3 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/plugins/SystemIndexPluginIT.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.common.settings.Settings; +import org.opensearch.indices.SystemIndexDescriptor; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.containsString; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class SystemIndexPluginIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(SystemIndexPlugin1.class, SystemIndexPlugin2.class); + } + + public void test2SystemIndexPluginsImplementOnSystemIndices_shouldFail() throws Exception { + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> internalCluster().startNode()); + assertThat(e.getMessage(), containsString("Cannot have more than one plugin implementing onSystemIndex")); + } + +} + +final class SystemIndexPlugin1 extends Plugin implements SystemIndexPlugin { + + private Map> systemIndices; + + public SystemIndexPlugin1() {} + + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + final SystemIndexDescriptor systemIndexDescriptor = new SystemIndexDescriptor(".system-index1", "System index 1"); + return Collections.singletonList(systemIndexDescriptor); + } + + @Override + public Consumer>> onSystemIndices() { + return (systemIndices) -> { this.systemIndices = systemIndices; }; + } +} + +class SystemIndexPlugin2 extends Plugin implements SystemIndexPlugin { + + private Map> systemIndices; + + public SystemIndexPlugin2() {} + + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + final SystemIndexDescriptor systemIndexDescriptor = new SystemIndexDescriptor(".system-index2", "System index 2"); + return Collections.singletonList(systemIndexDescriptor); + } + + @Override + public Consumer>> onSystemIndices() { + return (systemIndices) -> { this.systemIndices = systemIndices; }; + } +}