From 158a7e8d6ea193f0fc8ef23ecfa47bd3878624ef Mon Sep 17 00:00:00 2001 From: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Date: Tue, 15 Oct 2024 11:21:12 +0530 Subject: [PATCH] Update last seen cluster state in commit phase (#16215) * Update last seen cluster state on apply commit Signed-off-by: Sooraj Sinha --- CHANGELOG.md | 1 + .../opensearch/cluster/coordination/Coordinator.java | 8 +++++++- .../coordination/PublicationTransportHandler.java | 10 +++++++--- .../coordination/PublicationTransportHandlerTests.java | 2 +- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e60f28fe1ee99..3090d0b702f6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Enable coordinator search.request_stats_enabled by default ([#16290](https://github.com/opensearch-project/OpenSearch/pull/16290)) - Optimise clone operation for incremental full cluster snapshots ([#16296](https://github.com/opensearch-project/OpenSearch/pull/16296)) - Code cleanup: Remove ApproximateIndexOrDocValuesQuery ([#16273](https://github.com/opensearch-project/OpenSearch/pull/16273)) +- Update last seen cluster state in the commit phase ([#16215](https://github.com/opensearch-project/OpenSearch/pull/16215)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 2dfd7d08c0bb2..e4cbe80caab65 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -106,6 +106,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -384,7 +385,11 @@ void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) { } } - private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener applyListener) { + private void handleApplyCommit( + ApplyCommitRequest applyCommitRequest, + Consumer updateLastSeen, + ActionListener applyListener + ) { synchronized (mutex) { logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest); @@ -392,6 +397,7 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionList final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState()); applierState = mode == Mode.CANDIDATE ? clusterStateWithNoClusterManagerBlock(committedState) : committedState; clusterApplier.setPreCommitState(applierState); + updateLastSeen.accept(coordinationState.get().getLastAcceptedState()); if (applyCommitRequest.getSourceNode().equals(getLocalNode())) { // cluster-manager node applies the committed state at the end of the publication process, not here. diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 42aa55433dd5f..d30efde52bffb 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -43,6 +43,7 @@ import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.TriConsumer; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -65,7 +66,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -110,7 +110,7 @@ public PublicationTransportHandler( TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, Function handlePublishRequest, - BiConsumer> handleApplyCommit, + TriConsumer, ActionListener> handleApplyCommit, RemoteClusterStateService remoteClusterStateService ) { this.transportService = transportService; @@ -142,7 +142,7 @@ public PublicationTransportHandler( false, false, ApplyCommitRequest::new, - (request, channel, task) -> handleApplyCommit.accept(request, transportCommitCallback(channel)) + (request, channel, task) -> handleApplyCommit.apply(request, this::updateLastSeen, transportCommitCallback(channel)) ); } @@ -377,6 +377,10 @@ private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes dis return true; } + private void updateLastSeen(final ClusterState clusterState) { + lastSeenClusterState.set(clusterState); + } + // package private for testing void setCurrentPublishRequestToSelf(PublishRequest publishRequest) { this.currentPublishRequestToSelf.set(publishRequest); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java index 266928c919fe2..616559e91536d 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -466,7 +466,7 @@ private PublicationTransportHandler getPublicationTransportHandler( transportService, writableRegistry(), handlePublishRequest, - (pu, l) -> {}, + (pu, uc, l) -> {}, remoteClusterStateService ); transportService.start();