Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds counter metrics for leader and follower check failures #12439

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.Objects;
import java.util.Optional;

/**
* Class containing metrics (counters/latency) specific to ClusterManager.
*/
public final class ClusterManagerMetrics {

private static final String COUNTER_METRICS_UNIT = "1";

public final Counter leaderCheckFailureCounter;
public final Counter followerChecksFailureCounter;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
this.followerChecksFailureCounter = metricsRegistry.createCounter(
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
"followers.checker.failure.count",
"Counter for number of failed follower checks",
COUNTER_METRICS_UNIT
);
this.leaderCheckFailureCounter = metricsRegistry.createCounter(
"leader.checker.failure.count",
"Counter for number of failed leader checks",
COUNTER_METRICS_UNIT
);
}

public void incrementCounter(Counter counter, Double value) {
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
incrementCounter(counter, value, Optional.empty());
}

public void incrementCounter(Counter counter, Double value, Optional<Tags> tags) {
if (Objects.isNull(tags) || tags.isEmpty()) {
counter.add(value);
return;
}
counter.add(value, tags.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ public Coordinator(
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -261,14 +262,22 @@ public Coordinator(
this::handlePublishRequest,
this::handleApplyCommit
);
this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService);
this.leaderChecker = new LeaderChecker(
settings,
clusterSettings,
transportService,
this::onLeaderFailure,
nodeHealthService,
clusterManagerMetrics
);
this.followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
this::onFollowerCheckRequest,
this::removeNode,
nodeHealthService
nodeHealthService,
clusterManagerMetrics
);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,16 @@ public class FollowersChecker {
private final TransportService transportService;
private final NodeHealthService nodeHealthService;
private volatile FastResponseState fastResponseState;
private ClusterManagerMetrics clusterManagerMetrics;

public FollowersChecker(
Settings settings,
ClusterSettings clusterSettings,
TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
this.transportService = transportService;
Expand All @@ -161,6 +163,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
handleDisconnectedNode(node);
}
});
this.clusterManagerMetrics = clusterManagerMetrics;
}

private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
Expand Down Expand Up @@ -426,6 +429,7 @@ public void run() {
followerCheckers.remove(discoveryNode);
}
onNodeFailure.accept(discoveryNode, reason);
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ public class LeaderChecker {
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
private final NodeHealthService nodeHealthService;

private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();

private volatile DiscoveryNodes discoveryNodes;
private final ClusterManagerMetrics clusterManagerMetrics;

LeaderChecker(
final Settings settings,
final ClusterSettings clusterSettings,
final TransportService transportService,
final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
final ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
Expand All @@ -138,6 +138,7 @@ public class LeaderChecker {
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.nodeHealthService = nodeHealthService;
this.clusterManagerMetrics = clusterManagerMetrics;
clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout);

transportService.registerRequestHandler(
Expand Down Expand Up @@ -293,7 +294,6 @@ public void handleResponse(Empty response) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}

failureCountSinceLastSuccess.set(0);
scheduleNextWakeUp(); // logs trace message indicating success
}
Expand All @@ -304,7 +304,6 @@ public void handleException(TransportException exp) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}

if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
Expand Down Expand Up @@ -359,6 +358,7 @@ void leaderFailed(Exception e) {
@Override
public void run() {
onLeaderFailure.accept(e);
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0);
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.ClusterManagerMetrics;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.ElectionStrategy;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
Expand Down Expand Up @@ -133,7 +134,8 @@ public DiscoveryModule(
RerouteService rerouteService,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
ClusterManagerMetrics clusterManagerMetrics
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -211,7 +213,8 @@ public DiscoveryModule(
electionStrategy,
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
clusterManagerMetrics
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
5 changes: 4 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.NodeConnectionsService;
import org.opensearch.cluster.action.index.MappingUpdatedAction;
import org.opensearch.cluster.coordination.ClusterManagerMetrics;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.metadata.AliasValidator;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
Expand Down Expand Up @@ -644,6 +645,7 @@ protected Node(

final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();
final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry);

ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
Expand Down Expand Up @@ -1129,7 +1131,8 @@ protected Node(
rerouteService,
fsHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
clusterManagerMetrics
);
final SearchPipelineService searchPipelineService = new SearchPipelineService(
clusterService,
Expand Down
Loading
Loading