From c66be2ce9d6d937b7ca9d99786afa0a982af11cb Mon Sep 17 00:00:00 2001 From: gargharsh3134 <51459091+gargharsh3134@users.noreply.github.com> Date: Thu, 6 Jun 2024 12:33:42 +0530 Subject: [PATCH] Adds counter metrics for leader and follower check failures (#12439) * Adds counter metrics for leader and follower check failures Signed-off-by: Harsh Garg --- CHANGELOG.md | 1 + .../cluster/ClusterManagerMetrics.java | 27 +++++++ .../cluster/coordination/Coordinator.java | 16 ++++- .../coordination/FollowersChecker.java | 7 +- .../cluster/coordination/LeaderChecker.java | 11 +-- .../opensearch/discovery/DiscoveryModule.java | 7 +- .../main/java/org/opensearch/node/Node.java | 3 +- .../coordination/FollowersCheckerTests.java | 65 +++++++++++++---- .../coordination/LeaderCheckerTests.java | 25 +++++-- .../cluster/coordination/NodeJoinTests.java | 3 +- .../discovery/DiscoveryModuleTests.java | 5 +- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../telemetry/TestInMemoryCounter.java | 52 ++++++++++++++ .../telemetry/TestInMemoryHistogram.java | 47 ++++++++++++ .../TestInMemoryMetricsRegistry.java | 71 +++++++++++++++++++ .../AbstractCoordinatorTestCase.java | 3 +- 16 files changed, 313 insertions(+), 33 deletions(-) create mode 100644 server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java create mode 100644 server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java create mode 100644 server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ffb438172f50..759cf53adf8b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Add leader and follower check failure counter metrics ([#12439](https://github.com/opensearch-project/OpenSearch/pull/12439)) - Add latency metrics for instrumenting critical clusterManager code paths ([#12333](https://github.com/opensearch-project/OpenSearch/pull/12333)) - Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423)) - Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478)) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java index d48f82a388245..a98349a4af5cd 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java @@ -8,6 +8,7 @@ package org.opensearch.cluster; +import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.Histogram; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.tags.Tags; @@ -23,6 +24,7 @@ public final class ClusterManagerMetrics { private static final String LATENCY_METRIC_UNIT_MS = "ms"; + private static final String COUNTER_METRICS_UNIT = "1"; public final Histogram clusterStateAppliersHistogram; public final Histogram clusterStateListenersHistogram; @@ -30,6 +32,9 @@ public final class ClusterManagerMetrics { public final Histogram clusterStateComputeHistogram; public final Histogram clusterStatePublishHistogram; + public final Counter leaderCheckFailureCounter; + public final Counter followerChecksFailureCounter; + public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { clusterStateAppliersHistogram = metricsRegistry.createHistogram( "cluster.state.appliers.latency", @@ -56,6 +61,16 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { "Histogram for recording time taken to publish a new cluster state", LATENCY_METRIC_UNIT_MS ); + followerChecksFailureCounter = metricsRegistry.createCounter( + "followers.checker.failure.count", + "Counter for number of failed follower checks", + COUNTER_METRICS_UNIT + ); + leaderCheckFailureCounter = metricsRegistry.createCounter( + "leader.checker.failure.count", + "Counter for number of failed leader checks", + COUNTER_METRICS_UNIT + ); } public void recordLatency(Histogram histogram, Double value) { @@ -69,4 +84,16 @@ public void recordLatency(Histogram histogram, Double value, Optional tags } histogram.record(value, tags.get()); } + + public void incrementCounter(Counter counter, Double value) { + incrementCounter(counter, value, Optional.empty()); + } + + public void incrementCounter(Counter counter, Double value, Optional tags) { + if (Objects.isNull(tags) || tags.isEmpty()) { + counter.add(value); + return; + } + counter.add(value, tags.get()); + } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 3d74feddfa261..f53e6837a67f5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -207,7 +208,8 @@ public Coordinator( ElectionStrategy electionStrategy, NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, - RemoteStoreNodeService remoteStoreNodeService + RemoteStoreNodeService remoteStoreNodeService, + ClusterManagerMetrics clusterManagerMetrics ) { this.settings = settings; this.transportService = transportService; @@ -261,14 +263,22 @@ public Coordinator( this::handlePublishRequest, this::handleApplyCommit ); - this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService); + this.leaderChecker = new LeaderChecker( + settings, + clusterSettings, + transportService, + this::onLeaderFailure, + nodeHealthService, + clusterManagerMetrics + ); this.followersChecker = new FollowersChecker( settings, clusterSettings, transportService, this::onFollowerCheckRequest, this::removeNode, - nodeHealthService + nodeHealthService, + clusterManagerMetrics ); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index 70bb0515bb022..2ec0dabd91786 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.coordination.Coordinator.Mode; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -127,6 +128,7 @@ public class FollowersChecker { private final TransportService transportService; private final NodeHealthService nodeHealthService; private volatile FastResponseState fastResponseState; + private ClusterManagerMetrics clusterManagerMetrics; public FollowersChecker( Settings settings, @@ -134,7 +136,8 @@ public FollowersChecker( TransportService transportService, Consumer handleRequestAndUpdateState, BiConsumer onNodeFailure, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + ClusterManagerMetrics clusterManagerMetrics ) { this.settings = settings; this.transportService = transportService; @@ -161,6 +164,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti handleDisconnectedNode(node); } }); + this.clusterManagerMetrics = clusterManagerMetrics; } private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) { @@ -413,6 +417,7 @@ public String executor() { } void failNode(String reason) { + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0); transportService.getThreadPool().generic().execute(new Runnable() { @Override public void run() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java index 8d4373b865f62..4fd2c0eb13073 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.Nullable; @@ -119,17 +120,17 @@ public class LeaderChecker { private final TransportService transportService; private final Consumer onLeaderFailure; private final NodeHealthService nodeHealthService; - private AtomicReference currentChecker = new AtomicReference<>(); - private volatile DiscoveryNodes discoveryNodes; + private final ClusterManagerMetrics clusterManagerMetrics; LeaderChecker( final Settings settings, final ClusterSettings clusterSettings, final TransportService transportService, final Consumer onLeaderFailure, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + final ClusterManagerMetrics clusterManagerMetrics ) { this.settings = settings; leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings); @@ -138,6 +139,7 @@ public class LeaderChecker { this.transportService = transportService; this.onLeaderFailure = onLeaderFailure; this.nodeHealthService = nodeHealthService; + this.clusterManagerMetrics = clusterManagerMetrics; clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout); transportService.registerRequestHandler( @@ -293,7 +295,6 @@ public void handleResponse(Empty response) { logger.debug("closed check scheduler received a response, doing nothing"); return; } - failureCountSinceLastSuccess.set(0); scheduleNextWakeUp(); // logs trace message indicating success } @@ -304,7 +305,6 @@ public void handleException(TransportException exp) { logger.debug("closed check scheduler received a response, doing nothing"); return; } - if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp); leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp)); @@ -355,6 +355,7 @@ public String executor() { void leaderFailed(Exception e) { if (isClosed.compareAndSet(false, true)) { + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0); transportService.getThreadPool().generic().execute(new Runnable() { @Override public void run() { diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 288371aa240a0..538dea5b2e60b 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.ElectionStrategy; @@ -133,7 +134,8 @@ public DiscoveryModule( RerouteService rerouteService, NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, - RemoteStoreNodeService remoteStoreNodeService + RemoteStoreNodeService remoteStoreNodeService, + ClusterManagerMetrics clusterManagerMetrics ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -211,7 +213,8 @@ public DiscoveryModule( electionStrategy, nodeHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + clusterManagerMetrics ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 49545fa8a0c8b..cb1f2caa082fc 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1199,7 +1199,8 @@ protected Node( rerouteService, fsHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + clusterManagerMetrics ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, diff --git a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java index a106706c00732..d0bc41b459cc3 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java @@ -33,6 +33,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.coordination.Coordinator.Mode; import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; @@ -47,6 +48,9 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; +import org.opensearch.telemetry.TestInMemoryMetricsRegistry; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction; @@ -131,6 +135,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req transportService.start(); transportService.acceptIncomingRequests(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final FollowersChecker followersChecker = new FollowersChecker( settings, clusterSettings, @@ -139,7 +145,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req (node, reason) -> { assert false : node; }, - () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info") + () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), + new ClusterManagerMetrics(metricsRegistry) ); followersChecker.setCurrentNodes(discoveryNodesHolder[0]); @@ -193,35 +200,43 @@ protected void onSendRequest(long requestId, String action, TransportRequest req followersChecker.clearCurrentNodes(); deterministicTaskQueue.runAllTasks(); assertThat(checkedNodes, empty()); + assertEquals(Integer.valueOf(0), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatDoesNotRespond() { final Settings settings = randomSettings(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( settings, () -> null, "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis() + FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis(), - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatRejectsCheck() { final Settings settings = randomSettings(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( settings, () -> { throw new OpenSearchException("simulated exception"); }, "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis(), - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailureCounterResetsOnSuccess() { final Settings settings = randomSettings(); final int retryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings); final int maxRecoveries = randomIntBetween(3, 10); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); // passes just enough checks to keep it alive, up to maxRecoveries, and then fails completely testBehaviourOfFailingNode(settings, new Supplier() { @@ -241,18 +256,23 @@ public Empty get() { "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * (maxRecoveries + 1) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings) .millis(), - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatIsDisconnected() { + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( Settings.EMPTY, () -> { throw new ConnectTransportException(null, "simulated exception"); }, "disconnected", 0, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatDisconnects() { @@ -297,6 +317,7 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean nodeFailed = new AtomicBoolean(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); final FollowersChecker followersChecker = new FollowersChecker( settings, @@ -307,7 +328,8 @@ public String toString() { assertTrue(nodeFailed.compareAndSet(false, true)); assertThat(reason, equalTo("disconnected")); }, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + new ClusterManagerMetrics(metricsRegistry) ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -318,16 +340,20 @@ public String toString() { deterministicTaskQueue.runAllRunnableTasks(); assertTrue(nodeFailed.get()); assertThat(followersChecker.getFaultyNodes(), contains(otherNode)); + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatIsUnhealthy() { + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( randomSettings(), () -> { throw new NodeHealthCheckFailureException("non writable exception"); }, "health check failed", 0, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } private void testBehaviourOfFailingNode( @@ -335,7 +361,8 @@ private void testBehaviourOfFailingNode( Supplier responder, String failureReason, long expectedFailureTime, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + MetricsRegistry metricsRegistry ) { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); @@ -386,7 +413,6 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean nodeFailed = new AtomicBoolean(); - final FollowersChecker followersChecker = new FollowersChecker( settings, clusterSettings, @@ -396,7 +422,8 @@ public String toString() { assertTrue(nodeFailed.compareAndSet(false, true)); assertThat(reason, equalTo(failureReason)); }, - nodeHealthService + nodeHealthService, + new ClusterManagerMetrics(metricsRegistry) ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -501,7 +528,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(UNHEALTHY, "unhealthy-info")); + }, + (node, reason) -> { assert false : node; }, + () -> new StatusInfo(UNHEALTHY, "unhealthy-info"), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); final long leaderTerm = randomLongBetween(2, Long.MAX_VALUE); final long followerTerm = randomLongBetween(1, leaderTerm - 1); @@ -574,7 +605,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info")); + }, + (node, reason) -> { assert false : node; }, + () -> new StatusInfo(HEALTHY, "healthy-info"), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); { // Does not call into the coordinator in the normal case @@ -721,7 +756,11 @@ public void testPreferClusterManagerNodes() { ); final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, clusterSettings, transportService, fcr -> { assert false : fcr; - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info")); + }, + (node, reason) -> { assert false : node; }, + () -> new StatusInfo(HEALTHY, "healthy-info"), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); followersChecker.setCurrentNodes(discoveryNodes); List followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear()) .map(cr -> cr.node) diff --git a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java index fe65058333116..decdeddfa37a1 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java @@ -34,6 +34,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.coordination.LeaderChecker.LeaderCheckRequest; import org.opensearch.cluster.node.DiscoveryNode; @@ -44,6 +45,7 @@ import org.opensearch.core.transport.TransportResponse; import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.StatusInfo; +import org.opensearch.telemetry.TestInMemoryMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction; @@ -175,11 +177,13 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), clusterManagerMetrics); logger.info("--> creating first checker"); leaderChecker.updateLeader(leader1); @@ -229,6 +233,7 @@ public String toString() { ); } leaderChecker.updateLeader(null); + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } enum Response { @@ -293,10 +298,13 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); + final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check"))); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), clusterManagerMetrics); leaderChecker.updateLeader(leader); { @@ -351,6 +359,7 @@ public String toString() { deterministicTaskQueue.runAllRunnableTasks(); assertTrue(leaderFailed.get()); } + assertEquals(Integer.valueOf(3), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } public void testFollowerFailsImmediatelyOnHealthCheckFailure() { @@ -407,10 +416,12 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), endsWith("failed health checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), clusterManagerMetrics); leaderChecker.updateLeader(leader); @@ -430,6 +441,8 @@ public String toString() { assertTrue(leaderFailed.get()); } + + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } public void testLeaderBehaviour() { @@ -453,12 +466,15 @@ public void testLeaderBehaviour() { transportService.start(); transportService.acceptIncomingRequests(); + final TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); final LeaderChecker leaderChecker = new LeaderChecker( settings, clusterSettings, transportService, e -> fail("shouldn't be checking anything"), - () -> nodeHealthServiceStatus.get() + () -> nodeHealthServiceStatus.get(), + clusterManagerMetrics ); final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() @@ -523,6 +539,7 @@ public void testLeaderBehaviour() { equalTo("rejecting leader check from [" + otherNode + "] sent to a node that is no longer the cluster-manager") ); } + assertEquals(Integer.valueOf(0), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } private class CapturingTransportResponseHandler implements TransportResponseHandler { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 10d5dceb74f55..f84f0326f4a9d 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -273,7 +273,8 @@ protected void onSendRequest( ElectionStrategy.DEFAULT_INSTANCE, nodeHealthService, persistedStateRegistry, - Mockito.mock(RemoteStoreNodeService.class) + Mockito.mock(RemoteStoreNodeService.class), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index b33ebf8333b36..5539b3237c2bf 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -32,6 +32,7 @@ package org.opensearch.discovery; import org.opensearch.Version; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.PersistedStateRegistry; @@ -48,6 +49,7 @@ import org.opensearch.gateway.GatewayMetaState; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.MockTransportService; @@ -128,7 +130,8 @@ private DiscoveryModule newModule(Settings settings, List plugi mock(RerouteService.class), null, new PersistedStateRegistry(), - remoteStoreNodeService + remoteStoreNodeService, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 5b39880930984..86de008b5dee5 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2559,7 +2559,8 @@ public void start(ClusterState initialState) { ElectionStrategy.DEFAULT_INSTANCE, () -> new StatusInfo(HEALTHY, "healthy-info"), persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java b/server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java new file mode 100644 index 0000000000000..d9aee5ebfa941 --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry; + +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This is a simple implementation of Counter which is utilized by TestInMemoryMetricsRegistry for + * Unit Tests. It initializes an atomic integer to add the values of counter which doesn't have any tags + * along with a map to store the values recorded against the tags. + * The map and atomic integer can then be used to get the added values. + */ +public class TestInMemoryCounter implements Counter { + + private AtomicInteger counterValue = new AtomicInteger(0); + private ConcurrentHashMap, Double> counterValueForTags = new ConcurrentHashMap<>(); + + public Integer getCounterValue() { + return this.counterValue.get(); + } + + public ConcurrentHashMap, Double> getCounterValueForTags() { + return this.counterValueForTags; + } + + @Override + public void add(double value) { + counterValue.addAndGet((int) value); + } + + @Override + public synchronized void add(double value, Tags tags) { + HashMap hashMap = (HashMap) tags.getTagsMap(); + if (counterValueForTags.get(hashMap) == null) { + counterValueForTags.put(hashMap, value); + } else { + value = counterValueForTags.get(hashMap) + value; + counterValueForTags.put(hashMap, value); + } + } +} diff --git a/server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java b/server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java new file mode 100644 index 0000000000000..ff28df2b6529d --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry; + +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This is a simple implementation of Histogram which is utilized by TestInMemoryMetricsRegistry for + * Unit Tests. It initializes an atomic integer to record the value of histogram which doesn't have any tags + * along with a map to store the values recorded against the tags. + * The map and atomic integer can then be used to get the recorded values. + */ +public class TestInMemoryHistogram implements Histogram { + + private AtomicInteger histogramValue = new AtomicInteger(0); + private ConcurrentHashMap, Double> histogramValueForTags = new ConcurrentHashMap<>(); + + public Integer getHistogramValue() { + return this.histogramValue.get(); + } + + public ConcurrentHashMap, Double> getHistogramValueForTags() { + return this.histogramValueForTags; + } + + @Override + public void record(double value) { + histogramValue.addAndGet((int) value); + } + + @Override + public synchronized void record(double value, Tags tags) { + HashMap hashMap = (HashMap) tags.getTagsMap(); + histogramValueForTags.put(hashMap, value); + } +} diff --git a/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java b/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java new file mode 100644 index 0000000000000..6d395085b12ea --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry; + +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * This is a simple implementation of MetricsRegistry which can be utilized by Unit Tests. + * It just initializes and stores counters/histograms within a map, once created. + * The maps can then be used to get the counters/histograms by their names. + */ +public class TestInMemoryMetricsRegistry implements MetricsRegistry { + + private ConcurrentHashMap counterStore = new ConcurrentHashMap<>(); + private ConcurrentHashMap histogramStore = new ConcurrentHashMap<>(); + + public ConcurrentHashMap getCounterStore() { + return this.counterStore; + } + + public ConcurrentHashMap getHistogramStore() { + return this.histogramStore; + } + + @Override + public Counter createCounter(String name, String description, String unit) { + TestInMemoryCounter counter = new TestInMemoryCounter(); + counterStore.putIfAbsent(name, counter); + return counter; + } + + @Override + public Counter createUpDownCounter(String name, String description, String unit) { + /** + * ToDo: To be implemented when required. + */ + return null; + } + + @Override + public Histogram createHistogram(String name, String description, String unit) { + TestInMemoryHistogram histogram = new TestInMemoryHistogram(); + histogramStore.putIfAbsent(name, histogram); + return histogram; + } + + @Override + public Closeable createGauge(String name, String description, String unit, Supplier valueProvider, Tags tags) { + /** + * ToDo: To be implemented when required. + */ + return null; + } + + @Override + public void close() throws IOException {} +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 3ae737bf63923..1c2270bab1260 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1183,7 +1183,8 @@ protected Optional getDisruptableMockTransport(Transpo getElectionStrategy(), nodeHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(