Skip to content

Commit

Permalink
Fix: Do not report snapshot.last_log_id to metrics until snapshot is …
Browse files Browse the repository at this point in the history
…finished building/installing

Before this commit `RaftMetrics.snapshot` contains the last log id of a
snapshot that is **going** to install. Therefore there is chance the
metrics is updated but the store does not.

In this commit, `RaftMetrics.snapshot` will only be updated when a
snapshot is finished building or installing, by adding a new field
`snpashot` to `IOState` for tracking persisted snapshot meta data.

- Fix: databendlabs#912
  • Loading branch information
drmingdrmer committed Nov 28, 2023
1 parent 63e69b9 commit c7725c7
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 6 deletions.
14 changes: 12 additions & 2 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ where
vote: *st.io_state().vote(),
last_log_index: st.last_log_id().index(),
last_applied: st.io_applied().copied(),
snapshot: st.snapshot_meta.last_log_id,
snapshot: st.io_snapshot_last_log_id().copied(),
purged: st.io_purged().copied(),

// --- cluster ---
Expand Down Expand Up @@ -1301,7 +1301,15 @@ where
meta.summary(),
func_name!()
);

// Update in-memory state first, then the io state.
// In-memory state should always be ahead or equal to the io state.

let last_log_id = meta.last_log_id;
self.engine.finish_building_snapshot(meta);

let st = self.engine.state.io_state_mut();
st.update_snapshot(last_log_id);
}
sm::Response::ReceiveSnapshotChunk(_) => {
tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!());
Expand All @@ -1314,7 +1322,9 @@ where
);

if let Some(meta) = meta {
self.engine.state.io_state_mut().update_applied(meta.last_log_id);
let st = self.engine.state.io_state_mut();
st.update_applied(meta.last_log_id);
st.update_snapshot(meta.last_log_id);
}
}
sm::Response::Apply(res) => {
Expand Down
1 change: 1 addition & 0 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ where
let cmd_res = CommandResult::new(seq, res);
let _ = resp_tx.send(Notify::sm(cmd_res));
});
tracing::info!("{} returning; spawned building snapshot task", func_name!());
}

#[tracing::instrument(level = "info", skip_all)]
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/progress/entry/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl LogStateReader<u64> for LogState {
todo!()
}

fn io_snapshot_last_log_id(&self) -> Option<&LogId<u64>> {
todo!()
}

fn io_purged(&self) -> Option<&LogId<u64>> {
todo!()
}
Expand Down
35 changes: 34 additions & 1 deletion openraft/src/raft_state/io_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,25 @@ pub(crate) struct LogIOId<NID: NodeId> {
///
/// These states are updated only when the io complete and thus may fall behind to the state stored
/// in [`RaftState`](`crate::RaftState`),.
///
/// The log ids that are tracked includes:
///
/// ```text
/// | log ids
/// | *------------+---------+---------+---------+------------------>
/// | | | | `---> flushed
/// | | | `-------------> applied
/// | | `-----------------------> snapshot
/// | `---------------------------------> purged
/// ```
#[derive(Debug, Clone, Copy)]
#[derive(Default)]
#[derive(PartialEq, Eq)]
pub(crate) struct IOState<NID: NodeId> {
/// Whether it is building a snapshot
building_snapshot: bool,

// The last flushed vote.
/// The last flushed vote.
pub(crate) vote: Vote<NID>,

/// The last log id that has been flushed to storage.
Expand All @@ -33,6 +44,9 @@ pub(crate) struct IOState<NID: NodeId> {
/// The last log id that has been applied to state machine.
pub(crate) applied: Option<LogId<NID>>,

/// The last log id in the currently persisted snapshot.
pub(crate) snapshot: Option<LogId<NID>>,

/// The last log id that has been purged from storage.
///
/// `RaftState::last_purged_log_id()`
Expand All @@ -46,13 +60,15 @@ impl<NID: NodeId> IOState<NID> {
vote: Vote<NID>,
flushed: LogIOId<NID>,
applied: Option<LogId<NID>>,
snapshot: Option<LogId<NID>>,
purged: Option<LogId<NID>>,
) -> Self {
Self {
building_snapshot: false,
vote,
flushed,
applied,
snapshot,
purged,
}
}
Expand Down Expand Up @@ -83,6 +99,23 @@ impl<NID: NodeId> IOState<NID> {
self.applied.as_ref()
}

pub(crate) fn update_snapshot(&mut self, log_id: Option<LogId<NID>>) {
tracing::debug!(snapshot = display(DisplayOption(&log_id)), "{}", func_name!());

debug_assert!(
log_id > self.snapshot,
"snapshot log id should be monotonically increasing: current: {:?}, update: {:?}",
self.snapshot,
log_id
);

self.snapshot = log_id;
}

pub(crate) fn snapshot(&self) -> Option<&LogId<NID>> {
self.snapshot.as_ref()
}

pub(crate) fn set_building_snapshot(&mut self, building: bool) {
self.building_snapshot = building;
}
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/raft_state/log_state_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ pub(crate) trait LogStateReader<NID: NodeId> {
/// This is actually happened io-state which might fall behind committed log id.
fn io_applied(&self) -> Option<&LogId<NID>>;

/// The last log id in the last persisted snapshot.
///
/// This is actually happened io-state which might fall behind `Self::snapshot_last_log_id()`.
fn io_snapshot_last_log_id(&self) -> Option<&LogId<NID>>;

/// The last known purged log id, inclusive.
///
/// This is actually purged log id from storage.
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ where
self.io_state.applied()
}

fn io_snapshot_last_log_id(&self) -> Option<&LogId<NID>> {
self.io_state.snapshot()
}

fn io_purged(&self) -> Option<&LogId<NID>> {
self.io_state.purged()
}
Expand Down
12 changes: 9 additions & 3 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ where

let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?;

// TODO: `flushed` is not set.
let io_state = IOState::new(vote, LogIOId::default(), last_applied, last_purged_log_id);

let snapshot = self.state_machine.get_current_snapshot().await?;

// If there is not a snapshot and there are logs purged, which means the snapshot is not persisted,
Expand All @@ -97,6 +94,15 @@ where
};
let snapshot_meta = snapshot.map(|x| x.meta).unwrap_or_default();

// TODO: `flushed` is not set.
let io_state = IOState::new(
vote,
LogIOId::default(),
last_applied,
snapshot_meta.last_log_id,
last_purged_log_id,
);

let now = Instant::now();

Ok(RaftState {
Expand Down

0 comments on commit c7725c7

Please sign in to comment.