Skip to content

Commit

Permalink
Election scheduler should be cancelled after cluster state publication (
Browse files Browse the repository at this point in the history
opensearch-project#11699)

Signed-off-by: Sooraj Sinha <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
soosinha authored and shiv0408 committed Apr 25, 2024
1 parent e64e852 commit 6e5764f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499))
- Quickly compute terms aggregations when the top-level query is functionally match-all for a segment ([#11643](https://github.com/opensearch-project/OpenSearch/pull/11643))
- Mark fuzzy filter GA and remove experimental setting ([12631](https://github.com/opensearch-project/OpenSearch/pull/12631))
- Keep the election scheduler open until cluster state has been applied ([#11699](https://github.com/opensearch-project/OpenSearch/pull/11699))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ public void onFailure(String source, Exception e) {

@Override
public void onSuccess(String source) {
closePrevotingAndElectionScheduler();
applyListener.onResponse(null);
}
});
Expand Down Expand Up @@ -472,17 +473,29 @@ private static Optional<Join> joinWithDestination(Optional<Join> lastJoin, Disco
}

private void closePrevotingAndElectionScheduler() {
closePrevoting();
closeElectionScheduler();
}

private void closePrevoting() {
if (prevotingRound != null) {
prevotingRound.close();
prevotingRound = null;
}
}

private void closeElectionScheduler() {
if (electionScheduler != null) {
electionScheduler.close();
electionScheduler = null;
}
}

// package-visible for testing
boolean isElectionSchedulerRunning() {
return electionScheduler != null;
}

private void updateMaxTermSeen(final long term) {
synchronized (mutex) {
maxTermSeen = Math.max(maxTermSeen, term);
Expand Down Expand Up @@ -724,7 +737,7 @@ void becomeLeader(String method) {
lastKnownLeader = Optional.of(getLocalNode());
peerFinder.deactivate(getLocalNode());
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
closePrevoting();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());

assert leaderChecker.leader() == null : leaderChecker.leader();
Expand Down Expand Up @@ -761,7 +774,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
closePrevoting();
cancelActivePublication("become follower: " + method);
preVoteCollector.update(getPreVoteResponse(), leaderNode);

Expand Down Expand Up @@ -927,7 +940,6 @@ public void invariant() {
assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode());
assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator;
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert becomingClusterManager || getStateForClusterManagerService().nodes().getClusterManagerNodeId() != null
: getStateForClusterManagerService();
Expand Down Expand Up @@ -972,7 +984,6 @@ assert getLocalNode().equals(applierState.nodes().getClusterManagerNode())
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
assert joinAccumulator instanceof JoinHelper.FollowerJoinAccumulator;
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert getStateForClusterManagerService().nodes().getClusterManagerNodeId() == null : getStateForClusterManagerService();
assert leaderChecker.currentNodeIsClusterManager() == false;
Expand Down Expand Up @@ -1693,6 +1704,7 @@ public void onSuccess(String source) {
updateMaxTermSeen(getCurrentTerm());

if (mode == Mode.LEADER) {
closePrevotingAndElectionScheduler();
// if necessary, abdicate to another node or improve the voting configuration
boolean attemptReconfiguration = true;
final ClusterState state = getLastAcceptedState(); // committed state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public void testNodesJoinAfterStableCluster() {
public void testExpandsConfigurationWhenGrowingFromOneNodeToThreeButDoesNotShrink() {
try (Cluster cluster = new Cluster(1)) {
cluster.runRandomly();
cluster.stabilise();
cluster.stabilise(DEFAULT_STABILISATION_TIME * 2);

final ClusterNode leader = cluster.getAnyLeader();

Expand Down Expand Up @@ -1750,7 +1750,7 @@ public void testDoesNotPerformElectionWhenRestartingFollower() {
public void testImproveConfigurationPerformsVotingConfigExclusionStateCheck() {
try (Cluster cluster = new Cluster(1)) {
cluster.runRandomly();
cluster.stabilise();
cluster.stabilise(DEFAULT_STABILISATION_TIME * 2);

final Coordinator coordinator = cluster.getAnyLeader().coordinator;
final ClusterState currentState = coordinator.getLastAcceptedState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.ClusterNode;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.coordination.LinearizabilityChecker.History;
import org.opensearch.cluster.coordination.LinearizabilityChecker.SequentialSpec;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
Expand Down Expand Up @@ -653,6 +654,12 @@ void stabilise(long stabilisationDurationMillis) {
leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)
);
}
if (clusterNode.coordinator.getMode() == Mode.LEADER || clusterNode.coordinator.getMode() == Mode.FOLLOWER) {
assertFalse(
"Election scheduler should stop after cluster has stabilised",
clusterNode.coordinator.isElectionSchedulerRunning()
);
}
}

final Set<String> connectedNodeIds = clusterNodes.stream()
Expand Down

0 comments on commit 6e5764f

Please sign in to comment.