Skip to content

Commit

Permalink
Update last seen cluster state on apply commit
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed Oct 9, 2024
1 parent e7757e7 commit 8ffcaed
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -383,14 +384,19 @@ void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) {
}
}

private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener<Void> applyListener) {
private void handleApplyCommit(
ApplyCommitRequest applyCommitRequest,
Consumer<ClusterState> updateLastSeen,
ActionListener<Void> applyListener
) {
synchronized (mutex) {
logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);

coordinationState.get().handleCommit(applyCommitRequest);
final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoClusterManagerBlock(committedState) : committedState;
clusterApplier.setPreCommitState(applierState);
updateLastSeen.accept(coordinationState.get().getLastAcceptedState());

if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
// cluster-manager node applies the committed state at the end of the publication process, not here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.TriConsumer;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
Expand All @@ -65,7 +66,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -110,7 +110,7 @@ public PublicationTransportHandler(
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit,
TriConsumer<ApplyCommitRequest, Consumer<ClusterState>, ActionListener<Void>> handleApplyCommit,
RemoteClusterStateService remoteClusterStateService
) {
this.transportService = transportService;
Expand Down Expand Up @@ -142,7 +142,7 @@ public PublicationTransportHandler(
false,
false,
ApplyCommitRequest::new,
(request, channel, task) -> handleApplyCommit.accept(request, transportCommitCallback(channel))
(request, channel, task) -> handleApplyCommit.apply(request, this::updateLastSeen, transportCommitCallback(channel))
);
}

Expand Down Expand Up @@ -377,6 +377,10 @@ private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes dis
return true;
}

private void updateLastSeen(final ClusterState clusterState) {
lastSeenClusterState.set(clusterState);
}

// package private for testing
void setCurrentPublishRequestToSelf(PublishRequest publishRequest) {
this.currentPublishRequestToSelf.set(publishRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ private PublicationTransportHandler getPublicationTransportHandler(
transportService,
writableRegistry(),
handlePublishRequest,
(pu, l) -> {},
(pu, uc, l) -> {},
remoteClusterStateService
);
transportService.start();
Expand Down

0 comments on commit 8ffcaed

Please sign in to comment.