Skip to content

Commit

Permalink
[Backport] ClusterManager metrics instrumentation changes (#14118)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Harsh Garg <[email protected]>
Co-authored-by: Harsh Garg <[email protected]>
  • Loading branch information
gargharsh3134 and Harsh Garg authored Jun 10, 2024
1 parent 9b72166 commit 2a8ffba
Show file tree
Hide file tree
Showing 56 changed files with 848 additions and 137 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add leader and follower check failure counter metrics ([#12439](https://github.com/opensearch-project/OpenSearch/pull/12439))
- Add latency metrics for instrumenting critical clusterManager code paths ([#12333](https://github.com/opensearch-project/OpenSearch/pull/12333))
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.rest.RestHandler;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
Expand Down Expand Up @@ -51,8 +52,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS);

clusterService = new ClusterService(settings, clusterSettings, threadPool);

clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
}

public void testGetSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
clusterService = new ClusterService(settings, clusterSettings, null);
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -33,7 +34,7 @@ public class TransportTopQueriesActionTests extends OpenSearchTestCase {
private final Settings.Builder settingsBuilder = Settings.builder();
private final Settings settings = settingsBuilder.build();
private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool);
private final ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
private final TransportService transportService = mock(TransportService.class);
private final QueryInsightsService topQueriesByLatencyService = mock(QueryInsightsService.class);
private final ActionFilters actionFilters = mock(ActionFilters.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster;

import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.Objects;
import java.util.Optional;

/**
* Class containing metrics (counters/latency) specific to ClusterManager.
*
* @opensearch.internal
*/
public final class ClusterManagerMetrics {

private static final String LATENCY_METRIC_UNIT_MS = "ms";
private static final String COUNTER_METRICS_UNIT = "1";

public final Histogram clusterStateAppliersHistogram;
public final Histogram clusterStateListenersHistogram;
public final Histogram rerouteHistogram;
public final Histogram clusterStateComputeHistogram;
public final Histogram clusterStatePublishHistogram;

public final Counter leaderCheckFailureCounter;
public final Counter followerChecksFailureCounter;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
"cluster.state.appliers.latency",
"Histogram for tracking the latency of cluster state appliers",
LATENCY_METRIC_UNIT_MS
);
clusterStateListenersHistogram = metricsRegistry.createHistogram(
"cluster.state.listeners.latency",
"Histogram for tracking the latency of cluster state listeners",
LATENCY_METRIC_UNIT_MS
);
rerouteHistogram = metricsRegistry.createHistogram(
"allocation.reroute.latency",
"Histogram for recording latency of shard re-routing",
LATENCY_METRIC_UNIT_MS
);
clusterStateComputeHistogram = metricsRegistry.createHistogram(
"cluster.state.new.compute.latency",
"Histogram for recording time taken to compute new cluster state",
LATENCY_METRIC_UNIT_MS
);
clusterStatePublishHistogram = metricsRegistry.createHistogram(
"cluster.state.publish.success.latency",
"Histogram for recording time taken to publish a new cluster state",
LATENCY_METRIC_UNIT_MS
);
followerChecksFailureCounter = metricsRegistry.createCounter(
"followers.checker.failure.count",
"Counter for number of failed follower checks",
COUNTER_METRICS_UNIT
);
leaderCheckFailureCounter = metricsRegistry.createCounter(
"leader.checker.failure.count",
"Counter for number of failed leader checks",
COUNTER_METRICS_UNIT
);
}

public void recordLatency(Histogram histogram, Double value) {
histogram.record(value);
}

public void recordLatency(Histogram histogram, Double value, Optional<Tags> tags) {
if (Objects.isNull(tags) || tags.isEmpty()) {
histogram.record(value);
return;
}
histogram.record(value, tags.get());
}

public void incrementCounter(Counter counter, Double value) {
incrementCounter(counter, value, Optional.empty());
}

public void incrementCounter(Counter counter, Double value, Optional<Tags> tags) {
if (Objects.isNull(tags) || tags.isEmpty()) {
counter.add(value);
return;
}
counter.add(value, tags.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public ClusterModule(
List<ClusterPlugin> clusterPlugins,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
ThreadContext threadContext
ThreadContext threadContext,
ClusterManagerMetrics clusterManagerMetrics
) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
Expand All @@ -157,7 +158,8 @@ public ClusterModule(
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
settings
settings,
clusterManagerMetrics
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.LegacyESVersion;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateTaskConfig;
Expand Down Expand Up @@ -208,7 +209,8 @@ public Coordinator(
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -262,14 +264,22 @@ public Coordinator(
this::handlePublishRequest,
this::handleApplyCommit
);
this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService);
this.leaderChecker = new LeaderChecker(
settings,
clusterSettings,
transportService,
this::onLeaderFailure,
nodeHealthService,
clusterManagerMetrics
);
this.followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
this::onFollowerCheckRequest,
this::removeNode,
nodeHealthService
nodeHealthService,
clusterManagerMetrics
);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -127,14 +128,16 @@ public class FollowersChecker {
private final TransportService transportService;
private final NodeHealthService nodeHealthService;
private volatile FastResponseState fastResponseState;
private ClusterManagerMetrics clusterManagerMetrics;

public FollowersChecker(
Settings settings,
ClusterSettings clusterSettings,
TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
this.transportService = transportService;
Expand All @@ -161,6 +164,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
handleDisconnectedNode(node);
}
});
this.clusterManagerMetrics = clusterManagerMetrics;
}

private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
Expand Down Expand Up @@ -413,6 +417,7 @@ public String executor() {
}

void failNode(String reason) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0);
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -119,17 +120,17 @@ public class LeaderChecker {
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
private final NodeHealthService nodeHealthService;

private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();

private volatile DiscoveryNodes discoveryNodes;
private final ClusterManagerMetrics clusterManagerMetrics;

LeaderChecker(
final Settings settings,
final ClusterSettings clusterSettings,
final TransportService transportService,
final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
final ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
Expand All @@ -138,6 +139,7 @@ public class LeaderChecker {
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.nodeHealthService = nodeHealthService;
this.clusterManagerMetrics = clusterManagerMetrics;
clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout);

transportService.registerRequestHandler(
Expand Down Expand Up @@ -293,7 +295,6 @@ public void handleResponse(Empty response) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}

failureCountSinceLastSuccess.set(0);
scheduleNextWakeUp(); // logs trace message indicating success
}
Expand All @@ -304,7 +305,6 @@ public void handleException(TransportException exp) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}

if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
Expand Down Expand Up @@ -355,6 +355,7 @@ public String executor() {

void leaderFailed(Exception e) {
if (isClosed.compareAndSet(false, true)) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0);
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
Expand Down
Loading

0 comments on commit 2a8ffba

Please sign in to comment.