From 84a7eaee0bac1ed774153a289dc23a857b802d02 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Fri, 17 May 2024 22:47:00 +0530 Subject: [PATCH] Adds support to inject telemetry instances to plugins (#13636) * Adds support to inject telemetry instances to plugins Signed-off-by: Gagan Juneja * Adds test Signed-off-by: Gagan Juneja * incorporate pr comments Signed-off-by: Gagan Juneja --------- Signed-off-by: Gagan Juneja Co-authored-by: Gagan Juneja --- CHANGELOG.md | 1 + .../main/java/org/opensearch/node/Node.java | 38 ++++++++ .../plugins/TelemetryAwarePlugin.java | 80 ++++++++++++++++ .../java/org/opensearch/node/NodeTests.java | 94 +++++++++++++++++++ 4 files changed, 213 insertions(+) create mode 100644 server/src/main/java/org/opensearch/plugins/TelemetryAwarePlugin.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f868dd76039b..ba19e8d55dab3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650)) - Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481)) +- Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 614f39166ea66..0b25126f6be82 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -207,6 +207,7 @@ import org.opensearch.plugins.SearchPlugin; import org.opensearch.plugins.SecureSettingsFactory; import org.opensearch.plugins.SystemIndexPlugin; +import org.opensearch.plugins.TelemetryAwarePlugin; import org.opensearch.plugins.TelemetryPlugin; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.ratelimitting.admissioncontrol.transport.AdmissionControlTransportInterceptor; @@ -274,6 +275,7 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -620,6 +622,18 @@ protected Node( final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings()); if (telemetrySettings.isTracingFeatureEnabled() || telemetrySettings.isMetricsFeatureEnabled()) { List telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class); + List telemetryPluginsImplementingTelemetryAware = telemetryPlugins.stream() + .filter(a -> TelemetryAwarePlugin.class.isAssignableFrom(a.getClass())) + .collect(toList()); + if (telemetryPluginsImplementingTelemetryAware.isEmpty() == false) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "Telemetry plugins %s should not implement TelemetryAwarePlugin interface", + telemetryPluginsImplementingTelemetryAware + ) + ); + } TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings); if (telemetrySettings.isTracingFeatureEnabled()) { tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext()); @@ -909,6 +923,30 @@ protected Node( ) .collect(Collectors.toList()); + Collection telemetryAwarePluginComponents = pluginsService.filterPlugins(TelemetryAwarePlugin.class) + .stream() + .flatMap( + p -> p.createComponents( + client, + clusterService, + threadPool, + resourceWatcherService, + scriptService, + xContentRegistry, + environment, + nodeEnvironment, + namedWriteableRegistry, + clusterModule.getIndexNameExpressionResolver(), + repositoriesServiceReference::get, + tracer, + metricsRegistry + ).stream() + ) + .collect(Collectors.toList()); + + // Add the telemetryAwarePlugin components to the existing pluginComponents collection. + pluginComponents.addAll(telemetryAwarePluginComponents); + // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = new SearchRequestOperationsCompositeListenerFactory( diff --git a/server/src/main/java/org/opensearch/plugins/TelemetryAwarePlugin.java b/server/src/main/java/org/opensearch/plugins/TelemetryAwarePlugin.java new file mode 100644 index 0000000000000..42cab326f88bf --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/TelemetryAwarePlugin.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lifecycle.LifecycleComponent; +import org.opensearch.core.common.io.stream.NamedWriteable; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; +import java.util.function.Supplier; + +/** + * Plugin that provides the telemetry registries to build component with telemetry and also provide a way to + * pass telemetry registries to the implementing plugins for adding instrumentation in the code. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface TelemetryAwarePlugin { + + /** + * Returns components added by this plugin. + *

+ * Any components returned that implement {@link LifecycleComponent} will have their lifecycle managed. + * Note: To aid in the migration away from guice, all objects returned as components will be bound in guice + * to themselves. + * + * @param client A client to make requests to the system + * @param clusterService A service to allow watching and updating cluster state + * @param threadPool A service to allow retrieving an executor to run an async action + * @param resourceWatcherService A service to watch for changes to node local files + * @param scriptService A service to allow running scripts on the local node + * @param xContentRegistry the registry for extensible xContent parsing + * @param environment the environment for path and setting configurations + * @param nodeEnvironment the node environment used coordinate access to the data paths + * @param namedWriteableRegistry the registry for {@link NamedWriteable} object parsing + * @param indexNameExpressionResolver A service that resolves expression to index and alias names + * @param repositoriesServiceSupplier A supplier for the service that manages snapshot repositories; will return null when this method + * is called, but will return the repositories service once the node is initialized. + * @param tracer the tracer to add tracing instrumentation. + * @param metricsRegistry the registry for metrics instrumentation. + */ + default Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier, + Tracer tracer, + MetricsRegistry metricsRegistry + ) { + return Collections.emptyList(); + } +} diff --git a/server/src/test/java/org/opensearch/node/NodeTests.java b/server/src/test/java/org/opensearch/node/NodeTests.java index d91dc696eb30b..f44cc352cd330 100644 --- a/server/src/test/java/org/opensearch/node/NodeTests.java +++ b/server/src/test/java/org/opensearch/node/NodeTests.java @@ -34,17 +34,23 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.bootstrap.BootstrapCheck; import org.opensearch.bootstrap.BootstrapContext; +import org.opensearch.client.Client; import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsException; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.indices.breaker.CircuitBreakerService; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexService; @@ -56,22 +62,35 @@ import org.opensearch.monitor.fs.FsProbe; import org.opensearch.plugins.CircuitBreakerPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.TelemetryAwarePlugin; +import org.opensearch.plugins.TelemetryPlugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.telemetry.Telemetry; +import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.MockHttpTransport; import org.opensearch.test.NodeRoles; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; @@ -404,6 +423,81 @@ public void testCreateWithFileCache() throws Exception { } } + public void testTelemetryAwarePlugins() throws IOException { + Settings.Builder settings = baseSettings(); + List> plugins = basePlugins(); + plugins.add(MockTelemetryAwarePlugin.class); + try (Node node = new MockNode(settings.build(), plugins)) { + MockTelemetryAwareComponent mockTelemetryAwareComponent = node.injector().getInstance(MockTelemetryAwareComponent.class); + assertNotNull(mockTelemetryAwareComponent.getTracer()); + assertNotNull(mockTelemetryAwareComponent.getMetricsRegistry()); + TelemetryAwarePlugin telemetryAwarePlugin = node.getPluginsService().filterPlugins(TelemetryAwarePlugin.class).get(0); + assertTrue(telemetryAwarePlugin instanceof MockTelemetryAwarePlugin); + } + } + + public void testTelemetryPluginShouldNOTImplementTelemetryAwarePlugin() throws IOException { + Settings.Builder settings = baseSettings(); + List> plugins = basePlugins(); + plugins.add(MockTelemetryPlugin.class); + FeatureFlagSetter.set(FeatureFlags.TELEMETRY); + settings.put(TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING.getKey(), true); + assertThrows(IllegalStateException.class, () -> new MockNode(settings.build(), plugins)); + } + + private static class MockTelemetryAwareComponent { + private final Tracer tracer; + private final MetricsRegistry metricsRegistry; + + public MockTelemetryAwareComponent(Tracer tracer, MetricsRegistry metricsRegistry) { + this.tracer = tracer; + this.metricsRegistry = metricsRegistry; + } + + public Tracer getTracer() { + return tracer; + } + + public MetricsRegistry getMetricsRegistry() { + return metricsRegistry; + } + } + + public static class MockTelemetryAwarePlugin extends Plugin implements TelemetryAwarePlugin { + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier, + Tracer tracer, + MetricsRegistry metricsRegistry + ) { + return List.of(new MockTelemetryAwareComponent(tracer, metricsRegistry)); + } + + } + + public static class MockTelemetryPlugin extends Plugin implements TelemetryPlugin, TelemetryAwarePlugin { + + @Override + public Optional getTelemetry(TelemetrySettings telemetrySettings) { + return Optional.empty(); + } + + @Override + public String getName() { + return null; + } + } + public static class MockCircuitBreakerPlugin extends Plugin implements CircuitBreakerPlugin { private SetOnce myCircuitBreaker = new SetOnce<>();