diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 02d5a39d3..e5488cda3 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -512,7 +512,7 @@ where vote: *st.io_state().vote(), last_log_index: st.last_log_id().index(), last_applied: st.io_applied().copied(), - snapshot: st.snapshot_meta.last_log_id, + snapshot: st.io_snapshot_last_log_id().copied(), purged: st.io_purged().copied(), // --- cluster --- @@ -1301,7 +1301,15 @@ where meta.summary(), func_name!() ); + + // Update in-memory state first, then the io state. + // In-memory state should always be ahead or equal to the io state. + + let last_log_id = meta.last_log_id; self.engine.finish_building_snapshot(meta); + + let st = self.engine.state.io_state_mut(); + st.update_snapshot(last_log_id); } sm::Response::ReceiveSnapshotChunk(_) => { tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!()); @@ -1314,7 +1322,9 @@ where ); if let Some(meta) = meta { - self.engine.state.io_state_mut().update_applied(meta.last_log_id); + let st = self.engine.state.io_state_mut(); + st.update_applied(meta.last_log_id); + st.update_snapshot(meta.last_log_id); } } sm::Response::Apply(res) => { diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index e1db564f2..5f4f8dc28 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -227,6 +227,7 @@ where let cmd_res = CommandResult::new(seq, res); let _ = resp_tx.send(Notify::sm(cmd_res)); }); + tracing::info!("{} returning; spawned building snapshot task", func_name!()); } #[tracing::instrument(level = "info", skip_all)] diff --git a/openraft/src/progress/entry/tests.rs b/openraft/src/progress/entry/tests.rs index 136ba22b2..af8186e81 100644 --- a/openraft/src/progress/entry/tests.rs +++ b/openraft/src/progress/entry/tests.rs @@ -121,6 +121,10 @@ impl LogStateReader for LogState { todo!() } + fn io_snapshot_last_log_id(&self) -> Option<&LogId> { + todo!() + } + fn io_purged(&self) -> Option<&LogId> { todo!() } diff --git a/openraft/src/raft_state/io_state.rs b/openraft/src/raft_state/io_state.rs index b322a37e5..6e21318aa 100644 --- a/openraft/src/raft_state/io_state.rs +++ b/openraft/src/raft_state/io_state.rs @@ -17,6 +17,17 @@ pub(crate) struct LogIOId { /// /// These states are updated only when the io complete and thus may fall behind to the state stored /// in [`RaftState`](`crate::RaftState`),. +/// +/// The log ids that are tracked includes: +/// +/// ```text +/// | log ids +/// | *------------+---------+---------+---------+------------------> +/// | | | | `---> flushed +/// | | | `-------------> applied +/// | | `-----------------------> snapshot +/// | `---------------------------------> purged +/// ``` #[derive(Debug, Clone, Copy)] #[derive(Default)] #[derive(PartialEq, Eq)] @@ -24,7 +35,7 @@ pub(crate) struct IOState { /// Whether it is building a snapshot building_snapshot: bool, - // The last flushed vote. + /// The last flushed vote. pub(crate) vote: Vote, /// The last log id that has been flushed to storage. @@ -33,6 +44,9 @@ pub(crate) struct IOState { /// The last log id that has been applied to state machine. pub(crate) applied: Option>, + /// The last log id in the currently persisted snapshot. + pub(crate) snapshot: Option>, + /// The last log id that has been purged from storage. /// /// `RaftState::last_purged_log_id()` @@ -46,6 +60,7 @@ impl IOState { vote: Vote, flushed: LogIOId, applied: Option>, + snapshot: Option>, purged: Option>, ) -> Self { Self { @@ -53,6 +68,7 @@ impl IOState { vote, flushed, applied, + snapshot, purged, } } @@ -83,6 +99,23 @@ impl IOState { self.applied.as_ref() } + pub(crate) fn update_snapshot(&mut self, log_id: Option>) { + tracing::debug!(snapshot = display(DisplayOption(&log_id)), "{}", func_name!()); + + debug_assert!( + log_id > self.snapshot, + "snapshot log id should be monotonically increasing: current: {:?}, update: {:?}", + self.snapshot, + log_id + ); + + self.snapshot = log_id; + } + + pub(crate) fn snapshot(&self) -> Option<&LogId> { + self.snapshot.as_ref() + } + pub(crate) fn set_building_snapshot(&mut self, building: bool) { self.building_snapshot = building; } diff --git a/openraft/src/raft_state/log_state_reader.rs b/openraft/src/raft_state/log_state_reader.rs index bfd489c8d..4f8814121 100644 --- a/openraft/src/raft_state/log_state_reader.rs +++ b/openraft/src/raft_state/log_state_reader.rs @@ -54,6 +54,11 @@ pub(crate) trait LogStateReader { /// This is actually happened io-state which might fall behind committed log id. fn io_applied(&self) -> Option<&LogId>; + /// The last log id in the last persisted snapshot. + /// + /// This is actually happened io-state which might fall behind `Self::snapshot_last_log_id()`. + fn io_snapshot_last_log_id(&self) -> Option<&LogId>; + /// The last known purged log id, inclusive. /// /// This is actually purged log id from storage. diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 7ba493246..6c48fbf0f 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -115,6 +115,10 @@ where self.io_state.applied() } + fn io_snapshot_last_log_id(&self) -> Option<&LogId> { + self.io_state.snapshot() + } + fn io_purged(&self) -> Option<&LogId> { self.io_state.purged() } diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 0643b8647..5236fb688 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -76,9 +76,6 @@ where let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?; - // TODO: `flushed` is not set. - let io_state = IOState::new(vote, LogIOId::default(), last_applied, last_purged_log_id); - let snapshot = self.state_machine.get_current_snapshot().await?; // If there is not a snapshot and there are logs purged, which means the snapshot is not persisted, @@ -97,6 +94,15 @@ where }; let snapshot_meta = snapshot.map(|x| x.meta).unwrap_or_default(); + // TODO: `flushed` is not set. + let io_state = IOState::new( + vote, + LogIOId::default(), + last_applied, + snapshot_meta.last_log_id, + last_purged_log_id, + ); + let now = Instant::now(); Ok(RaftState {