diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 45bc4c8881..0784eaf047 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -717,6 +717,8 @@ public void onNext(InstallSnapshotReplyProto reply) { LOG.error("Unrecognized the reply result {}: Leader is {}, follower is {}", reply.getResult(), getServer().getId(), getFollowerId()); break; + case SNAPSHOT_EXPIRED: + LOG.warn("{}: Follower could not install snapshot as it is expired.", this); default: break; } diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index b2e96e283e..7cf2fd87c1 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -155,6 +155,7 @@ enum InstallSnapshotResult { CONF_MISMATCH = 4; SNAPSHOT_INSTALLED = 5; SNAPSHOT_UNAVAILABLE = 6; + SNAPSHOT_EXPIRED = 7; } message RequestVoteRequestProto { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 8de9a37569..70027e6dda 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -71,6 +71,8 @@ class SnapshotInstallationHandler { private final AtomicBoolean isSnapshotNull = new AtomicBoolean(); private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX); private final AtomicInteger nextChunkIndex = new AtomicInteger(-1); + /** The callId of the request carrying the chunk with index 0; initialized to -1 and set back to -1 once the last chunk is handled. 
*/ + private final AtomicLong chunk0CallId = new AtomicLong(-1); SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) { this.server = server; @@ -176,8 +178,22 @@ private CompletableFuture checkAndInstallSnapshot(Ins state.setLeader(leaderId, "installSnapshot"); server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START); + long callId = chunk0CallId.get(); + // 1. leaderTerm < currentTerm will never come here + // 2. leaderTerm == currentTerm && chunk0CallId > request.getServerRequest().getCallId() + // means the snapshot request from the same leader is stale; reply SNAPSHOT_EXPIRED + // 3. leaderTerm > currentTerm means this is a new snapshot request from a new leader, + // chunk0CallId will be reset when a snapshot request with requestIndex == 0 is received. + if (callId > request.getServerRequest().getCallId() && currentTerm == leaderTerm) { + LOG.warn("{}: Snapshot Request Staled: chunk 0 callId is {} but {}", getMemberId(), callId, + ServerStringUtils.toInstallSnapshotRequestString(request)); + InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SNAPSHOT_EXPIRED); + return future.thenApply(dummy -> reply); + } if (snapshotChunkRequest.getRequestIndex() == 0) { nextChunkIndex.set(0); + chunk0CallId.set(request.getServerRequest().getCallId()); } else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) { throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get() + "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex()); @@ -205,6 +221,7 @@ private CompletableFuture checkAndInstallSnapshot(Ins // re-load the state machine if this is the last chunk if (snapshotChunkRequest.getDone()) { state.reloadStateMachine(lastIncluded); + chunk0CallId.set(-1); } } finally { server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE); diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index 8ec6c19db1..8c1675c7c3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -155,6 +155,7 @@ public void run() throws InterruptedException, IOException { case SUCCESS: case SNAPSHOT_UNAVAILABLE: case ALREADY_INSTALLED: + case SNAPSHOT_EXPIRED: getFollower().setAttemptedToInstallSnapshot(); break; default: