From 04b4cb6fc87871a0cd9cdc973aff5137f53c02e3 Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Fri, 23 Feb 2024 12:40:51 +0530 Subject: [PATCH 1/7] Adds counter metrics for leader and follower check failures Signed-off-by: Harsh Garg --- .../cluster/coordination/Coordinator.java | 16 ++++++++++--- .../coordination/FollowersChecker.java | 18 +++++++++++++- .../cluster/coordination/LeaderChecker.java | 24 +++++++++++++++---- .../opensearch/discovery/DiscoveryModule.java | 7 ++++-- .../main/java/org/opensearch/node/Node.java | 3 ++- .../coordination/FollowersCheckerTests.java | 23 +++++++++++++----- .../coordination/LeaderCheckerTests.java | 10 ++++---- .../cluster/coordination/NodeJoinTests.java | 4 +++- .../discovery/DiscoveryModuleTests.java | 6 ++++- .../snapshots/SnapshotResiliencyTests.java | 4 +++- .../AbstractCoordinatorTestCase.java | 6 ++++- 11 files changed, 95 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 3d74feddfa261..d702003c98b25 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -87,6 +87,7 @@ import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.TransportService; @@ -207,7 +208,8 @@ public Coordinator( ElectionStrategy electionStrategy, NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, - RemoteStoreNodeService remoteStoreNodeService + RemoteStoreNodeService remoteStoreNodeService, + MetricsRegistry metricsRegistry ) { this.settings = settings; this.transportService = transportService; @@ -261,14 +263,22 @@ public Coordinator( this::handlePublishRequest, this::handleApplyCommit ); - this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService); + this.leaderChecker = new LeaderChecker( + settings, + clusterSettings, + transportService, + this::onLeaderFailure, + nodeHealthService, + metricsRegistry + ); this.followersChecker = new FollowersChecker( settings, clusterSettings, transportService, this::onFollowerCheckRequest, this::removeNode, - nodeHealthService + nodeHealthService, + metricsRegistry ); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index 70bb0515bb022..210a5028dfd4d 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -48,6 +48,8 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.Transport; @@ -112,6 +114,8 @@ public class FollowersChecker { Setting.Property.NodeScope ); + private static final String UNIT = "1"; + private final Settings settings; private final TimeValue followerCheckInterval; @@ -127,6 +131,7 @@ public class FollowersChecker { private final TransportService transportService; private final NodeHealthService nodeHealthService; private volatile FastResponseState fastResponseState; + private Counter followerChecksFailureCounter; public FollowersChecker( Settings settings, @@ -134,7 +139,8 @@ public FollowersChecker( TransportService transportService, Consumer handleRequestAndUpdateState, BiConsumer onNodeFailure, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + MetricsRegistry metricsRegistry ) { this.settings = settings; this.transportService = transportService; @@ -161,6 +167,15 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti handleDisconnectedNode(node); } }); + initializeMetrics(metricsRegistry); + } + + private void initializeMetrics(MetricsRegistry metricsRegistry) { + this.followerChecksFailureCounter = metricsRegistry.createCounter( + "follower.checker.failure.count", + "Counter for number of failed follower checks", + UNIT + ); } private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) { @@ -401,6 +416,7 @@ public void handleException(TransportException exp) { return; } + followerChecksFailureCounter.add(1); failNode(reason); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java index 8d4373b865f62..2c2467c0c7643 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java @@ -50,6 +50,8 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.NodeDisconnectedException; @@ -111,6 +113,8 @@ public class LeaderChecker { Setting.Property.NodeScope ); + private static final String UNIT = "1"; + private final Settings settings; private final TimeValue leaderCheckInterval; @@ -119,17 +123,17 @@ public class LeaderChecker { private final TransportService transportService; private final Consumer onLeaderFailure; private final NodeHealthService nodeHealthService; - private AtomicReference currentChecker = new AtomicReference<>(); - private volatile DiscoveryNodes discoveryNodes; + private Counter leaderCheckFailureCounter; LeaderChecker( final Settings settings, final ClusterSettings clusterSettings, final TransportService transportService, final Consumer onLeaderFailure, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + final MetricsRegistry metricsRegistry ) { this.settings = settings; leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings); @@ -158,6 +162,15 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti handleDisconnectedNode(node); } }); + initializeMetrics(metricsRegistry); + } + + private void initializeMetrics(MetricsRegistry metricsRegistry) { + this.leaderCheckFailureCounter = metricsRegistry.createCounter( + "leader.checker.failure.count", + "Counter for number of failed leader checks", + UNIT + ); } private void setLeaderCheckTimeout(TimeValue leaderCheckTimeout) { @@ -293,7 +306,6 @@ public void handleResponse(Empty response) { logger.debug("closed check scheduler received a response, doing nothing"); return; } - failureCountSinceLastSuccess.set(0); scheduleNextWakeUp(); // logs trace message indicating success } @@ -304,14 +316,15 @@ public void handleException(TransportException exp) { logger.debug("closed check scheduler received a response, doing nothing"); return; } - if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp); leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp)); + leaderCheckFailureCounter.add(1); return; } else if (exp.getCause() instanceof NodeHealthCheckFailureException) { logger.debug(new ParameterizedMessage("leader [{}] health check failed", leader), exp); leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp)); + leaderCheckFailureCounter.add(1); return; } long failureCount = failureCountSinceLastSuccess.incrementAndGet(); @@ -329,6 +342,7 @@ public void handleException(TransportException exp) { leaderFailed( new OpenSearchException("node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp) ); + leaderCheckFailureCounter.add(1); return; } diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 288371aa240a0..cdd1fda03dd4f 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -55,6 +55,7 @@ import org.opensearch.monitor.NodeHealthService; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -133,7 +134,8 @@ public DiscoveryModule( RerouteService rerouteService, NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, - RemoteStoreNodeService remoteStoreNodeService + RemoteStoreNodeService remoteStoreNodeService, + MetricsRegistry metricsRegistry ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -211,7 +213,8 @@ public DiscoveryModule( electionStrategy, nodeHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + metricsRegistry ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ea449afe1c811..4c3f981f999a3 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1129,7 +1129,8 @@ protected Node( rerouteService, fsHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + metricsRegistry ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, diff --git a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java index a106706c00732..d456231c30e4e 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java @@ -47,6 +47,8 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction; @@ -75,6 +77,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.mockito.Mockito; + import static java.util.Collections.emptySet; import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME; import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING; @@ -139,7 +143,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req (node, reason) -> { assert false : node; }, - () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info") + () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), + Mockito.mock(MetricsRegistry.class) ); followersChecker.setCurrentNodes(discoveryNodesHolder[0]); @@ -307,7 +312,8 @@ public String toString() { assertTrue(nodeFailed.compareAndSet(false, true)); assertThat(reason, equalTo("disconnected")); }, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + Mockito.mock(MetricsRegistry.class) ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -396,7 +402,8 @@ public String toString() { assertTrue(nodeFailed.compareAndSet(false, true)); assertThat(reason, equalTo(failureReason)); }, - nodeHealthService + nodeHealthService, + NoopMetricsRegistry.INSTANCE ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -501,7 +508,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(UNHEALTHY, "unhealthy-info")); + }, + (node, reason) -> { assert false : node; }, + () -> new StatusInfo(UNHEALTHY, "unhealthy-info"), + Mockito.mock(MetricsRegistry.class) + ); final long leaderTerm = randomLongBetween(2, Long.MAX_VALUE); final long followerTerm = randomLongBetween(1, leaderTerm - 1); @@ -574,7 +585,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info")); + }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), Mockito.mock(MetricsRegistry.class)); { // Does not call into the coordinator in the normal case @@ -721,7 +732,7 @@ public void testPreferClusterManagerNodes() { ); final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, clusterSettings, transportService, fcr -> { assert false : fcr; - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info")); + }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), Mockito.mock(MetricsRegistry.class)); followersChecker.setCurrentNodes(discoveryNodes); List followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear()) .map(cr -> cr.node) diff --git a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java index fe65058333116..7194d1009f49f 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java @@ -44,6 +44,7 @@ import org.opensearch.core.transport.TransportResponse; import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.StatusInfo; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction; @@ -179,7 +180,7 @@ public String toString() { final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); logger.info("--> creating first checker"); leaderChecker.updateLeader(leader1); @@ -296,7 +297,7 @@ public String toString() { final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check"))); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); leaderChecker.updateLeader(leader); { @@ -410,7 +411,7 @@ public String toString() { final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), endsWith("failed health checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); leaderChecker.updateLeader(leader); @@ -458,7 +459,8 @@ public void testLeaderBehaviour() { clusterSettings, transportService, e -> fail("shouldn't be checking anything"), - () -> nodeHealthServiceStatus.get() + () -> nodeHealthServiceStatus.get(), + NoopMetricsRegistry.INSTANCE ); final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index d94f3fb304fe2..da7a2e4510166 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -61,6 +61,7 @@ import org.opensearch.monitor.StatusInfo; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; @@ -270,7 +271,8 @@ protected void onSendRequest( ElectionStrategy.DEFAULT_INSTANCE, nodeHealthService, persistedStateRegistry, - Mockito.mock(RemoteStoreNodeService.class) + Mockito.mock(RemoteStoreNodeService.class), + Mockito.mock(MetricsRegistry.class) ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index b33ebf8333b36..80830f04ceb3d 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -48,6 +48,7 @@ import org.opensearch.gateway.GatewayMetaState; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.MockTransportService; @@ -66,6 +67,8 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; +import org.mockito.Mockito; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -128,7 +131,8 @@ private DiscoveryModule newModule(Settings settings, List plugi mock(RerouteService.class), null, new PersistedStateRegistry(), - remoteStoreNodeService + remoteStoreNodeService, + Mockito.mock(MetricsRegistry.class) ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 58315ba031b84..61ca7f370d85d 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -226,6 +226,7 @@ import org.opensearch.search.query.QueryPhase; import org.opensearch.snapshots.mockstore.MockEventuallyConsistentRepository; import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; @@ -2549,7 +2550,8 @@ public void start(ClusterState initialState) { ElectionStrategy.DEFAULT_INSTANCE, () -> new StatusInfo(HEALTHY, "healthy-info"), persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + Mockito.mock(MetricsRegistry.class) ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 0754cc1793dc8..357c14a966c65 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -89,6 +89,7 @@ import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.disruption.DisruptableMockTransport; @@ -127,6 +128,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.mockito.Mockito; + import static java.util.Collections.emptyList; import static java.util.Collections.emptySet; import static java.util.Collections.singleton; @@ -1180,7 +1183,8 @@ protected Optional getDisruptableMockTransport(Transpo getElectionStrategy(), nodeHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + Mockito.mock(MetricsRegistry.class) ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService( From 63eba9378489040356ec3b8edb5450e935c70939 Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Thu, 7 Mar 2024 02:41:34 +0530 Subject: [PATCH 2/7] Refactoring UTs to assert counters Signed-off-by: Harsh Garg --- .../coordination/FollowersChecker.java | 2 +- .../coordination/FollowersCheckerTests.java | 31 +++++++++------ .../coordination/LeaderCheckerTests.java | 39 ++++++++++++++++--- .../cluster/coordination/NodeJoinTests.java | 4 +- .../discovery/DiscoveryModuleTests.java | 6 +-- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../AbstractCoordinatorTestCase.java | 6 +-- 7 files changed, 61 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index 210a5028dfd4d..c50f50c06370f 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -172,7 +172,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti private void initializeMetrics(MetricsRegistry metricsRegistry) { this.followerChecksFailureCounter = metricsRegistry.createCounter( - "follower.checker.failure.count", + "followers.checker.failure.count", "Counter for number of failed follower checks", UNIT ); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java index d456231c30e4e..fda7660cd3121 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java @@ -47,6 +47,7 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; +import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; @@ -77,8 +78,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.mockito.Mockito; - import static java.util.Collections.emptySet; import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME; import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING; @@ -95,6 +94,12 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class FollowersCheckerTests extends OpenSearchTestCase { @@ -144,7 +149,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req assert false : node; }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), - Mockito.mock(MetricsRegistry.class) + NoopMetricsRegistry.INSTANCE ); followersChecker.setCurrentNodes(discoveryNodesHolder[0]); @@ -313,7 +318,7 @@ public String toString() { assertThat(reason, equalTo("disconnected")); }, () -> new StatusInfo(HEALTHY, "healthy-info"), - Mockito.mock(MetricsRegistry.class) + NoopMetricsRegistry.INSTANCE ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -393,6 +398,10 @@ public String toString() { final AtomicBoolean nodeFailed = new AtomicBoolean(); + final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + final Counter counter = mock(Counter.class); + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(counter); + final FollowersChecker followersChecker = new FollowersChecker( settings, clusterSettings, @@ -403,7 +412,7 @@ public String toString() { assertThat(reason, equalTo(failureReason)); }, nodeHealthService, - NoopMetricsRegistry.INSTANCE + metricsRegistry ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -449,6 +458,8 @@ public String toString() { deterministicTaskQueue.runAllTasksInTimeOrder(); assertTrue(nodeFailed.get()); assertThat(followersChecker.getFaultyNodes(), contains(otherNode)); + + verify(counter, atLeastOnce()).add(anyDouble()); } public void testFollowerCheckRequestEqualsHashCodeSerialization() { @@ -508,11 +519,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, - (node, reason) -> { assert false : node; }, - () -> new StatusInfo(UNHEALTHY, "unhealthy-info"), - Mockito.mock(MetricsRegistry.class) - ); + }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(UNHEALTHY, "unhealthy-info"), NoopMetricsRegistry.INSTANCE); final long leaderTerm = randomLongBetween(2, Long.MAX_VALUE); final long followerTerm = randomLongBetween(1, leaderTerm - 1); @@ -585,7 +592,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), Mockito.mock(MetricsRegistry.class)); + }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); { // Does not call into the coordinator in the normal case @@ -732,7 +739,7 @@ public void testPreferClusterManagerNodes() { ); final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, clusterSettings, transportService, fcr -> { assert false : fcr; - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), Mockito.mock(MetricsRegistry.class)); + }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); followersChecker.setCurrentNodes(discoveryNodes); List followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear()) .map(cr -> cr.node) diff --git a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java index 7194d1009f49f..8029fb3aba7ea 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java @@ -44,7 +44,8 @@ import org.opensearch.core.transport.TransportResponse; import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.StatusInfo; -import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction; @@ -79,6 +80,13 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.matchesRegex; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; public class LeaderCheckerTests extends OpenSearchTestCase { @@ -176,11 +184,14 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); + final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + final Counter leaderCheckFailedCounter = mock(Counter.class); + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderCheckFailedCounter); final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry); logger.info("--> creating first checker"); leaderChecker.updateLeader(leader1); @@ -230,6 +241,8 @@ public String toString() { ); } leaderChecker.updateLeader(null); + verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); + verify(leaderCheckFailedCounter, times(1)).add(anyDouble()); } enum Response { @@ -294,10 +307,14 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); + final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + final Counter leaderCheckFailedCounter = mock(Counter.class); + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderCheckFailedCounter); + final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check"))); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry); leaderChecker.updateLeader(leader); { @@ -352,6 +369,8 @@ public String toString() { deterministicTaskQueue.runAllRunnableTasks(); assertTrue(leaderFailed.get()); } + verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); + verify(leaderCheckFailedCounter, times(2)).add(anyDouble()); } public void testFollowerFailsImmediatelyOnHealthCheckFailure() { @@ -408,10 +427,13 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); + final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + final Counter leaderChecksFailedCounter = mock(Counter.class); + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderChecksFailedCounter); final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), endsWith("failed health checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry); leaderChecker.updateLeader(leader); @@ -431,6 +453,8 @@ public String toString() { assertTrue(leaderFailed.get()); } + verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); + verify(leaderChecksFailedCounter, times(1)).add(anyDouble()); } public void testLeaderBehaviour() { @@ -454,13 +478,16 @@ public void testLeaderBehaviour() { transportService.start(); transportService.acceptIncomingRequests(); + final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + final Counter leaderChecksFailedCounter = mock(Counter.class); + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderChecksFailedCounter); final LeaderChecker leaderChecker = new LeaderChecker( settings, clusterSettings, transportService, e -> fail("shouldn't be checking anything"), () -> nodeHealthServiceStatus.get(), - NoopMetricsRegistry.INSTANCE + metricsRegistry ); final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() @@ -525,6 +552,8 @@ public void testLeaderBehaviour() { equalTo("rejecting leader check from [" + otherNode + "] sent to a node that is no longer the cluster-manager") ); } + verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); + verifyNoInteractions(leaderChecksFailedCounter); } private class CapturingTransportResponseHandler implements TransportResponseHandler { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index da7a2e4510166..6c44c000205f7 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -61,7 +61,7 @@ import org.opensearch.monitor.StatusInfo; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeService; -import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; @@ -272,7 +272,7 @@ protected void onSendRequest( nodeHealthService, persistedStateRegistry, Mockito.mock(RemoteStoreNodeService.class), - Mockito.mock(MetricsRegistry.class) + NoopMetricsRegistry.INSTANCE ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index 80830f04ceb3d..44e037e5a9845 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -48,7 +48,7 @@ import org.opensearch.gateway.GatewayMetaState; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; -import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.MockTransportService; @@ -67,8 +67,6 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; -import org.mockito.Mockito; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -132,7 +130,7 @@ private DiscoveryModule newModule(Settings settings, List plugi null, new PersistedStateRegistry(), remoteStoreNodeService, - Mockito.mock(MetricsRegistry.class) + NoopMetricsRegistry.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 61ca7f370d85d..9922f9e6fa12b 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -226,7 +226,6 @@ import org.opensearch.search.query.QueryPhase; import org.opensearch.snapshots.mockstore.MockEventuallyConsistentRepository; import org.opensearch.tasks.TaskResourceTrackingService; -import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; @@ -2551,7 +2550,7 @@ public void start(ClusterState initialState) { () -> new StatusInfo(HEALTHY, "healthy-info"), persistedStateRegistry, remoteStoreNodeService, - Mockito.mock(MetricsRegistry.class) + NoopMetricsRegistry.INSTANCE ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 357c14a966c65..fae2cd59791c1 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -89,7 +89,7 @@ import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.repositories.RepositoriesService; -import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.disruption.DisruptableMockTransport; @@ -128,8 +128,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.mockito.Mockito; - import static java.util.Collections.emptyList; import static java.util.Collections.emptySet; import static java.util.Collections.singleton; @@ -1184,7 +1182,7 @@ protected Optional getDisruptableMockTransport(Transpo nodeHealthService, persistedStateRegistry, remoteStoreNodeService, - Mockito.mock(MetricsRegistry.class) + NoopMetricsRegistry.INSTANCE ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService( From 48e38b07607674cb6e61c0957cee7359e1a80dd6 Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Tue, 26 Mar 2024 10:43:38 +0530 Subject: [PATCH 3/7] Adding TestMetricsRegistry for UTs Signed-off-by: Harsh Garg --- .../coordination/ClusterManagerMetrics.java | 52 +++++++++++++ .../cluster/coordination/Coordinator.java | 7 +- .../coordination/FollowersChecker.java | 20 +---- .../cluster/coordination/LeaderChecker.java | 22 +----- .../opensearch/discovery/DiscoveryModule.java | 6 +- .../main/java/org/opensearch/node/Node.java | 4 +- .../coordination/FollowersCheckerTests.java | 67 +++++++++------- .../coordination/LeaderCheckerTests.java | 50 +++++------- .../cluster/coordination/NodeJoinTests.java | 2 +- .../discovery/DiscoveryModuleTests.java | 3 +- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../telemetry/TestInMemoryCounter.java | 52 +++++++++++++ .../telemetry/TestInMemoryHistogram.java | 47 ++++++++++++ .../TestInMemoryMetricsRegistry.java | 76 +++++++++++++++++++ .../AbstractCoordinatorTestCase.java | 2 +- 15 files changed, 308 insertions(+), 105 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java create mode 100644 server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java create mode 100644 server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java create mode 100644 server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java new file mode 100644 index 0000000000000..27a269202f048 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.util.Objects; +import java.util.Optional; + +/** + * Class containing metrics (counters/latency) specific to ClusterManager. + */ +public final class ClusterManagerMetrics { + + private static final String COUNTER_METRICS_UNIT = "1"; + + public final Counter leaderCheckFailureCounter; + public final Counter followerChecksFailureCounter; + + public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { + this.followerChecksFailureCounter = metricsRegistry.createCounter( + "followers.checker.failure.count", + "Counter for number of failed follower checks", + COUNTER_METRICS_UNIT + ); + this.leaderCheckFailureCounter = metricsRegistry.createCounter( + "leader.checker.failure.count", + "Counter for number of failed leader checks", + COUNTER_METRICS_UNIT + ); + } + + public void incrementCounter(Counter counter, Double value) { + incrementCounter(counter, value, Optional.empty()); + } + + public void incrementCounter(Counter counter, Double value, Optional tags) { + if (Objects.isNull(tags) || tags.isEmpty()) { + counter.add(value); + return; + } + counter.add(value, tags.get()); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index d702003c98b25..2165984418af5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -87,7 +87,6 @@ import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; -import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.TransportService; @@ -209,7 +208,7 @@ public Coordinator( NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, RemoteStoreNodeService remoteStoreNodeService, - MetricsRegistry metricsRegistry + ClusterManagerMetrics clusterManagerMetrics ) { this.settings = settings; this.transportService = transportService; @@ -269,7 +268,7 @@ public Coordinator( transportService, this::onLeaderFailure, nodeHealthService, - metricsRegistry + clusterManagerMetrics ); this.followersChecker = new FollowersChecker( settings, @@ -278,7 +277,7 @@ public Coordinator( this::onFollowerCheckRequest, this::removeNode, nodeHealthService, - metricsRegistry + clusterManagerMetrics ); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index c50f50c06370f..312c3bc038c80 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -48,8 +48,6 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; -import org.opensearch.telemetry.metrics.Counter; -import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.Transport; @@ -114,8 +112,6 @@ public class FollowersChecker { Setting.Property.NodeScope ); - private static final String UNIT = "1"; - private final Settings settings; private final TimeValue followerCheckInterval; @@ -131,7 +127,7 @@ public class FollowersChecker { private final TransportService transportService; private final NodeHealthService nodeHealthService; private volatile FastResponseState fastResponseState; - private Counter followerChecksFailureCounter; + private ClusterManagerMetrics clusterManagerMetrics; public FollowersChecker( Settings settings, @@ -140,7 +136,7 @@ public FollowersChecker( Consumer handleRequestAndUpdateState, BiConsumer onNodeFailure, NodeHealthService nodeHealthService, - MetricsRegistry metricsRegistry + ClusterManagerMetrics clusterManagerMetrics ) { this.settings = settings; this.transportService = transportService; @@ -167,15 +163,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti handleDisconnectedNode(node); } }); - initializeMetrics(metricsRegistry); - } - - private void initializeMetrics(MetricsRegistry metricsRegistry) { - this.followerChecksFailureCounter = metricsRegistry.createCounter( - "followers.checker.failure.count", - "Counter for number of failed follower checks", - UNIT - ); + this.clusterManagerMetrics = clusterManagerMetrics; } private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) { @@ -416,7 +404,6 @@ public void handleException(TransportException exp) { return; } - followerChecksFailureCounter.add(1); failNode(reason); } @@ -442,6 +429,7 @@ public void run() { followerCheckers.remove(discoveryNode); } onNodeFailure.accept(discoveryNode, reason); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java index 2c2467c0c7643..0c12fc1d1ee2c 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java @@ -50,8 +50,6 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; -import org.opensearch.telemetry.metrics.Counter; -import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.NodeDisconnectedException; @@ -113,8 +111,6 @@ public class LeaderChecker { Setting.Property.NodeScope ); - private static final String UNIT = "1"; - private final Settings settings; private final TimeValue leaderCheckInterval; @@ -125,7 +121,7 @@ public class LeaderChecker { private final NodeHealthService nodeHealthService; private AtomicReference currentChecker = new AtomicReference<>(); private volatile DiscoveryNodes discoveryNodes; - private Counter leaderCheckFailureCounter; + private final ClusterManagerMetrics clusterManagerMetrics; LeaderChecker( final Settings settings, @@ -133,7 +129,7 @@ public class LeaderChecker { final TransportService transportService, final Consumer onLeaderFailure, NodeHealthService nodeHealthService, - final MetricsRegistry metricsRegistry + final ClusterManagerMetrics clusterManagerMetrics ) { this.settings = settings; leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings); @@ -142,6 +138,7 @@ public class LeaderChecker { this.transportService = transportService; this.onLeaderFailure = onLeaderFailure; this.nodeHealthService = nodeHealthService; + this.clusterManagerMetrics = clusterManagerMetrics; clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout); transportService.registerRequestHandler( @@ -162,15 +159,6 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti handleDisconnectedNode(node); } }); - initializeMetrics(metricsRegistry); - } - - private void initializeMetrics(MetricsRegistry metricsRegistry) { - this.leaderCheckFailureCounter = metricsRegistry.createCounter( - "leader.checker.failure.count", - "Counter for number of failed leader checks", - UNIT - ); } private void setLeaderCheckTimeout(TimeValue leaderCheckTimeout) { @@ -319,12 +307,10 @@ public void handleException(TransportException exp) { if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp); leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp)); - leaderCheckFailureCounter.add(1); return; } else if (exp.getCause() instanceof NodeHealthCheckFailureException) { logger.debug(new ParameterizedMessage("leader [{}] health check failed", leader), exp); leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp)); - leaderCheckFailureCounter.add(1); return; } long failureCount = failureCountSinceLastSuccess.incrementAndGet(); @@ -342,7 +328,6 @@ public void handleException(TransportException exp) { leaderFailed( new OpenSearchException("node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp) ); - leaderCheckFailureCounter.add(1); return; } @@ -373,6 +358,7 @@ void leaderFailed(Exception e) { @Override public void run() { onLeaderFailure.accept(e); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0); } @Override diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index cdd1fda03dd4f..08e2fbd4dd08a 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.ElectionStrategy; import org.opensearch.cluster.coordination.PersistedStateRegistry; @@ -55,7 +56,6 @@ import org.opensearch.monitor.NodeHealthService; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; -import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -135,7 +135,7 @@ public DiscoveryModule( NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, RemoteStoreNodeService remoteStoreNodeService, - MetricsRegistry metricsRegistry + ClusterManagerMetrics clusterManagerMetrics ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -214,7 +214,7 @@ public DiscoveryModule( nodeHealthService, persistedStateRegistry, remoteStoreNodeService, - metricsRegistry + clusterManagerMetrics ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 4c3f981f999a3..b91917c7a0ba5 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -66,6 +66,7 @@ import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.action.index.MappingUpdatedAction; +import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.AliasValidator; import org.opensearch.cluster.metadata.IndexTemplateMetadata; @@ -644,6 +645,7 @@ protected Node( final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); ModulesBuilder modules = new ModulesBuilder(); // plugin modules must be added here, before others or we can get crazy injection errors... @@ -1130,7 +1132,7 @@ protected Node( fsHealthService, persistedStateRegistry, remoteStoreNodeService, - metricsRegistry + clusterManagerMetrics ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, diff --git a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java index fda7660cd3121..229caa8cf4b52 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java @@ -47,7 +47,7 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; -import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.TestInMemoryMetricsRegistry; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; @@ -94,12 +94,6 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.mockito.ArgumentMatchers.anyDouble; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class FollowersCheckerTests extends OpenSearchTestCase { @@ -149,7 +143,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req assert false : node; }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); followersChecker.setCurrentNodes(discoveryNodesHolder[0]); @@ -207,31 +201,38 @@ protected void onSendRequest(long requestId, String action, TransportRequest req public void testFailsNodeThatDoesNotRespond() { final Settings settings = randomSettings(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( settings, () -> null, "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis() + FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis(), - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatRejectsCheck() { final Settings settings = randomSettings(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( settings, () -> { throw new OpenSearchException("simulated exception"); }, "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis(), - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailureCounterResetsOnSuccess() { final Settings settings = randomSettings(); final int retryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings); final int maxRecoveries = randomIntBetween(3, 10); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); // passes just enough checks to keep it alive, up to maxRecoveries, and then fails completely testBehaviourOfFailingNode(settings, new Supplier() { @@ -251,18 +252,23 @@ public Empty get() { "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * (maxRecoveries + 1) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings) .millis(), - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatIsDisconnected() { + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( Settings.EMPTY, () -> { throw new ConnectTransportException(null, "simulated exception"); }, "disconnected", 0, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatDisconnects() { @@ -318,7 +324,7 @@ public String toString() { assertThat(reason, equalTo("disconnected")); }, () -> new StatusInfo(HEALTHY, "healthy-info"), - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -332,13 +338,16 @@ public String toString() { } public void testFailsNodeThatIsUnhealthy() { + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( randomSettings(), () -> { throw new NodeHealthCheckFailureException("non writable exception"); }, "health check failed", 0, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } private void testBehaviourOfFailingNode( @@ -346,7 +355,8 @@ private void testBehaviourOfFailingNode( Supplier responder, String failureReason, long expectedFailureTime, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + MetricsRegistry metricsRegistry ) { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); @@ -397,11 +407,6 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean nodeFailed = new AtomicBoolean(); - - final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - final Counter counter = mock(Counter.class); - when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(counter); - final FollowersChecker followersChecker = new FollowersChecker( settings, clusterSettings, @@ -412,7 +417,7 @@ public String toString() { assertThat(reason, equalTo(failureReason)); }, nodeHealthService, - metricsRegistry + new ClusterManagerMetrics(metricsRegistry) ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -458,8 +463,6 @@ public String toString() { deterministicTaskQueue.runAllTasksInTimeOrder(); assertTrue(nodeFailed.get()); assertThat(followersChecker.getFaultyNodes(), contains(otherNode)); - - verify(counter, atLeastOnce()).add(anyDouble()); } public void testFollowerCheckRequestEqualsHashCodeSerialization() { @@ -519,7 +522,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(UNHEALTHY, "unhealthy-info"), NoopMetricsRegistry.INSTANCE); + }, + (node, reason) -> { assert false : node; }, + () -> new StatusInfo(UNHEALTHY, "unhealthy-info"), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); final long leaderTerm = randomLongBetween(2, Long.MAX_VALUE); final long followerTerm = randomLongBetween(1, leaderTerm - 1); @@ -592,7 +599,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); + }, + (node, reason) -> { assert false : node; }, + () -> new StatusInfo(HEALTHY, "healthy-info"), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); { // Does not call into the coordinator in the normal case @@ -739,7 +750,11 @@ public void testPreferClusterManagerNodes() { ); final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, clusterSettings, transportService, fcr -> { assert false : fcr; - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); + }, + (node, reason) -> { assert false : node; }, + () -> new StatusInfo(HEALTHY, "healthy-info"), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); followersChecker.setCurrentNodes(discoveryNodes); List followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear()) .map(cr -> cr.node) diff --git a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java index 8029fb3aba7ea..87ba9947f90ed 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java @@ -44,8 +44,8 @@ import org.opensearch.core.transport.TransportResponse; import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.StatusInfo; -import org.opensearch.telemetry.metrics.Counter; -import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.TestInMemoryMetricsRegistry; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction; @@ -80,13 +80,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.matchesRegex; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.ArgumentMatchers.anyDouble; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.when; public class LeaderCheckerTests extends OpenSearchTestCase { @@ -184,14 +177,13 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); - final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - final Counter leaderCheckFailedCounter = mock(Counter.class); - when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderCheckFailedCounter); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), clusterManagerMetrics); logger.info("--> creating first checker"); leaderChecker.updateLeader(leader1); @@ -241,8 +233,7 @@ public String toString() { ); } leaderChecker.updateLeader(null); - verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); - verify(leaderCheckFailedCounter, times(1)).add(anyDouble()); + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } enum Response { @@ -307,14 +298,13 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); - final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - final Counter leaderCheckFailedCounter = mock(Counter.class); - when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderCheckFailedCounter); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check"))); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), clusterManagerMetrics); leaderChecker.updateLeader(leader); { @@ -369,8 +359,7 @@ public String toString() { deterministicTaskQueue.runAllRunnableTasks(); assertTrue(leaderFailed.get()); } - verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); - verify(leaderCheckFailedCounter, times(2)).add(anyDouble()); + assertEquals(Integer.valueOf(3), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } public void testFollowerFailsImmediatelyOnHealthCheckFailure() { @@ -427,13 +416,12 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); - final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - final Counter leaderChecksFailedCounter = mock(Counter.class); - when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderChecksFailedCounter); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), endsWith("failed health checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), clusterManagerMetrics); leaderChecker.updateLeader(leader); @@ -453,8 +441,8 @@ public String toString() { assertTrue(leaderFailed.get()); } - verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); - verify(leaderChecksFailedCounter, times(1)).add(anyDouble()); + + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } public void testLeaderBehaviour() { @@ -478,16 +466,14 @@ public void testLeaderBehaviour() { transportService.start(); transportService.acceptIncomingRequests(); - final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - final Counter leaderChecksFailedCounter = mock(Counter.class); - when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderChecksFailedCounter); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE); final LeaderChecker leaderChecker = new LeaderChecker( settings, clusterSettings, transportService, e -> fail("shouldn't be checking anything"), () -> nodeHealthServiceStatus.get(), - metricsRegistry + clusterManagerMetrics ); final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() @@ -552,8 +538,6 @@ public void testLeaderBehaviour() { equalTo("rejecting leader check from [" + otherNode + "] sent to a node that is no longer the cluster-manager") ); } - verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); - verifyNoInteractions(leaderChecksFailedCounter); } private class CapturingTransportResponseHandler implements TransportResponseHandler { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 6c44c000205f7..25b7264356c61 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -272,7 +272,7 @@ protected void onSendRequest( nodeHealthService, persistedStateRegistry, Mockito.mock(RemoteStoreNodeService.class), - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index 44e037e5a9845..b558d416aedf4 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -33,6 +33,7 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.node.DiscoveryNode; @@ -130,7 +131,7 @@ private DiscoveryModule newModule(Settings settings, List plugi null, new PersistedStateRegistry(), remoteStoreNodeService, - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 9922f9e6fa12b..a15154ad113a8 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -125,6 +125,7 @@ import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.coordination.AbstractCoordinatorTestCase; import org.opensearch.cluster.coordination.ClusterBootstrapService; +import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; import org.opensearch.cluster.coordination.CoordinationState; import org.opensearch.cluster.coordination.Coordinator; @@ -2550,7 +2551,7 @@ public void start(ClusterState initialState) { () -> new StatusInfo(HEALTHY, "healthy-info"), persistedStateRegistry, remoteStoreNodeService, - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java b/server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java new file mode 100644 index 0000000000000..d9aee5ebfa941 --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry; + +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This is a simple implementation of Counter which is utilized by TestInMemoryMetricsRegistry for + * Unit Tests. It initializes an atomic integer to add the values of counter which doesn't have any tags + * along with a map to store the values recorded against the tags. + * The map and atomic integer can then be used to get the added values. + */ +public class TestInMemoryCounter implements Counter { + + private AtomicInteger counterValue = new AtomicInteger(0); + private ConcurrentHashMap, Double> counterValueForTags = new ConcurrentHashMap<>(); + + public Integer getCounterValue() { + return this.counterValue.get(); + } + + public ConcurrentHashMap, Double> getCounterValueForTags() { + return this.counterValueForTags; + } + + @Override + public void add(double value) { + counterValue.addAndGet((int) value); + } + + @Override + public synchronized void add(double value, Tags tags) { + HashMap hashMap = (HashMap) tags.getTagsMap(); + if (counterValueForTags.get(hashMap) == null) { + counterValueForTags.put(hashMap, value); + } else { + value = counterValueForTags.get(hashMap) + value; + counterValueForTags.put(hashMap, value); + } + } +} diff --git a/server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java b/server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java new file mode 100644 index 0000000000000..ff28df2b6529d --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry; + +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This is a simple implementation of Histogram which is utilized by TestInMemoryMetricsRegistry for + * Unit Tests. It initializes an atomic integer to record the value of histogram which doesn't have any tags + * along with a map to store the values recorded against the tags. + * The map and atomic integer can then be used to get the recorded values. + */ +public class TestInMemoryHistogram implements Histogram { + + private AtomicInteger histogramValue = new AtomicInteger(0); + private ConcurrentHashMap, Double> histogramValueForTags = new ConcurrentHashMap<>(); + + public Integer getHistogramValue() { + return this.histogramValue.get(); + } + + public ConcurrentHashMap, Double> getHistogramValueForTags() { + return this.histogramValueForTags; + } + + @Override + public void record(double value) { + histogramValue.addAndGet((int) value); + } + + @Override + public synchronized void record(double value, Tags tags) { + HashMap hashMap = (HashMap) tags.getTagsMap(); + histogramValueForTags.put(hashMap, value); + } +} diff --git a/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java b/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java new file mode 100644 index 0000000000000..761efc0fe96aa --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry; + +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * This is a simple implementation of MetricsRegistry which can be utilized by Unit Tests. + * It just initializes and stores counters/histograms within a map, once created. + * The maps can then be used to get the counters/histograms by their names. + */ +public class TestInMemoryMetricsRegistry implements MetricsRegistry { + + private ConcurrentHashMap counterStore; + private ConcurrentHashMap histogramStore; + + public TestInMemoryMetricsRegistry() { + this.counterStore = new ConcurrentHashMap<>(); + this.histogramStore = new ConcurrentHashMap<>(); + } + + public ConcurrentHashMap getCounterStore() { + return this.counterStore; + } + + public ConcurrentHashMap getHistogramStore() { + return this.histogramStore; + } + + @Override + public Counter createCounter(String name, String description, String unit) { + TestInMemoryCounter counter = new TestInMemoryCounter(); + counterStore.putIfAbsent(name, counter); + return counter; + } + + @Override + public Counter createUpDownCounter(String name, String description, String unit) { + /** + * ToDo: To be implemented when required. + */ + return null; + } + + @Override + public Histogram createHistogram(String name, String description, String unit) { + TestInMemoryHistogram histogram = new TestInMemoryHistogram(); + histogramStore.putIfAbsent(name, histogram); + return histogram; + } + + @Override + public Closeable createGauge(String name, String description, String unit, Supplier valueProvider, Tags tags) { + /** + * ToDo: To be implemented when required. + */ + return null; + } + + @Override + public void close() throws IOException {} +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index fae2cd59791c1..3c383993d24fb 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1182,7 +1182,7 @@ protected Optional getDisruptableMockTransport(Transpo nodeHealthService, persistedStateRegistry, remoteStoreNodeService, - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService( From 616651927a76436b135b04511b6ae5e91561513b Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Wed, 10 Apr 2024 04:13:08 +0530 Subject: [PATCH 4/7] Adding asserts in unit tests for no leader or failure checks Signed-off-by: Harsh Garg --- .../cluster/coordination/ClusterManagerMetrics.java | 4 ++-- .../cluster/coordination/FollowersCheckerTests.java | 9 +++++++-- .../cluster/coordination/LeaderCheckerTests.java | 5 +++-- .../telemetry/TestInMemoryMetricsRegistry.java | 9 ++------- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java index 27a269202f048..5b64787ecb8a1 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java @@ -26,12 +26,12 @@ public final class ClusterManagerMetrics { public final Counter followerChecksFailureCounter; public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { - this.followerChecksFailureCounter = metricsRegistry.createCounter( + followerChecksFailureCounter = metricsRegistry.createCounter( "followers.checker.failure.count", "Counter for number of failed follower checks", COUNTER_METRICS_UNIT ); - this.leaderCheckFailureCounter = metricsRegistry.createCounter( + leaderCheckFailureCounter = metricsRegistry.createCounter( "leader.checker.failure.count", "Counter for number of failed leader checks", COUNTER_METRICS_UNIT diff --git a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java index 229caa8cf4b52..023ddbbdbe3ad 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java @@ -134,6 +134,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req transportService.start(); transportService.acceptIncomingRequests(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final FollowersChecker followersChecker = new FollowersChecker( settings, clusterSettings, @@ -143,7 +145,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req assert false : node; }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(metricsRegistry) ); followersChecker.setCurrentNodes(discoveryNodesHolder[0]); @@ -197,6 +199,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req followersChecker.clearCurrentNodes(); deterministicTaskQueue.runAllTasks(); assertThat(checkedNodes, empty()); + assertEquals(Integer.valueOf(0), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatDoesNotRespond() { @@ -313,6 +316,7 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean nodeFailed = new AtomicBoolean(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); final FollowersChecker followersChecker = new FollowersChecker( settings, @@ -324,7 +328,7 @@ public String toString() { assertThat(reason, equalTo("disconnected")); }, () -> new StatusInfo(HEALTHY, "healthy-info"), - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(metricsRegistry) ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -335,6 +339,7 @@ public String toString() { deterministicTaskQueue.runAllRunnableTasks(); assertTrue(nodeFailed.get()); assertThat(followersChecker.getFaultyNodes(), contains(otherNode)); + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatIsUnhealthy() { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java index 87ba9947f90ed..2671bad0b8b9e 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java @@ -45,7 +45,6 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.StatusInfo; import org.opensearch.telemetry.TestInMemoryMetricsRegistry; -import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction; @@ -466,7 +465,8 @@ public void testLeaderBehaviour() { transportService.start(); transportService.acceptIncomingRequests(); - final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE); + final TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); final LeaderChecker leaderChecker = new LeaderChecker( settings, clusterSettings, @@ -538,6 +538,7 @@ public void testLeaderBehaviour() { equalTo("rejecting leader check from [" + otherNode + "] sent to a node that is no longer the cluster-manager") ); } + assertEquals(Integer.valueOf(0), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } private class CapturingTransportResponseHandler implements TransportResponseHandler { diff --git a/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java b/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java index 761efc0fe96aa..6d395085b12ea 100644 --- a/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java +++ b/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java @@ -25,13 +25,8 @@ */ public class TestInMemoryMetricsRegistry implements MetricsRegistry { - private ConcurrentHashMap counterStore; - private ConcurrentHashMap histogramStore; - - public TestInMemoryMetricsRegistry() { - this.counterStore = new ConcurrentHashMap<>(); - this.histogramStore = new ConcurrentHashMap<>(); - } + private ConcurrentHashMap counterStore = new ConcurrentHashMap<>(); + private ConcurrentHashMap histogramStore = new ConcurrentHashMap<>(); public ConcurrentHashMap getCounterStore() { return this.counterStore; From dd39aee0ed463db09744bd8341191d716221bd65 Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Wed, 22 May 2024 18:19:34 +0530 Subject: [PATCH 5/7] Moving leaderCheckFailureCounter outside of Transport call Signed-off-by: Harsh Garg --- .../cluster/ClusterManagerMetrics.java | 27 ++++++++++ .../coordination/ClusterManagerMetrics.java | 52 ------------------- .../cluster/coordination/Coordinator.java | 1 + .../coordination/FollowersChecker.java | 1 + .../cluster/coordination/LeaderChecker.java | 3 +- .../opensearch/discovery/DiscoveryModule.java | 2 +- .../main/java/org/opensearch/node/Node.java | 2 - .../coordination/FollowersCheckerTests.java | 1 + .../coordination/LeaderCheckerTests.java | 1 + .../discovery/DiscoveryModuleTests.java | 2 +- .../snapshots/SnapshotResiliencyTests.java | 1 - 11 files changed, 35 insertions(+), 58 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java diff --git a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java index d48f82a388245..a98349a4af5cd 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java @@ -8,6 +8,7 @@ package org.opensearch.cluster; +import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.Histogram; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.tags.Tags; @@ -23,6 +24,7 @@ public final class ClusterManagerMetrics { private static final String LATENCY_METRIC_UNIT_MS = "ms"; + private static final String COUNTER_METRICS_UNIT = "1"; public final Histogram clusterStateAppliersHistogram; public final Histogram clusterStateListenersHistogram; @@ -30,6 +32,9 @@ public final class ClusterManagerMetrics { public final Histogram clusterStateComputeHistogram; public final Histogram clusterStatePublishHistogram; + public final Counter leaderCheckFailureCounter; + public final Counter followerChecksFailureCounter; + public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { clusterStateAppliersHistogram = metricsRegistry.createHistogram( "cluster.state.appliers.latency", @@ -56,6 +61,16 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { "Histogram for recording time taken to publish a new cluster state", LATENCY_METRIC_UNIT_MS ); + followerChecksFailureCounter = metricsRegistry.createCounter( + "followers.checker.failure.count", + "Counter for number of failed follower checks", + COUNTER_METRICS_UNIT + ); + leaderCheckFailureCounter = metricsRegistry.createCounter( + "leader.checker.failure.count", + "Counter for number of failed leader checks", + COUNTER_METRICS_UNIT + ); } public void recordLatency(Histogram histogram, Double value) { @@ -69,4 +84,16 @@ public void recordLatency(Histogram histogram, Double value, Optional tags } histogram.record(value, tags.get()); } + + public void incrementCounter(Counter counter, Double value) { + incrementCounter(counter, value, Optional.empty()); + } + + public void incrementCounter(Counter counter, Double value, Optional tags) { + if (Objects.isNull(tags) || tags.isEmpty()) { + counter.add(value); + return; + } + counter.add(value, tags.get()); + } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java deleted file mode 100644 index 5b64787ecb8a1..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.coordination; - -import org.opensearch.telemetry.metrics.Counter; -import org.opensearch.telemetry.metrics.MetricsRegistry; -import org.opensearch.telemetry.metrics.tags.Tags; - -import java.util.Objects; -import java.util.Optional; - -/** - * Class containing metrics (counters/latency) specific to ClusterManager. - */ -public final class ClusterManagerMetrics { - - private static final String COUNTER_METRICS_UNIT = "1"; - - public final Counter leaderCheckFailureCounter; - public final Counter followerChecksFailureCounter; - - public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { - followerChecksFailureCounter = metricsRegistry.createCounter( - "followers.checker.failure.count", - "Counter for number of failed follower checks", - COUNTER_METRICS_UNIT - ); - leaderCheckFailureCounter = metricsRegistry.createCounter( - "leader.checker.failure.count", - "Counter for number of failed leader checks", - COUNTER_METRICS_UNIT - ); - } - - public void incrementCounter(Counter counter, Double value) { - incrementCounter(counter, value, Optional.empty()); - } - - public void incrementCounter(Counter counter, Double value, Optional tags) { - if (Objects.isNull(tags) || tags.isEmpty()) { - counter.add(value); - return; - } - counter.add(value, tags.get()); - } -} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 2165984418af5..f53e6837a67f5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateTaskConfig; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index 312c3bc038c80..a53b174ae94b5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.coordination.Coordinator.Mode; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java index 0c12fc1d1ee2c..4fd2c0eb13073 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.Nullable; @@ -354,11 +355,11 @@ public String executor() { void leaderFailed(Exception e) { if (isClosed.compareAndSet(false, true)) { + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0); transportService.getThreadPool().generic().execute(new Runnable() { @Override public void run() { onLeaderFailure.accept(e); - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0); } @Override diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 08e2fbd4dd08a..538dea5b2e60b 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -34,8 +34,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.ElectionStrategy; import org.opensearch.cluster.coordination.PersistedStateRegistry; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 8ce6bd407bc3a..76d751c28303c 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -67,7 +67,6 @@ import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.action.index.MappingUpdatedAction; -import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.AliasValidator; import org.opensearch.cluster.metadata.IndexTemplateMetadata; @@ -670,7 +669,6 @@ protected Node( final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); - final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); ModulesBuilder modules = new ModulesBuilder(); // plugin modules must be added here, before others or we can get crazy injection errors... diff --git a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java index 023ddbbdbe3ad..d0bc41b459cc3 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java @@ -33,6 +33,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.coordination.Coordinator.Mode; import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java index 2671bad0b8b9e..decdeddfa37a1 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java @@ -34,6 +34,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.coordination.LeaderChecker.LeaderCheckRequest; import org.opensearch.cluster.node.DiscoveryNode; diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index b558d416aedf4..5539b3237c2bf 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -32,8 +32,8 @@ package org.opensearch.discovery; import org.opensearch.Version; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.node.DiscoveryNode; diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index c759efb713101..6399208cf9bf8 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -126,7 +126,6 @@ import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.coordination.AbstractCoordinatorTestCase; import org.opensearch.cluster.coordination.ClusterBootstrapService; -import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; import org.opensearch.cluster.coordination.CoordinationState; import org.opensearch.cluster.coordination.Coordinator; From c295e40a0f4c1746c526e470d970e4ea65d6d146 Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Wed, 22 May 2024 18:31:15 +0530 Subject: [PATCH 6/7] Added ChangeLog Signed-off-by: Harsh Garg --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4e920f3a202d..102136ca0098b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Add leader and follower check failure counter metrics ([#12439](https://github.com/opensearch-project/OpenSearch/pull/12439)) - Add latency metrics for instrumenting critical clusterManager code paths ([#12333](https://github.com/opensearch-project/OpenSearch/pull/12333)) - Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423)) - Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478)) From 13954dca19a28053ddcfab8de679cc9e6499d095 Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Thu, 23 May 2024 16:27:31 +0530 Subject: [PATCH 7/7] Moving followerChecksFailureCounter outside of Transport call Signed-off-by: Harsh Garg --- .../org/opensearch/cluster/coordination/FollowersChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index a53b174ae94b5..2ec0dabd91786 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -417,6 +417,7 @@ public String executor() { } void failNode(String reason) { + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0); transportService.getThreadPool().generic().execute(new Runnable() { @Override public void run() { @@ -430,7 +431,6 @@ public void run() { followerCheckers.remove(discoveryNode); } onNodeFailure.accept(discoveryNode, reason); - clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0); } @Override