diff --git a/CHANGELOG.md b/CHANGELOG.md index 94069885e7da1..cd1aeeeb27091 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Add latency metrics for instrumenting critical clusterManager code paths ([#12333](https://github.com/opensearch-project/OpenSearch/pull/12333)) - Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423)) - Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478)) - Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293)) diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java index 273b69e483e8c..2f353f2a53329 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -21,6 +21,7 @@ import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.plugins.ActionPlugin; import org.opensearch.rest.RestHandler; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ScalingExecutorBuilder; @@ -50,8 +51,7 @@ public void setup() { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); - clusterService = new ClusterService(settings, clusterSettings, threadPool); - + clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool); } public void testGetSettings() { diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index f340950017a5c..328ed0cd2ed15 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -22,6 +22,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.opensearch.search.aggregations.support.ValueType; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -57,7 +58,7 @@ public void setup() { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); - clusterService = new ClusterService(settings, clusterSettings, null); + clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); } diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java index a5f36b6e8cce0..d05cf7b6a636f 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java @@ -17,6 +17,7 @@ import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -33,7 +34,7 @@ public class TransportTopQueriesActionTests extends OpenSearchTestCase { private final Settings.Builder settingsBuilder = Settings.builder(); private final Settings settings = settingsBuilder.build(); private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + private final ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool); private final TransportService transportService = mock(TransportService.class); private final QueryInsightsService topQueriesByLatencyService = mock(QueryInsightsService.class); private final ActionFilters actionFilters = mock(ActionFilters.class); diff --git a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java new file mode 100644 index 0000000000000..d48f82a388245 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster; + +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.util.Objects; +import java.util.Optional; + +/** + * Class containing metrics (counters/latency) specific to ClusterManager. + * + * @opensearch.internal + */ +public final class ClusterManagerMetrics { + + private static final String LATENCY_METRIC_UNIT_MS = "ms"; + + public final Histogram clusterStateAppliersHistogram; + public final Histogram clusterStateListenersHistogram; + public final Histogram rerouteHistogram; + public final Histogram clusterStateComputeHistogram; + public final Histogram clusterStatePublishHistogram; + + public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { + clusterStateAppliersHistogram = metricsRegistry.createHistogram( + "cluster.state.appliers.latency", + "Histogram for tracking the latency of cluster state appliers", + LATENCY_METRIC_UNIT_MS + ); + clusterStateListenersHistogram = metricsRegistry.createHistogram( + "cluster.state.listeners.latency", + "Histogram for tracking the latency of cluster state listeners", + LATENCY_METRIC_UNIT_MS + ); + rerouteHistogram = metricsRegistry.createHistogram( + "allocation.reroute.latency", + "Histogram for recording latency of shard re-routing", + LATENCY_METRIC_UNIT_MS + ); + clusterStateComputeHistogram = metricsRegistry.createHistogram( + "cluster.state.new.compute.latency", + "Histogram for recording time taken to compute new cluster state", + LATENCY_METRIC_UNIT_MS + ); + clusterStatePublishHistogram = metricsRegistry.createHistogram( + "cluster.state.publish.success.latency", + "Histogram for recording time taken to publish a new cluster state", + LATENCY_METRIC_UNIT_MS + ); + } + + public void recordLatency(Histogram histogram, Double value) { + histogram.record(value); + } + + public void recordLatency(Histogram histogram, Double value, Optional tags) { + if (Objects.isNull(tags) || tags.isEmpty()) { + histogram.record(value); + return; + } + histogram.record(value, tags.get()); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index e3c4bc80eb4a7..148ca2b48f434 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -145,7 +145,8 @@ public ClusterModule( List clusterPlugins, ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService, - ThreadContext threadContext + ThreadContext threadContext, + ClusterManagerMetrics clusterManagerMetrics ) { this.clusterPlugins = clusterPlugins; this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); @@ -158,7 +159,8 @@ public ClusterModule( shardsAllocator, clusterInfoService, snapshotsInfoService, - settings + settings, + clusterManagerMetrics ); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index ae496e27ebf35..58b9753a11d73 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.RestoreInProgress; import org.opensearch.cluster.health.ClusterHealthStatus; @@ -56,10 +57,12 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PriorityComparator; import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.snapshots.SnapshotsInfoService; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import java.util.ArrayList; import java.util.Collections; @@ -96,6 +99,7 @@ public class AllocationService { private final ShardsAllocator shardsAllocator; private final ClusterInfoService clusterInfoService; private SnapshotsInfoService snapshotsInfoService; + private final ClusterManagerMetrics clusterManagerMetrics; // only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator public AllocationService( @@ -105,7 +109,13 @@ public AllocationService( ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService ) { - this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService); + this( + allocationDeciders, + shardsAllocator, + clusterInfoService, + snapshotsInfoService, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator)); } @@ -113,9 +123,10 @@ public AllocationService( AllocationDeciders allocationDeciders, ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, - SnapshotsInfoService snapshotsInfoService + SnapshotsInfoService snapshotsInfoService, + ClusterManagerMetrics clusterManagerMetrics ) { - this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY); + this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY, clusterManagerMetrics); } public AllocationService( @@ -123,14 +134,15 @@ public AllocationService( ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService, - Settings settings - + Settings settings, + ClusterManagerMetrics clusterManagerMetrics ) { this.allocationDeciders = allocationDeciders; this.shardsAllocator = shardsAllocator; this.clusterInfoService = clusterInfoService; this.snapshotsInfoService = snapshotsInfoService; this.settings = settings; + this.clusterManagerMetrics = clusterManagerMetrics; } /** @@ -550,11 +562,15 @@ private void reroute(RoutingAllocation allocation) { assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty() : "auto-expand replicas out of sync with number of nodes in the cluster"; assert assertInitialized(); - + long rerouteStartTimeNS = System.nanoTime(); removeDelayMarkers(allocation); allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first shardsAllocator.allocate(allocation); + clusterManagerMetrics.recordLatency( + clusterManagerMetrics.rerouteHistogram, + (double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS)) + ); assert RoutingNodes.assertShardStats(allocation.routingNodes()); } diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java index a55721fb13cdc..2ac95178d2ff9 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.ClusterStateListener; @@ -61,6 +62,7 @@ import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.telemetry.metrics.tags.Tags; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -68,6 +70,7 @@ import java.util.Collection; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -120,8 +123,15 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private final String nodeName; private NodeConnectionsService nodeConnectionsService; - - public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + private final ClusterManagerMetrics clusterManagerMetrics; + + public ClusterApplierService( + String nodeName, + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + ClusterManagerMetrics clusterManagerMetrics + ) { this.clusterSettings = clusterSettings; this.threadPool = threadPool; this.state = new AtomicReference<>(); @@ -132,6 +142,7 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, this::setSlowTaskLoggingThreshold ); + this.clusterManagerMetrics = clusterManagerMetrics; } private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) { @@ -597,7 +608,7 @@ private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, S callClusterStateAppliers(clusterChangedEvent, stopWatch, lowPriorityStateAppliers); } - private static void callClusterStateAppliers( + private void callClusterStateAppliers( ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch, Collection clusterStateAppliers @@ -605,7 +616,13 @@ private static void callClusterStateAppliers( for (ClusterStateApplier applier : clusterStateAppliers) { logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version()); try (TimingHandle ignored = stopWatch.timing("running applier [" + applier + "]")) { + long applierStartTimeNS = System.nanoTime(); applier.applyClusterState(clusterChangedEvent); + clusterManagerMetrics.recordLatency( + clusterManagerMetrics.clusterStateAppliersHistogram, + (double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - applierStartTimeNS)), + Optional.of(Tags.create().addTag("Operation", applier.getClass().getSimpleName())) + ); } } } @@ -624,7 +641,13 @@ private void callClusterStateListener( try { logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version()); try (TimingHandle ignored = stopWatch.timing("notifying listener [" + listener + "]")) { + long listenerStartTimeNS = System.nanoTime(); listener.clusterChanged(clusterChangedEvent); + clusterManagerMetrics.recordLatency( + clusterManagerMetrics.clusterStateListenersHistogram, + (double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - listenerStartTimeNS)), + Optional.of(Tags.create().addTag("Operation", listener.getClass().getSimpleName())) + ); } } catch (Exception ex) { logger.warn("failed to notify ClusterStateListener", ex); diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java index e9224596e048d..eaedb36a59f1e 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.service; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -20,7 +21,12 @@ */ @PublicApi(since = "2.2.0") public class ClusterManagerService extends MasterService { - public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { - super(settings, clusterSettings, threadPool); + public ClusterManagerService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + ClusterManagerMetrics clusterManagerMetrics + ) { + super(settings, clusterSettings, threadPool, clusterManagerMetrics); } } diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index aa7766979e851..fa61375e85c25 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.service; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; @@ -91,12 +92,17 @@ public class ClusterService extends AbstractLifecycleComponent { private IndexingPressureService indexingPressureService; - public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public ClusterService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + ClusterManagerMetrics clusterManagerMetrics + ) { this( settings, clusterSettings, - new ClusterManagerService(settings, clusterSettings, threadPool), - new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool) + new ClusterManagerService(settings, clusterSettings, threadPool, clusterManagerMetrics), + new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics) ); } diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index af3e4f8437c43..6436dcfe33003 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -39,6 +39,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.AckedClusterStateTaskListener; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterState.Builder; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -70,6 +71,7 @@ import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.discovery.Discovery; import org.opensearch.node.Node; +import org.opensearch.telemetry.metrics.tags.Tags; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -79,6 +81,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -135,8 +138,14 @@ public class MasterService extends AbstractLifecycleComponent { protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler; private final ClusterManagerThrottlingStats throttlingStats; private final ClusterStateStats stateStats; + private final ClusterManagerMetrics clusterManagerMetrics; - public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public MasterService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + ClusterManagerMetrics clusterManagerMetrics + ) { this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); this.slowTaskLoggingThreshold = CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); @@ -154,6 +163,7 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP ); this.stateStats = new ClusterStateStats(); this.threadPool = threadPool; + this.clusterManagerMetrics = clusterManagerMetrics; } private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) { @@ -303,6 +313,12 @@ private void runTasks(TaskInputs taskInputs) { final TimeValue computationTime = getTimeSince(computationStartTime); logExecutionTime(computationTime, "compute cluster state update", summary); + clusterManagerMetrics.recordLatency( + clusterManagerMetrics.clusterStateComputeHistogram, + (double) computationTime.getMillis(), + Optional.of(Tags.create().addTag("Operation", taskInputs.executor.getClass().getSimpleName())) + ); + if (taskOutputs.clusterStateUnchanged()) { final long notificationStartTime = threadPool.preciseRelativeTimeInNanos(); taskOutputs.notifySuccessfulTasksOnUnchangedClusterState(); @@ -361,6 +377,7 @@ protected boolean blockingAllowed() { final long durationMillis = getTimeSince(startTimeNanos).millis(); stateStats.stateUpdateTook(durationMillis); stateStats.stateUpdated(); + clusterManagerMetrics.recordLatency(clusterManagerMetrics.clusterStatePublishHistogram, (double) durationMillis); } catch (Exception e) { stateStats.stateUpdateFailed(); onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNanos, e); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6f90ebd8ad188..d0832a7c41e0a 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -58,6 +58,7 @@ import org.opensearch.client.Client; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -604,21 +605,10 @@ protected Node( getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)) ); - List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); - final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool); - clusterService.addStateApplier(scriptService); - resourcesToClose.add(clusterService); - final Set> consistentSettings = settingsModule.getConsistentSettings(); - if (consistentSettings.isEmpty() == false) { - clusterService.addLocalNodeMasterListener( - new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher() - ); - } - TracerFactory tracerFactory; MetricsRegistryFactory metricsRegistryFactory; if (FeatureFlags.isEnabled(TELEMETRY)) { - final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings()); + final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, settingsModule.getClusterSettings()); if (telemetrySettings.isTracingFeatureEnabled() || telemetrySettings.isMetricsFeatureEnabled()) { List telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class); List telemetryPluginsImplementingTelemetryAware = telemetryPlugins.stream() @@ -658,6 +648,24 @@ protected Node( resourcesToClose.add(tracer::close); resourcesToClose.add(metricsRegistry::close); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); + + List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); + final ClusterService clusterService = new ClusterService( + settings, + settingsModule.getClusterSettings(), + threadPool, + clusterManagerMetrics + ); + clusterService.addStateApplier(scriptService); + resourcesToClose.add(clusterService); + final Set> consistentSettings = settingsModule.getConsistentSettings(); + if (consistentSettings.isEmpty() == false) { + clusterService.addLocalNodeMasterListener( + new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher() + ); + } + final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); @@ -685,7 +693,8 @@ protected Node( clusterPlugins, clusterInfoService, snapshotsInfoService, - threadPool.getThreadContext() + threadPool.getThreadContext(), + clusterManagerMetrics ); modules.add(clusterModule); IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index cf7080ab2fc06..ff9e41ee7c784 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -55,6 +55,7 @@ import org.opensearch.indices.SystemIndices; import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; @@ -153,7 +154,11 @@ private void indicesThatCannotBeCreatedTestCase( null, new IndexingPressureService( Settings.EMPTY, - new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) + ClusterServiceUtils.createClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ) ), null, new SystemIndices(emptyMap()), diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java index da9156ccdb71a..a94a5d60b3f5a 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java @@ -71,6 +71,7 @@ import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; @@ -170,7 +171,11 @@ class TestTransportBulkAction extends TransportBulkAction { ), new IndexingPressureService( SETTINGS, - new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) + ClusterServiceUtils.createClusterService( + SETTINGS, + new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ) ), null, new SystemIndices(emptyMap()), diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java index f009988ffae17..91a2552ac3f04 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java @@ -45,6 +45,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -90,7 +91,7 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { LoggerContext context = (LoggerContext) LogManager.getContext(false); SearchPhaseContext searchPhaseContext1 = new MockSearchPhaseContext(1); - ClusterService clusterService1 = new ClusterService( + ClusterService clusterService1 = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null @@ -99,7 +100,7 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { int numberOfLoggersBefore = context.getLoggers().size(); SearchPhaseContext searchPhaseContext2 = new MockSearchPhaseContext(1); - ClusterService clusterService2 = new ClusterService( + ClusterService clusterService2 = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null @@ -124,7 +125,7 @@ public void testOnRequestEnd() throws InterruptedException { settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING.getKey(), "0ms"); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, logger); final List searchListenersList = new ArrayList<>(List.of(searchRequestSlowLog)); @@ -157,7 +158,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING.getKey(), "-1"); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, logger); final List searchListenersList = new ArrayList<>(List.of(searchRequestSlowLog)); @@ -321,7 +322,7 @@ public void testLevelSettingWarn() { settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL.getKey(), level); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); assertEquals(level, searchRequestSlowLog.getLevel()); } @@ -332,7 +333,7 @@ public void testLevelSettingDebug() { settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL.getKey(), level); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); assertEquals(level, searchRequestSlowLog.getLevel().toString()); } @@ -343,7 +344,7 @@ public void testLevelSettingFail() { settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL.getKey(), level); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); try { new SearchRequestSlowLog(clusterService); @@ -363,7 +364,7 @@ public void testSetThresholds() { settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING.getKey(), "100ms"); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); assertEquals(TimeValue.timeValueMillis(400).nanos(), searchRequestSlowLog.getWarnThreshold()); assertEquals(TimeValue.timeValueMillis(300).nanos(), searchRequestSlowLog.getInfoThreshold()); @@ -380,7 +381,7 @@ public void testSetThresholdsUnits() { settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING.getKey(), "100nanos"); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); assertEquals(TimeValue.timeValueSeconds(400).nanos(), searchRequestSlowLog.getWarnThreshold()); assertEquals(TimeValue.timeValueMillis(300).nanos(), searchRequestSlowLog.getInfoThreshold()); @@ -395,7 +396,7 @@ public void testSetThresholdsDefaults() { settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING.getKey(), "200ms"); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); assertEquals(TimeValue.timeValueMillis(400).nanos(), searchRequestSlowLog.getWarnThreshold()); assertEquals(TimeValue.timeValueMillis(-1).nanos(), searchRequestSlowLog.getInfoThreshold()); @@ -409,7 +410,7 @@ public void testSetThresholdsError() { settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING.getKey(), "NOT A TIME VALUE"); Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); try { new SearchRequestSlowLog(clusterService); diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index 557e4dc2ca8c5..ae35d37fe77b2 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -72,6 +72,8 @@ import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.plugins.ClusterPlugin; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.gateway.TestGatewayAllocator; import org.opensearch.test.gateway.TestShardBatchGatewayAllocator; @@ -92,7 +94,7 @@ public class ClusterModuleTests extends ModuleTestCase { public void setUp() throws Exception { super.setUp(); threadContext = new ThreadContext(Settings.EMPTY); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null @@ -167,7 +169,7 @@ public void testRegisterAllocationDeciderDuplicate() { public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonList(new EnableAllocationDecider(settings, clusterSettings)); } - }), clusterInfoService, null, threadContext) + }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)) ); assertEquals(e.getMessage(), "Cannot specify allocation decider [" + EnableAllocationDecider.class.getName() + "] twice"); } @@ -178,7 +180,7 @@ public void testRegisterAllocationDecider() { public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonList(new FakeAllocationDecider()); } - }), clusterInfoService, null, threadContext); + }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); assertTrue(module.deciderList.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class))); } @@ -188,7 +190,7 @@ private ClusterModule newClusterModuleWithShardsAllocator(Settings settings, Str public Map> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonMap(name, supplier); } - }), clusterInfoService, null, threadContext); + }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); } public void testRegisterShardsAllocator() { @@ -209,7 +211,15 @@ public void testUnknownShardsAllocator() { Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "dne").build(); IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> new ClusterModule(settings, clusterService, Collections.emptyList(), clusterInfoService, null, threadContext) + () -> new ClusterModule( + settings, + clusterService, + Collections.emptyList(), + clusterInfoService, + null, + threadContext, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ) ); assertEquals("Unknown ShardsAllocator [dne]", e.getMessage()); } @@ -295,7 +305,8 @@ public void testRejectsReservedExistingShardsAllocatorName() { Collections.singletonList(existingShardsAllocatorPlugin(GatewayAllocator.ALLOCATOR_NAME)), clusterInfoService, null, - threadContext + threadContext, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); expectThrows( IllegalArgumentException.class, @@ -310,7 +321,8 @@ public void testRejectsDuplicateExistingShardsAllocatorName() { Arrays.asList(existingShardsAllocatorPlugin("duplicate"), existingShardsAllocatorPlugin("duplicate")), clusterInfoService, null, - threadContext + threadContext, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); expectThrows( IllegalArgumentException.class, diff --git a/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 47dbf85c13b1f..537b2d13ec08a 100644 --- a/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -54,6 +54,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.node.Node; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.client.NoOpClient; @@ -83,7 +84,13 @@ public void testScheduling() { final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(); - final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) { + final ClusterApplierService clusterApplierService = new ClusterApplierService( + "test", + settings, + clusterSettings, + threadPool, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ) { @Override protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { return new MockSinglePrioritizingExecutor("mock-executor", deterministicTaskQueue, threadPool); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index d94f3fb304fe2..10d5dceb74f55 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -33,6 +33,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.OpenSearchAllocationTestCase; @@ -61,6 +62,7 @@ import org.opensearch.monitor.StatusInfo; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; @@ -179,7 +181,8 @@ private void setupRealClusterManagerServiceAndCoordinator(long term, ClusterStat ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); AtomicReference clusterStateRef = new AtomicReference<>(initialState); clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java index 64d9c243304d8..cce75105dd33f 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java @@ -33,6 +33,7 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.EmptyClusterInfoService; @@ -56,6 +57,9 @@ import org.opensearch.common.settings.Settings; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.snapshots.EmptySnapshotsInfoService; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.gateway.TestGatewayAllocator; @@ -77,6 +81,12 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class AllocationServiceTests extends OpenSearchTestCase { @@ -137,6 +147,16 @@ public void testAssignsPrimariesInPriorityOrderThenReplicas() { .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE) .build(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + final Histogram rerouteHistogram = mock(Histogram.class); + final Histogram mockedHistogram = mock(Histogram.class); + when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())).thenAnswer(invocationOnMock -> { + String histogramName = (String) invocationOnMock.getArguments()[0]; + if (histogramName.contains("reroute.latency")) { + return rerouteHistogram; + } + return mockedHistogram; + }); final AllocationService allocationService = new AllocationService( new AllocationDeciders( Arrays.asList( @@ -158,7 +178,8 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } }, new EmptyClusterInfoService(), - EmptySnapshotsInfoService.INSTANCE + EmptySnapshotsInfoService.INSTANCE, + new ClusterManagerMetrics(metricsRegistry) ); final String unrealisticAllocatorName = "unrealistic"; @@ -258,10 +279,18 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing assertThat(routingTable3.index("mediumPriority").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); assertTrue(routingTable3.index("lowPriority").allPrimaryShardsActive()); assertThat(routingTable3.index("invalid").shardsWithState(ShardRoutingState.STARTED), empty()); + + verify(rerouteHistogram, times(3)).record(anyDouble()); } public void testExplainsNonAllocationOfShardWithUnknownAllocator() { - final AllocationService allocationService = new AllocationService(null, null, null, null); + final AllocationService allocationService = new AllocationService( + null, + null, + null, + null, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); allocationService.setExistingShardsAllocators( Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator()) ); diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java index c5ed505e6bbf2..3cbdfb80067d7 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.opensearch.Version; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; @@ -51,6 +52,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.junit.annotations.TestLogging; @@ -74,15 +77,30 @@ import static org.opensearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; public class ClusterApplierServiceTests extends OpenSearchTestCase { private static ThreadPool threadPool; private TimedClusterApplierService clusterApplierService; + private static MetricsRegistry metricsRegistry; + private static Histogram applierslatenctHistogram; + private static Histogram listenerslatenctHistogram; @BeforeClass public static void createThreadPool() { threadPool = new TestThreadPool(ClusterApplierServiceTests.class.getName()); + metricsRegistry = mock(MetricsRegistry.class); + applierslatenctHistogram = mock(Histogram.class); + listenerslatenctHistogram = mock(Histogram.class); } @AfterClass @@ -96,6 +114,13 @@ public static void stopThreadPool() { @Before public void setUp() throws Exception { super.setUp(); + when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())).thenAnswer(invocationOnMock -> { + String histogramName = (String) invocationOnMock.getArguments()[0]; + if (histogramName.contains("appliers.latency")) { + return applierslatenctHistogram; + } + return listenerslatenctHistogram; + }); clusterApplierService = createTimedClusterService(true); } @@ -110,7 +135,8 @@ private TimedClusterApplierService createTimedClusterService(boolean makeCluster TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService( Settings.builder().put("cluster.name", "ClusterApplierServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + new ClusterManagerMetrics(metricsRegistry) ); timedClusterApplierService.setNodeConnectionsService(createNoOpNodeConnectionsService()); timedClusterApplierService.setInitialState( @@ -194,6 +220,8 @@ public void onFailure(String source, Exception e) { }); assertBusy(mockAppender::assertAllExpectationsMatched); } + verifyNoInteractions(applierslatenctHistogram); + verifyNoInteractions(listenerslatenctHistogram); } @TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log cluster state events on WARN level") @@ -291,6 +319,8 @@ public void onFailure(String source, Exception e) { latch.await(); mockAppender.assertAllExpectationsMatched(); } + verifyNoInteractions(applierslatenctHistogram); + verifyNoInteractions(listenerslatenctHistogram); } public void testLocalNodeClusterManagerListenerCallbacks() { @@ -329,6 +359,10 @@ public void offClusterManager() { setState(timedClusterApplierService, state); assertThat(isClusterManager.get(), is(true)); + verify(listenerslatenctHistogram, atLeastOnce()).record(anyDouble(), any()); + clearInvocations(listenerslatenctHistogram); + verifyNoInteractions(applierslatenctHistogram); + timedClusterApplierService.close(); } @@ -366,6 +400,10 @@ public void offMaster() { setState(timedClusterApplierService, state); assertThat(isClusterManager.get(), is(false)); + verify(listenerslatenctHistogram, atLeastOnce()).record(anyDouble(), any()); + clearInvocations(listenerslatenctHistogram); + verifyNoInteractions(applierslatenctHistogram); + timedClusterApplierService.close(); } @@ -405,6 +443,10 @@ public void onFailure(String source, Exception e) { latch.await(); assertNull(error.get()); assertTrue(applierCalled.get()); + + verify(applierslatenctHistogram, atLeastOnce()).record(anyDouble(), any()); + clearInvocations(applierslatenctHistogram); + verifyNoInteractions(listenerslatenctHistogram); } public void testClusterStateApplierBubblesUpExceptionsInApplier() throws InterruptedException { @@ -435,6 +477,9 @@ public void onFailure(String source, Exception e) { latch.await(); assertNotNull(error.get()); assertThat(error.get().getMessage(), containsString("dummy exception")); + + verifyNoInteractions(applierslatenctHistogram); + verifyNoInteractions(listenerslatenctHistogram); } public void testClusterStateApplierBubblesUpExceptionsInSettingsApplier() throws InterruptedException { @@ -478,6 +523,9 @@ public void onFailure(String source, Exception e) { latch.await(); assertNotNull(error.get()); assertThat(error.get().getMessage(), containsString("illegal value can't update")); + + verifyNoInteractions(applierslatenctHistogram); + verifyNoInteractions(listenerslatenctHistogram); } public void testClusterStateApplierSwallowsExceptionInListener() throws InterruptedException { @@ -509,6 +557,9 @@ public void onFailure(String source, Exception e) { latch.await(); assertNull(error.get()); assertTrue(applierCalled.get()); + + verifyNoInteractions(applierslatenctHistogram); + verifyNoInteractions(listenerslatenctHistogram); } public void testClusterStateApplierCanCreateAnObserver() throws InterruptedException { @@ -565,6 +616,10 @@ public void onFailure(String source, Exception e) { latch.await(); assertNull(error.get()); assertTrue(applierCalled.get()); + + verify(applierslatenctHistogram, atLeastOnce()).record(anyDouble(), any()); + clearInvocations(applierslatenctHistogram); + verifyNoInteractions(listenerslatenctHistogram); } public void testThreadContext() throws InterruptedException { @@ -609,6 +664,9 @@ public void onFailure(String source, Exception e) { } latch.await(); + + verifyNoInteractions(applierslatenctHistogram); + verifyNoInteractions(listenerslatenctHistogram); } static class TimedClusterApplierService extends ClusterApplierService { @@ -617,8 +675,13 @@ static class TimedClusterApplierService extends ClusterApplierService { volatile Long currentTimeOverride = null; boolean applicationMayFail; - TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { - super("test_node", settings, clusterSettings, threadPool); + TimedClusterApplierService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + ClusterManagerMetrics clusterManagerMetrics + ) { + super("test_node", settings, clusterSettings, threadPool, clusterManagerMetrics); this.clusterSettings = clusterSettings; } diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java index 4d88683826af7..bd12b09d2b983 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java @@ -10,6 +10,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.junit.After; @@ -26,11 +27,11 @@ public void terminateThreadPool() { public void testDeprecatedGetMasterServiceBWC() { try ( - ClusterService clusterService = new ClusterService( + ClusterService clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool - ) + ); ) { MasterService masterService = clusterService.getMasterService(); ClusterManagerService clusterManagerService = clusterService.getClusterManagerService(); diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index 85f6c129944fa..0ff8d9dc4e7a5 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -40,6 +40,7 @@ import org.opensearch.Version; import org.opensearch.cluster.AckedClusterStateUpdateTask; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -61,6 +62,9 @@ import org.opensearch.common.util.concurrent.BaseFuture; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.node.Node; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.junit.annotations.TestLogging; @@ -95,6 +99,13 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class MasterServiceTests extends OpenSearchTestCase { @@ -125,6 +136,10 @@ public void randomizeCurrentTime() { } private ClusterManagerService createClusterManagerService(boolean makeClusterManager) { + return createClusterManagerService(makeClusterManager, NoopMetricsRegistry.INSTANCE); + } + + private ClusterManagerService createClusterManagerService(boolean makeClusterManager, MetricsRegistry metricsRegistry) { final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); final ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() @@ -132,7 +147,8 @@ private ClusterManagerService createClusterManagerService(boolean makeClusterMan .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + new ClusterManagerMetrics(metricsRegistry) ); final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) .nodes( @@ -154,7 +170,18 @@ private ClusterManagerService createClusterManagerService(boolean makeClusterMan } public void testClusterManagerAwareExecution() throws Exception { - final ClusterManagerService nonClusterManager = createClusterManagerService(false); + final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + final Histogram clusterStateComputeHistogram = mock(Histogram.class); + final Histogram clusterStatePublishHistogram = mock(Histogram.class); + when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())).thenAnswer(invocationOnMock -> { + String histogramName = (String) invocationOnMock.getArguments()[0]; + if (histogramName.contains("cluster.state.new.compute.latency")) { + return clusterStateComputeHistogram; + } + return clusterStatePublishHistogram; + }); + + final ClusterManagerService nonClusterManager = createClusterManagerService(false, metricsRegistry); final boolean[] taskFailed = { false }; final CountDownLatch latch1 = new CountDownLatch(1); @@ -194,6 +221,8 @@ public void onFailure(String source, Exception e) { assertFalse("non-cluster-manager cluster state update task was not executed", taskFailed[0]); nonClusterManager.close(); + + verify(clusterStateComputeHistogram, times(1)).record(anyDouble(), any()); } public void testThreadContext() throws InterruptedException { @@ -1070,7 +1099,8 @@ public void testLongClusterStateUpdateLogging() throws Exception { .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ) ) { @@ -1246,6 +1276,18 @@ public void testAcking() throws InterruptedException { final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + + final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + final Histogram clusterStateComputeHistogram = mock(Histogram.class); + final Histogram clusterStatePublishHistogram = mock(Histogram.class); + when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())).thenAnswer(invocationOnMock -> { + String histogramName = (String) invocationOnMock.getArguments()[0]; + if (histogramName.contains("cluster.state.new.compute.latency")) { + return clusterStateComputeHistogram; + } + return clusterStatePublishHistogram; + }); + try ( ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() @@ -1253,7 +1295,8 @@ public void testAcking() throws InterruptedException { .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + new ClusterManagerMetrics(metricsRegistry) ) ) { @@ -1372,6 +1415,9 @@ public void onAckTimeout() { latch.await(); } } + + verify(clusterStateComputeHistogram, times(2)).record(anyDouble(), any()); + verify(clusterStatePublishHistogram, times(1)).record(anyDouble()); } public void testDeprecatedMasterServiceUpdateTaskThreadName() { diff --git a/server/src/test/java/org/opensearch/gateway/ClusterStateUpdatersTests.java b/server/src/test/java/org/opensearch/gateway/ClusterStateUpdatersTests.java index 1c43bb565ef69..dd2fb51151a5b 100644 --- a/server/src/test/java/org/opensearch/gateway/ClusterStateUpdatersTests.java +++ b/server/src/test/java/org/opensearch/gateway/ClusterStateUpdatersTests.java @@ -55,6 +55,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.repositories.IndexId; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import java.util.Arrays; @@ -123,7 +124,7 @@ public String getValue(final String value) { }) ); - final ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, null); + final ClusterService clusterService = ClusterServiceUtils.createClusterService(Settings.EMPTY, clusterSettings, null); final Metadata.Builder builder = Metadata.builder(); final Settings settings = Settings.builder().put("foo.old", randomAlphaOfLength(8)).build(); applySettingsToBuilder.accept(builder, settings); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java index c448c4b07e03b..59fb7df5428e2 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayServiceTests.java @@ -53,6 +53,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.snapshots.EmptySnapshotsInfoService; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.gateway.TestGatewayAllocator; import org.hamcrest.Matchers; @@ -68,7 +69,7 @@ public class GatewayServiceTests extends OpenSearchTestCase { private GatewayService createService(final Settings.Builder settings) { - final ClusterService clusterService = new ClusterService( + final ClusterService clusterService = ClusterServiceUtils.createClusterService( Settings.builder().put("cluster.name", "GatewayServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null diff --git a/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java b/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java index 0b657c1c9745f..8db7c58bf9503 100644 --- a/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java @@ -24,6 +24,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.stats.IndexingPressurePerShardStats; import org.opensearch.index.stats.IndexingPressureStats; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -44,7 +45,7 @@ public class IndexingPressureServiceTests extends OpenSearchTestCase { @Before public void beforeTest() { clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - clusterService = new ClusterService(settings, clusterSettings, null); + clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); } public void testCoordinatingOperationForShardIndexingPressure() { diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java index ce719a18898f8..c5ad1370ac75a 100644 --- a/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java @@ -19,6 +19,7 @@ import org.opensearch.index.stats.IndexingPressurePerShardStats; import org.opensearch.index.stats.IndexingPressureStats; import org.opensearch.index.stats.ShardIndexingPressureStats; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.hamcrest.Matcher; import org.hamcrest.MatcherAssert; @@ -42,7 +43,7 @@ public class ShardIndexingPressureConcurrentExecutionTests extends OpenSearchTes .build(); private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - private final ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + private final ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); public enum OperationType { COORDINATING, diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java index 023063c7d6e03..31ecad7c8d701 100644 --- a/server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java @@ -8,11 +8,11 @@ package org.opensearch.index; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import java.util.concurrent.TimeUnit; @@ -27,7 +27,7 @@ public class ShardIndexingPressureMemoryManagerTests extends OpenSearchTestCase .build(); private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); private final ShardIndexingPressureSettings shardIndexingPressureSettings = new ShardIndexingPressureSettings( - new ClusterService(settings, clusterSettings, null), + ClusterServiceUtils.createClusterService(settings, clusterSettings, null), settings, IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes() ); diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureSettingsTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureSettingsTests.java index c555d8f9c489d..5e84a76b2250a 100644 --- a/server/src/test/java/org/opensearch/index/ShardIndexingPressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureSettingsTests.java @@ -11,6 +11,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; public class ShardIndexingPressureSettingsTests extends OpenSearchTestCase { @@ -24,7 +25,7 @@ public class ShardIndexingPressureSettingsTests extends OpenSearchTestCase { .build(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - final ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + final ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); public void testFromSettings() { ShardIndexingPressureSettings shardIndexingPressureSettings = new ShardIndexingPressureSettings( diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureStoreTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureStoreTests.java index 46f9801035ac3..d97eec4cc001d 100644 --- a/server/src/test/java/org/opensearch/index/ShardIndexingPressureStoreTests.java +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureStoreTests.java @@ -8,10 +8,10 @@ package org.opensearch.index; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -22,7 +22,7 @@ public class ShardIndexingPressureStoreTests extends OpenSearchTestCase { private final Settings settings = Settings.builder().put(ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.getKey(), 200).build(); private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); private final ShardIndexingPressureSettings shardIndexingPressureSettings = new ShardIndexingPressureSettings( - new ClusterService(settings, clusterSettings, null), + ClusterServiceUtils.createClusterService(settings, clusterSettings, null), settings, IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes() ); diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureTests.java index e7600b1d4c41a..ddc3592511de4 100644 --- a/server/src/test/java/org/opensearch/index/ShardIndexingPressureTests.java +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureTests.java @@ -17,6 +17,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.stats.IndexingPressurePerShardStats; import org.opensearch.index.stats.IndexingPressureStats; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; public class ShardIndexingPressureTests extends OpenSearchTestCase { @@ -30,7 +31,7 @@ public class ShardIndexingPressureTests extends OpenSearchTestCase { .build(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - final ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + final ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null); public void testMemoryBytesMarkedAndReleased() { ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java index ccdd1fe4ab609..280598c516c3c 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java @@ -15,6 +15,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.store.DirectoryFileTransferTracker; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -42,7 +43,7 @@ public class RemoteSegmentTransferTrackerTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool("remote_refresh_segment_pressure_settings_test"); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java index 9d00cf9f2be46..18d18f2dc30b1 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java @@ -14,6 +14,7 @@ import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -45,7 +46,7 @@ public class RemoteStorePressureServiceTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool("remote_refresh_segment_pressure_settings_test"); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java index 064c6c10eba02..7c1ef0de91887 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java @@ -11,6 +11,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -27,7 +28,7 @@ public class RemoteStorePressureSettingsTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool("remote_refresh_segment_pressure_settings_test"); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactoryTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactoryTests.java index c300f316ac633..2bc4792a9c31c 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactoryTests.java @@ -13,6 +13,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -39,7 +40,11 @@ public void setUp() throws Exception { RemoteStoreStatsTrackerFactory.Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE ) .build(); - clusterService = new ClusterService(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool); + clusterService = ClusterServiceUtils.createClusterService( + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); } @@ -85,7 +90,11 @@ public void testInvalidMovingAverageWindowSize() { "Failed to parse value", IllegalArgumentException.class, () -> new RemoteStoreStatsTrackerFactory( - new ClusterService(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool), + ClusterServiceUtils.createClusterService( + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ), settings ) ); @@ -107,7 +116,11 @@ public void testUpdateAfterGetConfiguredSettings() { public void testGetDefaultSettings() { remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory( - new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool), + ClusterServiceUtils.createClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ), Settings.EMPTY ); // Check moving average window size updated diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 33f6c67b94b3d..8fbff4527ec7b 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -37,6 +37,7 @@ import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.threadpool.ThreadPool; import org.junit.After; @@ -82,7 +83,7 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { indexShard.refresh("test"); } - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool @@ -627,7 +628,7 @@ private Tuple mockIn return null; }).when(emptyCheckpointPublisher).publish(any(), any()); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java b/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java index 7ca1f1e864b99..7567224f16ac3 100644 --- a/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java +++ b/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java @@ -41,6 +41,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -63,7 +64,7 @@ public class ResponseCollectorServiceTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); threadpool = new TestThreadPool("response_collector_tests"); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadpool diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java index 4f615290f1805..fbb083a3ae419 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -17,6 +17,7 @@ import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -33,7 +34,7 @@ public class AdmissionControlServiceTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool("admission_controller_settings_test"); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettingsTests.java index c11ee1cc608f6..fbadcad804b31 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettingsTests.java @@ -13,6 +13,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -28,7 +29,7 @@ public class AdmissionControlSettingsTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool("admission_controller_settings_test"); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java index e72c0cd58ed64..f2cb45a033460 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java @@ -15,6 +15,7 @@ import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -31,7 +32,7 @@ public class CpuBasedAdmissionControllerTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool("admission_controller_settings_test"); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java index c5a2208f49ce6..54cb438e14ce6 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java @@ -15,6 +15,7 @@ import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -31,7 +32,7 @@ public class IoBasedAdmissionControllerTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool("admission_controller_settings_test"); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java index 6836ecb3d615f..f5686f33e7f50 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java @@ -13,6 +13,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -28,7 +29,7 @@ public class CPUBasedAdmissionControllerSettingsTests extends OpenSearchTestCase public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool("admission_controller_settings_test"); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java index c462f9700264d..3f157531f6c9a 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java @@ -21,6 +21,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -36,7 +37,7 @@ public class IoBasedAdmissionControllerSettingsTests extends OpenSearchTestCase public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool("io_based_admission_controller_settings_test"); - clusterService = new ClusterService( + clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java index 7b4db5f787d6e..da57ef9f06a1a 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java @@ -20,6 +20,7 @@ import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -46,7 +47,7 @@ public void setUp() throws Exception { ) .build(); threadPool = new TestThreadPool("admission_controller_settings_test"); - ClusterService clusterService = new ClusterService( + ClusterService clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java index fe0399e79a5f4..0ef9aa61bb827 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java @@ -20,6 +20,7 @@ import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -43,7 +44,7 @@ public void setUp() throws Exception { ) .build(); threadPool = new TestThreadPool("admission_controller_settings_test"); - ClusterService clusterService = new ClusterService( + ClusterService clusterService = ClusterServiceUtils.createClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 95a343f3b4025..3a07aa413e2a2 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -112,6 +112,7 @@ import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -1922,7 +1923,13 @@ private final class TestClusterNode { settings, clusterSettings, clusterManagerService, - new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) { + new ClusterApplierService( + node.getName(), + settings, + clusterSettings, + threadPool, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ) { @Override protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 0754cc1793dc8..3ae737bf63923 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -39,6 +39,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateTaskListener; @@ -89,6 +90,7 @@ import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.disruption.DisruptableMockTransport; @@ -1144,7 +1146,8 @@ protected Optional getDisruptableMockTransport(Transpo settings, clusterSettings, deterministicTaskQueue, - threadPool + threadPool, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); clusterService = new ClusterService(settings, clusterSettings, clusterManagerService, clusterApplierService); clusterService.setNodeConnectionsService( @@ -1594,9 +1597,10 @@ static class DisruptableClusterApplierService extends ClusterApplierService { Settings settings, ClusterSettings clusterSettings, DeterministicTaskQueue deterministicTaskQueue, - ThreadPool threadPool + ThreadPool threadPool, + ClusterManagerMetrics clusterManagerMetrics ) { - super(nodeName, settings, clusterSettings, threadPool); + super(nodeName, settings, clusterSettings, threadPool, clusterManagerMetrics); this.nodeName = nodeName; this.deterministicTaskQueue = deterministicTaskQueue; addStateApplier(event -> { diff --git a/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java b/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java index 3ca938c99b5fd..53ef595c7931e 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.opensearch.common.UUIDs; @@ -45,6 +46,7 @@ import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; import org.opensearch.node.Node; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -74,7 +76,8 @@ public FakeThreadPoolClusterManagerService( super( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); this.name = serviceName; this.onTaskAvailableToRun = onTaskAvailableToRun; diff --git a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java index 8f4f510da5ec3..f0c0e9bc2d589 100644 --- a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.core.util.Throwables; import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; @@ -52,6 +53,8 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.node.Node; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.threadpool.ThreadPool; import java.util.Collections; @@ -66,7 +69,8 @@ public static ClusterManagerService createClusterManagerService(ThreadPool threa ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_cluster_manager_node").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { @@ -169,8 +173,22 @@ public static ClusterService createClusterService(ThreadPool threadPool, Discove } public static ClusterService createClusterService(ThreadPool threadPool, DiscoveryNode localNode, ClusterSettings clusterSettings) { + return createClusterService(threadPool, localNode, clusterSettings, NoopMetricsRegistry.INSTANCE); + } + + public static ClusterService createClusterService( + ThreadPool threadPool, + DiscoveryNode localNode, + ClusterSettings clusterSettings, + MetricsRegistry metricsRegistry + ) { Settings settings = Settings.builder().put("node.name", "test").put("cluster.name", "ClusterServiceTests").build(); - ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + ClusterService clusterService = new ClusterService( + settings, + clusterSettings, + threadPool, + new ClusterManagerMetrics(metricsRegistry) + ); clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService()); ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).clusterManagerNodeId(localNode.getId())) @@ -184,6 +202,10 @@ public static ClusterService createClusterService(ThreadPool threadPool, Discove return clusterService; } + public static ClusterService createClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + return new ClusterService(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); + } + public static NodeConnectionsService createNoOpNodeConnectionsService() { return new NodeConnectionsService(Settings.EMPTY, null, null) { @Override