Skip to content

Commit

Permalink
Adding support for dynamically updating Leader/follower checker timeo…
Browse files Browse the repository at this point in the history
…uts (#10528) (#11166)

* making leader check timeout dynamic



* making follower check timeout dynamic



* fixing existing unit tests



* fixing checkstyle violations



* adding tests for leader/follower check timeout



* setting maximum and minimum timeout value for leader/follower checker



* adding tests for checking boundary cases



* Fixing checkstyle violations



* changed the log file and added other suggested changes



* fixing checkstyle violations



* Addressing review comments



* addressing proposed changes



* Applying checkstyle fixes



* Fixing flakiness for existing tests



* Applying checkstyle fixes



* Fixing the timeout value limits for randomSettings



---------


(cherry picked from commit 0452d14)

Signed-off-by: Niyati Aggarwal <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent c2b5587 commit 879b9af
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- [BUG] Disable sort optimization for HALF_FLOAT ([#10999](https://github.com/opensearch-project/OpenSearch/pull/10999))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Use iterative approach to evaluate Regex.simpleMatch ([#11060](https://github.com/opensearch-project/OpenSearch/pull/11060))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,10 @@ public Coordinator(
this::handlePublishRequest,
this::handleApplyCommit
);
this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure, nodeHealthService);
this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService);
this.followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
this::onFollowerCheckRequest,
this::removeNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -98,7 +99,9 @@ public class FollowersChecker {
"cluster.fault_detection.follower_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
TimeValue.timeValueMillis(60000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// the number of failed checks that must happen before the follower is considered to have failed.
Expand All @@ -112,7 +115,7 @@ public class FollowersChecker {
private final Settings settings;

private final TimeValue followerCheckInterval;
private final TimeValue followerCheckTimeout;
// Reassigned at runtime by the settings-update consumer registered for FOLLOWER_CHECK_TIMEOUT_SETTING.
// Declared volatile so that a new value is visible to whichever thread reads it when scheduling a check.
// NOTE(review): confirm which threads read this field; volatile is harmless if they turn out to be the same.
private volatile TimeValue followerCheckTimeout;
private final int followerCheckRetryCount;
private final BiConsumer<DiscoveryNode, String> onNodeFailure;
private final Consumer<FollowerCheckRequest> handleRequestAndUpdateState;
Expand All @@ -127,6 +130,7 @@ public class FollowersChecker {

public FollowersChecker(
Settings settings,
ClusterSettings clusterSettings,
TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure,
Expand All @@ -141,7 +145,7 @@ public FollowersChecker(
followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings);
followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings);
followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_TIMEOUT_SETTING, this::setFollowerCheckTimeout);
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(
FOLLOWER_CHECK_ACTION_NAME,
Expand All @@ -159,6 +163,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
});
}

/**
 * Stores a new follower check timeout on this checker. The constructor registers this method as the
 * settings-update consumer for {@code FOLLOWER_CHECK_TIMEOUT_SETTING}.
 *
 * @param updatedTimeout the timeout value to use from now on
 */
private void setFollowerCheckTimeout(TimeValue updatedTimeout) {
    followerCheckTimeout = updatedTimeout;
}

/**
* Update the set of known nodes, starting to check any new ones and stopping checking any previously-known-but-now-unknown ones.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -97,7 +98,9 @@ public class LeaderChecker {
"cluster.fault_detection.leader_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
TimeValue.timeValueMillis(60000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// the number of failed checks that must happen before the leader is considered to have failed.
Expand All @@ -111,7 +114,7 @@ public class LeaderChecker {
private final Settings settings;

private final TimeValue leaderCheckInterval;
private final TimeValue leaderCheckTimeout;
// Reassigned at runtime by the settings-update consumer registered for LEADER_CHECK_TIMEOUT_SETTING.
// Declared volatile so that a new value is visible to whichever thread reads it when scheduling a check.
// NOTE(review): confirm which threads read this field; volatile is harmless if they turn out to be the same.
private volatile TimeValue leaderCheckTimeout;
private final int leaderCheckRetryCount;
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
Expand All @@ -123,6 +126,7 @@ public class LeaderChecker {

LeaderChecker(
final Settings settings,
final ClusterSettings clusterSettings,
final TransportService transportService,
final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService
Expand All @@ -134,6 +138,7 @@ public class LeaderChecker {
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.nodeHealthService = nodeHealthService;
clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout);

transportService.registerRequestHandler(
LEADER_CHECK_ACTION_NAME,
Expand All @@ -155,6 +160,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
});
}

/**
 * Stores a new leader check timeout on this checker. The constructor registers this method as the
 * settings-update consumer for {@code LEADER_CHECK_TIMEOUT_SETTING}.
 *
 * @param updatedTimeout the timeout value to use from now on
 */
private void setLeaderCheckTimeout(TimeValue updatedTimeout) {
    leaderCheckTimeout = updatedTimeout;
}

public DiscoveryNode leader() {
CheckScheduler checkScheduler = currentChecker.get();
return checkScheduler == null ? null : checkScheduler.leader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static void sendErrorResponse(TransportChannel channel, String actionName, Trans

/**
* Returns the contextual property associated with this specific transport channel (the
* implementation of how such properties are managed depends on the the particular
* implementation of how such properties are managed depends on the particular
* transport engine).
*
* @param name the name of the property
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueSeconds;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
 * Single-node tests for the leader and follower check timeout settings.
 *
 * Each setting is exercised three ways through the cluster update-settings API: a value at the upper
 * bound (60s) is accepted, a value above it (61s) is rejected, and a value below the lower bound (0s)
 * is rejected.
 */
public class CoordinationCheckerSettingsTests extends OpenSearchSingleNodeTestCase {

    public void testFollowerCheckTimeoutValueUpdate() {
        assertTimeoutCanBeUpdated(FOLLOWER_CHECK_TIMEOUT_SETTING);
    }

    public void testFollowerCheckTimeoutMaxValue() {
        assertTimeoutRejected(FOLLOWER_CHECK_TIMEOUT_SETTING, "61s", "must be <= [60000ms]");
    }

    public void testFollowerCheckTimeoutMinValue() {
        assertTimeoutRejected(FOLLOWER_CHECK_TIMEOUT_SETTING, "0s", "must be >= [1ms]");
    }

    public void testLeaderCheckTimeoutValueUpdate() {
        assertTimeoutCanBeUpdated(LEADER_CHECK_TIMEOUT_SETTING);
    }

    public void testLeaderCheckTimeoutMaxValue() {
        assertTimeoutRejected(LEADER_CHECK_TIMEOUT_SETTING, "61s", "must be <= [60000ms]");
    }

    public void testLeaderCheckTimeoutMinValue() {
        assertTimeoutRejected(LEADER_CHECK_TIMEOUT_SETTING, "0s", "must be >= [1ms]");
    }

    /**
     * Sets {@code setting} to 60s as a persistent cluster setting, checks that the update is acknowledged
     * and echoed back in the response, and always resets the setting afterwards.
     *
     * @param setting the timeout setting under test
     */
    private void assertTimeoutCanBeUpdated(Setting<TimeValue> setting) {
        final Settings update = Settings.builder().put(setting.getKey(), "60s").build();
        try {
            final ClusterUpdateSettingsResponse response = updatePersistentSettings(update);
            assertAcked(response);
            assertEquals(timeValueSeconds(60), setting.get(response.getPersistentSettings()));
        } finally {
            // cleanup: remove the persistent override so later tests on the same node see the default
            updatePersistentSettings(Settings.builder().putNull(setting.getKey()).build());
        }
    }

    /**
     * Verifies that an out-of-range value is rejected with an {@link IllegalArgumentException} whose
     * message names the value, the setting key and the violated bound.
     *
     * The exception is captured and its message compared explicitly: the String argument of
     * {@code assertThrows(String, Class, ThrowingRunnable)} is only the failure description, so passing
     * the expected text there does not verify anything about the thrown exception.
     *
     * @param setting       the timeout setting under test
     * @param value         the out-of-range value to apply, e.g. {@code "61s"}
     * @param expectedBound the trailing part of the expected message, e.g. {@code "must be <= [60000ms]"}
     */
    private void assertTimeoutRejected(Setting<TimeValue> setting, String value, String expectedBound) {
        final Settings update = Settings.builder().put(setting.getKey(), value).build();
        final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> updatePersistentSettings(update));
        assertEquals("failed to parse value [" + value + "] for setting [" + setting.getKey() + "], " + expectedBound, e.getMessage());
    }

    /**
     * Applies {@code settings} as persistent cluster settings and waits for the response.
     *
     * @param settings the persistent settings to apply
     * @return the response of the cluster update-settings request
     */
    private ClusterUpdateSettingsResponse updatePersistentSettings(Settings settings) {
        return client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings).execute().actionGet();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.Settings.Builder;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -96,7 +97,7 @@ public class FollowersCheckerTests extends OpenSearchTestCase {
public void testChecksExpectedNodes() {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build();

final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DiscoveryNodes[] discoveryNodesHolder = new DiscoveryNodes[] {
DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build() };

Expand Down Expand Up @@ -132,6 +133,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req

final FollowersChecker followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
fcr -> { assert false : fcr; },
(node, reason) -> {
Expand Down Expand Up @@ -257,6 +259,7 @@ public void testFailsNodeThatDisconnects() {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());

final MockTransport mockTransport = new MockTransport() {
Expand Down Expand Up @@ -297,6 +300,7 @@ public String toString() {

final FollowersChecker followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
fcr -> { assert false : fcr; },
(node, reason) -> {
Expand Down Expand Up @@ -336,6 +340,7 @@ private void testBehaviourOfFailingNode(
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).put(testSettings).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());

final MockTransport mockTransport = new MockTransport() {
Expand Down Expand Up @@ -384,6 +389,7 @@ public String toString() {

final FollowersChecker followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
fcr -> { assert false : fcr; },
(node, reason) -> {
Expand Down Expand Up @@ -464,6 +470,7 @@ public void testUnhealthyNodeRejectsImmediately() {
final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());

final MockTransport mockTransport = new MockTransport() {
Expand All @@ -488,7 +495,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
final AtomicBoolean calledCoordinator = new AtomicBoolean();
final AtomicReference<RuntimeException> coordinatorException = new AtomicReference<>();

final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> {
final FollowersChecker followersChecker = new FollowersChecker(settings, clusterSettings, transportService, fcr -> {
assertTrue(calledCoordinator.compareAndSet(false, true));
final RuntimeException exception = coordinatorException.get();
if (exception != null) {
Expand Down Expand Up @@ -536,6 +543,7 @@ public void testResponder() {
final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());

final MockTransport mockTransport = new MockTransport() {
Expand All @@ -560,7 +568,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
final AtomicBoolean calledCoordinator = new AtomicBoolean();
final AtomicReference<RuntimeException> coordinatorException = new AtomicReference<>();

final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> {
final FollowersChecker followersChecker = new FollowersChecker(settings, clusterSettings, transportService, fcr -> {
assertTrue(calledCoordinator.compareAndSet(false, true));
final RuntimeException exception = coordinatorException.get();
if (exception != null) {
Expand Down Expand Up @@ -700,6 +708,7 @@ public void testPreferClusterManagerNodes() {
DiscoveryNodes discoveryNodes = discoNodesBuilder.localNodeId(nodes.get(0).getId()).build();
CapturingTransport capturingTransport = new CapturingTransport();
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), nodes.get(0).getName()).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
TransportService transportService = capturingTransport.createTransportService(
Settings.EMPTY,
Expand All @@ -710,15 +719,9 @@ public void testPreferClusterManagerNodes() {
emptySet(),
NoopTracer.INSTANCE
);
final FollowersChecker followersChecker = new FollowersChecker(
Settings.EMPTY,
transportService,
fcr -> { assert false : fcr; },
(node, reason) -> {
assert false : node;
},
() -> new StatusInfo(HEALTHY, "healthy-info")
);
final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, clusterSettings, transportService, fcr -> {
assert false : fcr;
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"));
followersChecker.setCurrentNodes(discoveryNodes);
List<DiscoveryNode> followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear())
.map(cr -> cr.node)
Expand Down Expand Up @@ -754,7 +757,7 @@ private static Settings randomSettings() {
settingsBuilder.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), randomIntBetween(100, 100000) + "ms");
}
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), randomIntBetween(1, 100000) + "ms");
settingsBuilder.put(FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), randomIntBetween(1, 60000) + "ms");
}
return settingsBuilder.build();
}
Expand Down
Loading

0 comments on commit 879b9af

Please sign in to comment.