Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds latency metrics for ClusterState Appliers and Listeners #12333

Merged
merged 10 commits into from
May 20, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.rest.RestHandler;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
Expand Down Expand Up @@ -50,8 +51,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);

clusterService = new ClusterService(settings, clusterSettings, threadPool);

clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
}

public void testGetSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

Expand Down Expand Up @@ -57,7 +58,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
clusterService = new ClusterService(settings, clusterSettings, null);
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -33,7 +34,7 @@ public class TransportTopQueriesActionTests extends OpenSearchTestCase {
private final Settings.Builder settingsBuilder = Settings.builder();
private final Settings settings = settingsBuilder.build();
private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool);
private final ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
private final TransportService transportService = mock(TransportService.class);
private final QueryInsightsService topQueriesByLatencyService = mock(QueryInsightsService.class);
private final ActionFilters actionFilters = mock(ActionFilters.class);
Expand Down
12 changes: 10 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResultsService;
import org.opensearch.telemetry.metrics.MetricsRegistry;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -145,15 +146,22 @@ public ClusterModule(
List<ClusterPlugin> clusterPlugins,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
ThreadContext threadContext
ThreadContext threadContext,
MetricsRegistry metricsRegistry
) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
this.allocationDeciders = new AllocationDeciders(deciderList);
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext);
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this.allocationService = new AllocationService(
allocationDeciders,
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
metricsRegistry
);
}

public static List<Entry> getNamedWriteables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -91,6 +95,7 @@ public class AllocationService {
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
private SnapshotsInfoService snapshotsInfoService;
private Histogram rerouteHistogram;

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
Expand All @@ -100,20 +105,30 @@ public AllocationService(
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, NoopMetricsRegistry.INSTANCE);
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator));
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
SnapshotsInfoService snapshotsInfoService,
MetricsRegistry metricsRegistry
) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
initializeMetrics(metricsRegistry);
}

private void initializeMetrics(MetricsRegistry metricsRegistry) {
this.rerouteHistogram = metricsRegistry.createHistogram(
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
"allocation.reroute.latency",
"Histogram for recording latency of shard re-routing",
"ms"
);
}

/**
Expand Down Expand Up @@ -533,11 +548,12 @@ private void reroute(RoutingAllocation allocation) {
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty()
: "auto-expand replicas out of sync with number of nodes in the cluster";
assert assertInitialized();

long rerouteStartTimeNS = System.nanoTime();
removeDelayMarkers(allocation);

allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
shardsAllocator.allocate(allocation);
this.rerouteHistogram.record((double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS)));
assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -121,7 +124,17 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private NodeConnectionsService nodeConnectionsService;

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
private Histogram clusterStateAppliersHistogram;

private Histogram clusterStateListenersHistogram;

public ClusterApplierService(
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
String nodeName,
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
MetricsRegistry metricsRegistry
) {
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
Expand All @@ -132,6 +145,20 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings
CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold
);
initializeMetrics(metricsRegistry);
}

private void initializeMetrics(MetricsRegistry metricsRegistry) {
this.clusterStateAppliersHistogram = metricsRegistry.createHistogram(
"cluster.state.appliers.latency",
"Histogram for tracking the latency of cluster state appliers",
"ms"
);
this.clusterStateListenersHistogram = metricsRegistry.createHistogram(
"cluster.state.listeners.latency",
"Histogram for tracking the latency of cluster state listeners",
"ms"
);
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -597,15 +624,20 @@ private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, S
callClusterStateAppliers(clusterChangedEvent, stopWatch, lowPriorityStateAppliers);
}

private static void callClusterStateAppliers(
private void callClusterStateAppliers(
ClusterChangedEvent clusterChangedEvent,
StopWatch stopWatch,
Collection<ClusterStateApplier> clusterStateAppliers
) {
for (ClusterStateApplier applier : clusterStateAppliers) {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
try (TimingHandle ignored = stopWatch.timing("running applier [" + applier + "]")) {
long applierStartTimeNS = System.nanoTime();
applier.applyClusterState(clusterChangedEvent);
clusterStateAppliersHistogram.record(
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - applierStartTimeNS)),
Tags.create().addTag("Operation", applier.getClass().getSimpleName())
);
}
}
}
Expand All @@ -624,7 +656,12 @@ private void callClusterStateListener(
try {
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
try (TimingHandle ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
long listenerStartTimeNS = System.nanoTime();
listener.clusterChanged(clusterChangedEvent);
clusterStateListenersHistogram.record(
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - listenerStartTimeNS)),
Tags.create().addTag("Operation", listener.getClass().getSimpleName())
);
}
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

/**
Expand All @@ -20,7 +21,12 @@
*/
@PublicApi(since = "2.2.0")
public class ClusterManagerService extends MasterService {
public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
public ClusterManagerService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
MetricsRegistry metricsRegistry
) {
super(settings, clusterSettings, threadPool, metricsRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.node.Node;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
Expand Down Expand Up @@ -91,12 +92,12 @@ public class ClusterService extends AbstractLifecycleComponent {

private IndexingPressureService indexingPressureService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, MetricsRegistry metricsRegistry) {
this(
settings,
clusterSettings,
new ClusterManagerService(settings, clusterSettings, threadPool),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
new ClusterManagerService(settings, clusterSettings, threadPool, metricsRegistry),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, metricsRegistry)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.discovery.Discovery;
import org.opensearch.node.Node;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -135,8 +138,10 @@ public class MasterService extends AbstractLifecycleComponent {
protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler;
private final ClusterManagerThrottlingStats throttlingStats;
private final ClusterStateStats stateStats;
private Histogram clusterStateComputeHistogram;
private Histogram clusterStatePublishHistogram;

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, MetricsRegistry metricsRegistry) {
this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));

this.slowTaskLoggingThreshold = CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
Expand All @@ -154,6 +159,20 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
);
this.stateStats = new ClusterStateStats();
this.threadPool = threadPool;
initializeMetrics(metricsRegistry);
}

private void initializeMetrics(MetricsRegistry metricsRegistry) {
this.clusterStateComputeHistogram = metricsRegistry.createHistogram(
"cluster.state.new.compute.latency",
"Histogram for recording time taken to compute new cluster state",
"ms"
);
this.clusterStatePublishHistogram = metricsRegistry.createHistogram(
"cluster.state.publish.success.latency",
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
"Histogram for recording time taken to publish a new cluster state",
"ms"
);
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -302,6 +321,10 @@ private void runTasks(TaskInputs taskInputs) {
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", summary);
clusterStateComputeHistogram.record(
computationTime.getMillis(),
Tags.create().addTag("Operation", taskInputs.executor.getClass().getSimpleName())
);

if (taskOutputs.clusterStateUnchanged()) {
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
Expand Down Expand Up @@ -361,6 +384,7 @@ protected boolean blockingAllowed() {
final long durationMillis = getTimeSince(startTimeNanos).millis();
stateStats.stateUpdateTook(durationMillis);
stateStats.stateUpdated();
clusterStatePublishHistogram.record(durationMillis);
} catch (Exception e) {
stateStats.stateUpdateFailed();
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNanos, e);
Expand Down
Loading
Loading