Skip to content

Commit

Permalink
RATIS-2183. Detect staled snapshot request. (#1173)
Browse files Browse the repository at this point in the history
  • Loading branch information
133tosakarin authored Nov 1, 2024
1 parent 237b7c0 commit e75a0d5
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,8 @@ public void onNext(InstallSnapshotReplyProto reply) {
LOG.error("Unrecognized the reply result {}: Leader is {}, follower is {}",
reply.getResult(), getServer().getId(), getFollowerId());
break;
case SNAPSHOT_EXPIRED:
LOG.warn("{}: Follower could not install snapshot as it is expired.", this);
default:
break;
}
Expand Down
1 change: 1 addition & 0 deletions ratis-proto/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ enum InstallSnapshotResult {
CONF_MISMATCH = 4;
SNAPSHOT_INSTALLED = 5;
SNAPSHOT_UNAVAILABLE = 6;
SNAPSHOT_EXPIRED = 7;
}

message RequestVoteRequestProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class SnapshotInstallationHandler {
private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX);
private final AtomicInteger nextChunkIndex = new AtomicInteger(-1);
/** The callId of the chunk with index 0. */
private final AtomicLong chunk0CallId = new AtomicLong(-1);

SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) {
this.server = server;
Expand Down Expand Up @@ -176,8 +178,22 @@ private CompletableFuture<InstallSnapshotReplyProto> checkAndInstallSnapshot(Ins
state.setLeader(leaderId, "installSnapshot");

server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
long callId = chunk0CallId.get();
// 1. leaderTerm < currentTerm will never come here
// 2. leaderTerm == currentTerm && callId == request.getCallId()
// means the snapshotRequest is staled with the same leader
// 3. leaderTerm > currentTerm means this is a new snapshot request from a new leader,
// chunk0CallId will be reset when a snapshot request with requestIndex == 0 is received .
if (callId > request.getServerRequest().getCallId() && currentTerm == leaderTerm) {
LOG.warn("{}: Snapshot Request Staled: chunk 0 callId is {} but {}", getMemberId(), callId,
ServerStringUtils.toInstallSnapshotRequestString(request));
InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SNAPSHOT_EXPIRED);
return future.thenApply(dummy -> reply);
}
if (snapshotChunkRequest.getRequestIndex() == 0) {
nextChunkIndex.set(0);
chunk0CallId.set(request.getServerRequest().getCallId());
} else if (nextChunkIndex.get() != snapshotChunkRequest.getRequestIndex()) {
throw new IOException("Snapshot request already failed at chunk index " + nextChunkIndex.get()
+ "; ignoring request with chunk index " + snapshotChunkRequest.getRequestIndex());
Expand Down Expand Up @@ -205,6 +221,7 @@ private CompletableFuture<InstallSnapshotReplyProto> checkAndInstallSnapshot(Ins
// re-load the state machine if this is the last chunk
if (snapshotChunkRequest.getDone()) {
state.reloadStateMachine(lastIncluded);
chunk0CallId.set(-1);
}
} finally {
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public void run() throws InterruptedException, IOException {
case SUCCESS:
case SNAPSHOT_UNAVAILABLE:
case ALREADY_INSTALLED:
case SNAPSHOT_EXPIRED:
getFollower().setAttemptedToInstallSnapshot();
break;
default:
Expand Down

0 comments on commit e75a0d5

Please sign in to comment.