diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs
index 51866774f..c4818da9a 100644
--- a/crates/curp/src/server/curp_node.rs
+++ b/crates/curp/src/server/curp_node.rs
@@ -28,8 +28,10 @@ use utils::{
 use super::{
     cmd_board::{CmdBoardRef, CommandBoard},
     cmd_worker::{conflict_checked_mpmc, start_cmd_workers},
-    conflict::spec_pool_new::{SpObject, SpeculativePool},
-    conflict::uncommitted_pool::{UcpObject, UncommittedPool},
+    conflict::{
+        spec_pool_new::{SpObject, SpeculativePool},
+        uncommitted_pool::{UcpObject, UncommittedPool},
+    },
     gc::gc_cmd_board,
     lease_manager::LeaseManager,
     raw_curp::{AppendEntries, RawCurp, Vote},
diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs
index b6fca3a99..8ee55a599 100644
--- a/crates/curp/src/server/mod.rs
+++ b/crates/curp/src/server/mod.rs
@@ -10,8 +10,10 @@ use utils::ClientTlsConfig;
 use utils::{config::CurpConfig, task_manager::TaskManager, tracing::Extract};
 
 use self::curp_node::CurpNode;
-pub use self::raw_curp::RawCurp;
-pub use self::{conflict::spec_pool_new::SpObject, conflict::uncommitted_pool::UcpObject};
+pub use self::{
+    conflict::{spec_pool_new::SpObject, uncommitted_pool::UcpObject},
+    raw_curp::RawCurp,
+};
 use crate::{
     cmd::{Command, CommandExecutor},
     members::{ClusterInfo, ServerId},
diff --git a/crates/curp/src/server/raw_curp/log.rs b/crates/curp/src/server/raw_curp/log.rs
index 4aee089d3..f7996649f 100644
--- a/crates/curp/src/server/raw_curp/log.rs
+++ b/crates/curp/src/server/raw_curp/log.rs
@@ -1,18 +1,18 @@
 #![allow(clippy::arithmetic_side_effects)] // u64 is large enough and won't overflow
 use std::{
+    cmp::{min, Ordering},
     collections::{HashMap, HashSet, VecDeque},
     fmt::Debug,
-    ops::Range,
+    ops::{Bound, Range, RangeBounds, RangeInclusive},
     sync::Arc,
     vec,
 };
 
-use bincode::serialized_size;
-use clippy_utilities::{NumericCast, OverflowArithmetic};
+use clippy_utilities::NumericCast;
 use itertools::Itertools;
 use tokio::sync::mpsc;
-use tracing::error;
+use tracing::{error, warn};
 
 use crate::{
     cmd::Command,
@@ -23,13 +23,86 @@ use crate::{
     LogIndex,
 };
 
+/// A `LogEntry` wrapper
+#[derive(Debug, Clone)]
+struct Entry<C> {
+    /// The inner `LogEntry`
+    inner: Arc<LogEntry<C>>,
+    /// The serialized size of the inner `LogEntry`
+    size: u64,
+}
+
+/// Enum representing a range of values in the log.
+#[derive(Debug, Eq, PartialEq)]
+enum LogRange<T> {
+    /// Represents a range using the `Range` type.
+    Range(Range<T>),
+    /// Represents a range using the `RangeInclusive` type.
+    RangeInclusive(RangeInclusive<T>),
+}
+
+impl<T> RangeBounds<T> for LogRange<T> {
+    fn start_bound(&self) -> Bound<&T> {
+        match *self {
+            Self::Range(ref range) => range.start_bound(),
+            Self::RangeInclusive(ref range) => range.start_bound(),
+        }
+    }
+
+    fn end_bound(&self) -> Bound<&T> {
+        match *self {
+            Self::Range(ref range) => range.end_bound(),
+            Self::RangeInclusive(ref range) => range.end_bound(),
+        }
+    }
+}
+
+impl<T> From<Range<T>> for LogRange<T> {
+    fn from(range: Range<T>) -> Self {
+        Self::Range(range)
+    }
+}
+
+impl<T> From<RangeInclusive<T>> for LogRange<T> {
+    fn from(range: RangeInclusive<T>) -> Self {
+        Self::RangeInclusive(range)
+    }
+}
+
 /// Curp logs
 /// There exists a fake log entry 0 whose term equals 0
 /// For the leader, there should never be a gap between snapshot and entries
+///
+/// Examples:
+/// This example describes the relationship among `batch_end`, `batch_limit`, `first_idx_in_cur_batch` and `cur_batch_size`.
+/// If `batch_limit` = 11 and the entry sizes are vec![1,5,6,2,3,4,2], then the relationship between entry size and `batch_end` looks like:
+/// -----------------------------------------------
+/// `entry_size[i]`    | 1 | 5 | 6 | 2 | 3 | 4 | 2 |
+/// -------------------+---+---+---+---+---+---+---+
+/// i                  | 0 | 1 | 2 | 3 | 4 | 5 | 6 |
+/// -------------------+---+---+---+---+---+---+---+
+/// `batch_end[i]`     | 1 | 2 | 4 | 6 | 0 | 0 | 0 |
+/// -----------------------------------------------
+///                                      ↑
+///                        `first_idx_in_cur_batch`
+/// (0, `batch_end[0]`) = (0,1), which means `entries[0..=1]` is a valid batch whose size is 1+5=6, less than the `batch_limit`
+/// (1, `batch_end[1]`) = (1,2), which means `entries[1..=2]` is a valid batch whose size is 5+6=11, equal to the `batch_limit`
+/// ...
+/// (`first_idx_in_cur_batch`, `batch_end[first_idx_in_cur_batch]`) = (4, 0), which means `entries[4..]` is a valid batch (aka. the current batch) whose size (aka. `cur_batch_size`) is 3+4+2=9, less than the `batch_limit`
 pub(super) struct Log<C: Command> {
     /// Log entries, should be persisted
+    /// A `VecDeque` to store log entries; it will be serialized and persisted
     /// Note that the logical index in `LogEntry` is different from physical index
-    entries: LogEntryVecDeque<C>,
+    entries: VecDeque<Entry<C>>,
+    /// Each element `batch_end[i]` represents the right inclusive bound of a log batch whose size is less than or equal to `batch_limit`
+    /// If you want to fetch a batch beginning at index `i`, you can fetch it directly by `i..=batch_end[i]`
+    batch_end: VecDeque<LogIndex>,
+    /// Batch size limit
+    batch_limit: u64,
+    /// The first entry idx of the current batch
+    first_idx_in_cur_batch: usize,
+    /// The current batch size
+    cur_batch_size: u64,
     /// The last log index that has been compacted
     pub(super) base_index: LogIndex,
     /// The last log term that has been compacted
@@ -77,120 +150,149 @@ impl FallbackContext {
     }
 }
 
-/// That's a struct to store log entries and calculate batch of log
-#[derive(Debug)]
-struct LogEntryVecDeque<C> {
-    /// A VecDeque to store log entries, it will be serialized and persisted
-    entries: VecDeque<Arc<LogEntry<C>>>,
-    /// The sum of serialized size of previous log entries
-    /// batch_index[i+1] = batch_index[i] + size(entries[i])
-    batch_index: VecDeque<u64>,
-    /// Batch size limit
-    batch_limit: u64,
-}
-
-impl<C: Command> LogEntryVecDeque<C> {
-    /// return a log entries with cap
-    fn new(cap: usize, batch_limit: u64) -> Self {
-        let mut batch_index = VecDeque::with_capacity(cap.overflow_add(1));
-        batch_index.push_back(0);
-        Self {
-            entries: VecDeque::with_capacity(cap),
-            batch_index,
-            batch_limit,
-        }
-    }
-
+impl<C: Command> Log<C> {
     /// Shortens the log entries, keeping the first `len` elements and dropping
     /// the rest.
-    /// `batch_index` will keep len+1 elem
+    /// `batch_end` will keep `len` elements
     fn truncate(&mut self, len: usize) {
         self.entries.truncate(len);
-        self.batch_index.truncate(len.overflow_add(1));
+        self.batch_end.truncate(len);
+        let last_index = if len == 0 { return } else { len - 1 };
+        self.first_idx_in_cur_batch = min(self.first_idx_in_cur_batch, len);
+        #[allow(clippy::indexing_slicing)]
+        // it's safe since we clamp `self.first_idx_in_cur_batch` at the very beginning
+        loop {
+            if self.first_idx_in_cur_batch == 0 {
+                break;
+            }
+            let end = self.li_to_pi(self.batch_end[self.first_idx_in_cur_batch - 1]);
+            match end.cmp(&last_index) {
+                // All the `batch_end[i]` larger than `len - 1` should be reset to zero
+                Ordering::Greater => {
+                    self.batch_end[self.first_idx_in_cur_batch - 1] = 0;
+                    self.first_idx_in_cur_batch -= 1;
+                }
+                Ordering::Equal => {
+                    // When `end == last_index`, compare the size of the batch starting at
+                    // `first_idx_in_cur_batch - 1` (i.e. `get_range_by_batch(self.first_idx_in_cur_batch - 1)`) with `batch_limit`:
+                    // Less: it should become part of the current batch, so reset the relevant element in `batch_end`
+                    // Equal: it should stay a closed batch; we terminate the loop when this happens
+                    // Greater: can never happen
+                    let real_batch_size: u64 = self
+                        .entries
+                        .range(self.get_range_by_batch(self.first_idx_in_cur_batch - 1))
+                        .map(|entry| entry.size)
+                        .sum();
+                    if real_batch_size < self.batch_limit {
+                        self.batch_end[self.first_idx_in_cur_batch - 1] = 0;
+                        self.first_idx_in_cur_batch -= 1;
+                    } else {
+                        break;
+                    }
+                }
+                Ordering::Less => {
+                    break;
+                }
+            }
+        }
+
+        // recalculate the `cur_batch_size`
+        self.cur_batch_size = 0;
+        for entry in self.entries.iter().skip(self.first_idx_in_cur_batch) {
+            self.cur_batch_size += entry.size;
+        }
     }
 
     /// push a log entry into the back of queue
-    fn push_back(&mut self, entry: Arc<LogEntry<C>>) -> Result<(), bincode::Error> {
-        let entry_size = serialized_size(&entry)?;
+    fn push_back(&mut self, inner: Arc<LogEntry<C>>, size: u64) {
+        if size > self.batch_limit {
+            warn!("entry size exceeds batch_limit; batch_limit may be configured too small");
+        }
 
-        self.entries.push_back(entry);
-        let Some(&pre_entries_size) = self.batch_index.back() else {
-            unreachable!("batch_index cannot be None")
-        };
-        self.batch_index
-            .push_back(pre_entries_size.overflow_add(entry_size));
-        Ok(())
+        self.entries.push_back(Entry { inner, size });
+        self.batch_end.push_back(0); // placeholder
+        self.cur_batch_size += size;
+
+        // it's safe to index here:
+        // 1. `self.first_idx_in_cur_batch` is always less than `self.batch_end.len()`
+        // 2. `self.entries.len()` is always larger than or equal to 1
+        // 3. when `self.cur_batch_size > self.batch_limit && size < self.batch_limit` holds, `self.entries.len()` is always larger than or equal to 2
+        #[allow(clippy::indexing_slicing)]
+        // while `cur_batch_size` >= `batch_limit` holds, we do the following three things:
+        // 1. update `batch_end[first_idx_in_cur_batch]`
+        // 2. remove the size of `entries[first_idx_in_cur_batch]` from `cur_batch_size`
+        // 3. increase `first_idx_in_cur_batch`
+        while self.cur_batch_size >= self.batch_limit {
+            self.batch_end[self.first_idx_in_cur_batch] =
+                if self.cur_batch_size == self.batch_limit || size > self.batch_limit {
+                    self.entries[self.entries.len() - 1].inner.index
+                } else {
+                    self.entries[self.entries.len() - 2].inner.index
+                };
+            self.cur_batch_size -= self.entries[self.first_idx_in_cur_batch].size;
+            self.first_idx_in_cur_batch += 1;
+        }
     }
 
     /// pop a log entry from the front of queue
     fn pop_front(&mut self) -> Option<Arc<LogEntry<C>>> {
-        if self.entries.front().is_some() {
-            _ = self.batch_index.pop_front();
-            self.entries.pop_front()
+        if let Some(entry) = self.entries.pop_front() {
+            if self.first_idx_in_cur_batch == 0 {
+                self.cur_batch_size -= entry.size;
+            } else {
+                self.first_idx_in_cur_batch -= 1;
+            }
+            let _ = self
+                .batch_end
+                .pop_front()
+                .unwrap_or_else(|| unreachable!("The batch_end cannot be empty"));
+            Some(entry.inner)
         } else {
             None
         }
     }
 
     /// restore log entries from Vec
-    fn restore(&mut self, entries: Vec<LogEntry<C>>) {
-        let mut batch_index = VecDeque::with_capacity(entries.capacity());
-        batch_index.push_back(0);
-        for entry in &entries {
-            #[allow(clippy::expect_used)]
-            let entry_size =
-                serialized_size(entry).expect("log entry {entry:?} cannot be serialized");
-            if let Some(cur_size) = batch_index.back() {
-                batch_index.push_back(cur_size.overflow_add(entry_size));
-            }
-        }
+    fn restore(&mut self, entries: Vec<LogEntry<C>>) -> Result<(), bincode::Error> {
+        self.batch_end = VecDeque::with_capacity(entries.capacity());
+        self.entries = VecDeque::with_capacity(entries.capacity());
 
-        self.entries = entries.into_iter().map(Arc::new).collect();
-        self.batch_index = batch_index;
+        self.cur_batch_size = 0;
+        self.first_idx_in_cur_batch = 0;
+
+        for entry in entries {
+            let entry = Arc::from(entry);
+            let size = bincode::serialized_size(&entry)?;
+            self.push_back(entry, size);
+        }
+        Ok(())
     }
 
     /// clear whole log entries
     fn clear(&mut self) {
         self.entries.clear();
-        self.batch_index.clear();
-        self.batch_index.push_back(0);
+        self.batch_end.clear();
+        self.cur_batch_size = 0;
+        self.first_idx_in_cur_batch = 0;
     }
 
     /// Get the range [left, right) of the log entry, whose size should be equal or smaller than `batch_limit`
-    fn get_range_by_batch(&self, left: usize) -> Range<usize> {
-        #[allow(clippy::indexing_slicing)]
-        let target = self.batch_index[left].overflow_add(self.batch_limit);
-        // remove the fake index 0 in `batch_index`
-        match self.batch_index.binary_search(&target) {
-            Ok(right) => left..right,
-            Err(right) => left..right - 1,
+    #[allow(clippy::indexing_slicing)] // it's safe since we validate `left` at the very beginning
+    fn get_range_by_batch(&self, left: usize) -> LogRange<usize> {
+        if left >= self.batch_end.len() {
+            return LogRange::Range(self.batch_end.len()..self.batch_end.len());
         }
-    }
-
-    /// Get a range of log entry
-    fn get_from(&self, left: usize) -> Vec<Arc<LogEntry<C>>> {
-        let range = self.get_range_by_batch(left);
-        self.entries.range(range).cloned().collect_vec()
-    }
 
-    /// check whether the log entry range [li,..) exceeds the batch limit or not
-    fn has_next_batch(&self, left: usize) -> bool {
-        if let (Some(&cur_size), Some(&last_size)) =
-            (self.batch_index.get(left), self.batch_index.back())
-        {
-            let target_size = cur_size.overflow_add(self.batch_limit);
-            target_size <= last_size
-        } else {
-            false
+        if self.entries[left].size == self.batch_limit {
+            return LogRange::RangeInclusive(left..=left);
         }
-    }
-}
 
-impl<C> std::ops::Deref for LogEntryVecDeque<C> {
-    type Target = VecDeque<Arc<LogEntry<C>>>;
+        let right = if self.batch_end[left] == 0 {
+            self.entries.len() - 1
+        } else {
+            self.li_to_pi(self.batch_end[left])
+        };
 
-    fn deref(&self) -> &Self::Target {
-        &self.entries
+        LogRange::RangeInclusive(left..=right)
     }
 }
 
@@ -220,15 +322,19 @@ impl<C: Command> Log<C> {
         entries_cap: usize,
     ) -> Self {
         Self {
-            entries: LogEntryVecDeque::new(entries_cap, batch_limit),
+            entries: VecDeque::with_capacity(entries_cap),
+            batch_end: VecDeque::with_capacity(entries_cap),
+            batch_limit,
+            first_idx_in_cur_batch: 0,
+            cur_batch_size: 0,
             commit_index: 0,
             base_index: 0,
             base_term: 0,
             last_as: 0,
             last_exe: 0,
             log_tx,
-            entries_cap,
             fallback_contexts: HashMap::new(),
+            entries_cap,
         }
     }
 
@@ -236,14 +342,14 @@ impl<C: Command> Log<C> {
     pub(super) fn last_log_index(&self) -> LogIndex {
         self.entries
             .back()
-            .map_or(self.base_index, |entry| entry.index)
+            .map_or(self.base_index, |entry| entry.inner.index)
     }
 
     /// Get last log term
     pub(super) fn last_log_term(&self) -> u64 {
         self.entries
             .back()
-            .map_or(self.base_term, |entry| entry.term)
+            .map_or(self.base_term, |entry| entry.inner.term)
     }
 
     /// Transform logical index to physical index of `self.entries`
@@ -259,7 +365,7 @@
     /// Get log entry
     pub(super) fn get(&self, i: LogIndex) -> Option<&Arc<LogEntry<C>>> {
         (i > self.base_index)
-            .then(|| self.entries.get(self.li_to_pi(i)))
+            .then(|| self.entries.get(self.li_to_pi(i)).map(|entry| &entry.inner))
             .flatten()
     }
 
@@ -288,7 +394,7 @@
             if self
                 .entries
                 .get(pi)
-                .map_or(true, |old_entry| old_entry.term != entry.term)
+                .map_or(true, |old_entry| old_entry.inner.term != entry.term)
             {
                 break;
             }
@@ -296,12 +402,12 @@
         // Record entries that need to be fallback in the truncated entries
        for e in self.entries.range(pi..) {
-            if matches!(e.entry_data, EntryData::ConfChange(_)) {
-                let _ig = need_fallback_indexes.insert(e.index);
+            if matches!(e.inner.entry_data, EntryData::ConfChange(_)) {
+                let _ig = need_fallback_indexes.insert(e.inner.index);
             }
         }
         // Truncate entries
-        self.entries.truncate(pi);
+        self.truncate(pi);
         // Push the remaining entries and record the conf change entries
         for entry in entries
             .into_iter()
@@ -312,9 +418,10 @@
                 conf_changes.push(Arc::clone(&entry));
             }
             #[allow(clippy::expect_used)] // It's safe to expect here.
-            self.entries
-                .push_back(Arc::clone(&entry))
-                .expect("log entry {entry:?} cannot be serialized");
+            self.push_back(
+                Arc::clone(&entry),
+                bincode::serialized_size(&entry).expect("log entry {entry:?} cannot be serialized"),
+            );
             self.send_persist(entry);
         }
 
@@ -349,26 +456,40 @@
     ) -> Result<Arc<LogEntry<C>>, bincode::Error> {
         let index = self.last_log_index() + 1;
         let entry = Arc::new(LogEntry::new(index, term, propose_id, entry));
-        self.entries.push_back(Arc::clone(&entry))?;
+        let size = bincode::serialized_size(&entry)?;
+        self.push_back(Arc::clone(&entry), size);
        self.send_persist(Arc::clone(&entry));
         Ok(entry)
     }
 
     /// check whether the log entry range [li,..) exceeds the batch limit or not
+    #[allow(clippy::indexing_slicing)] // it's safe since `batch_end` always has the same length as `entries`
     pub(super) fn has_next_batch(&self, li: u64) -> bool {
-        let idx = self.li_to_pi(li);
-        self.entries.has_next_batch(idx)
+        let left = self.li_to_pi(li);
+        if let Some(&batch_end) = self.batch_end.get(left) {
+            batch_end != 0 || self.entries[left].size == self.batch_limit
+        } else {
+            false
+        }
     }
 
     /// Get a range of log entry
     pub(super) fn get_from(&self, li: LogIndex) -> Vec<Arc<LogEntry<C>>> {
-        let left_bound = self.li_to_pi(li);
-        self.entries.get_from(left_bound)
+        let left = self.li_to_pi(li);
+        let range = self.get_range_by_batch(left);
+        self.entries
+            .range(range)
+            .map(|entry| &entry.inner)
+            .cloned()
+            .collect_vec()
     }
 
     /// Get existing cmd ids
     pub(super) fn get_cmd_ids(&self) -> HashSet<ProposeId> {
-        self.entries.iter().map(|entry| entry.propose_id).collect()
+        self.entries
+            .iter()
+            .map(|entry| entry.inner.propose_id)
+            .collect()
     }
 
     /// Get previous log entry's term and index
@@ -391,14 +512,18 @@
         self.last_as = meta.last_included_index;
         self.last_exe = meta.last_included_index;
         self.commit_index = meta.last_included_index;
-        self.entries.clear();
+        self.clear();
     }
 
     /// Restore log entries, provided entries must be in order
-    pub(super) fn restore_entries(&mut self, entries: Vec<LogEntry<C>>) {
+    pub(super) fn restore_entries(
+        &mut self,
+        entries: Vec<LogEntry<C>>,
+    ) -> Result<(), bincode::Error> {
         // restore batch index
-        self.entries.restore(entries);
+        self.restore(entries)?;
         self.compact();
+        Ok(())
     }
 
     /// Compact log
@@ -406,20 +531,22 @@
         let Some(first_entry) = self.entries.front() else {
            return;
        };
-        if self.last_as <= first_entry.index {
+        if self.last_as <= first_entry.inner.index {
             return;
         }
-        let compact_from = if self.last_as - first_entry.index >= self.entries_cap.numeric_cast() {
-            self.last_as - self.entries_cap.numeric_cast::<u64>()
-        } else {
-            return;
-        };
+        let compact_from =
+            if self.last_as - first_entry.inner.index >= self.entries_cap.numeric_cast() {
+                self.last_as - self.entries_cap.numeric_cast::<u64>()
+            } else {
+                return;
+            };
         while self
             .entries
             .front()
-            .map_or(false, |e| e.index <= compact_from)
+            .map_or(false, |entry| entry.inner.index <= compact_from)
         {
-            match self.entries.pop_front() {
+            let res = self.pop_front();
+            match res {
                 Some(entry) => {
                     self.base_index = entry.index;
                     self.base_term = entry.term;
@@ -448,6 +575,23 @@
                 false
             });
     }
+
+    #[cfg(test)]
+    /// set batch limit and reconstruct `batch_end`
+    pub(super) fn set_batch_limit(&mut self, batch_limit: u64) {
+        #![allow(clippy::indexing_slicing)]
+        self.batch_limit = batch_limit;
+        self.cur_batch_size = 0;
+        self.first_idx_in_cur_batch = 0;
+        self.batch_end.clear();
+
+        let prev_entries = self.entries.clone();
+        self.entries.clear();
+
+        let _unused = prev_entries.into_iter().for_each(|entry| {
+            let _u = self.push_back(entry.inner, entry.size);
+        });
+    }
 }
 
 #[cfg(test)]
 mod tests {
@@ -465,14 +609,10 @@
 
     fn index(&self, i: usize) -> &Self::Output {
         let pi = self.li_to_pi(i.numeric_cast());
-        &self.entries[pi]
+        &self.entries[pi].inner
     }
 }
 
-    fn set_batch_limit(log: &mut Log<TestCommand>, batch_limit: u64) {
-        log.entries.batch_limit = batch_limit;
-    }
-
     #[test]
     fn test_log_up_to_date() {
         let (log_tx, _log_rx) = mpsc::unbounded_channel();
@@ -575,68 +715,68 @@
             .enumerate()
             .map(|(idx, cmd)| log.push(1, ProposeId(0, idx.numeric_cast()), cmd).unwrap())
             .collect::<Vec<_>>();
-        let log_entry_size = log.entries.batch_index[1];
+        let log_entry_size = log.entries[0].size;
 
-        set_batch_limit(&mut log, 3 * log_entry_size - 1);
-        let bound_1 = log.entries.get_range_by_batch(3);
+        log.set_batch_limit(3 * log_entry_size - 1);
+        let bound_1 = log.get_range_by_batch(3);
         assert_eq!(
             bound_1,
-            3..5,
-            "batch_index = {:?}, batch = {}, log_entry_size = {}",
-            log.entries.batch_index,
-            log.entries.batch_limit,
+            LogRange::RangeInclusive(3..=4),
+            "batch_end = {:?}, batch = {}, log_entry_size = {}",
+            log.batch_end,
+            log.batch_limit,
             log_entry_size
         );
         assert!(log.has_next_batch(8));
         assert!(!log.has_next_batch(9));
 
-        set_batch_limit(&mut log, 4 * log_entry_size);
-        let bound_2 = log.entries.get_range_by_batch(3);
+        log.set_batch_limit(4 * log_entry_size);
+        let bound_2 = log.get_range_by_batch(3);
         assert_eq!(
             bound_2,
-            3..7,
-            "batch_index = {:?}, batch = {}, log_entry_size = {}",
-            log.entries.batch_index,
-            log.entries.batch_limit,
+            LogRange::RangeInclusive(3..=6),
+            "batch_end = {:?}, batch = {}, log_entry_size = {}",
+            log.batch_end,
+            log.batch_limit,
             log_entry_size
         );
         assert!(log.has_next_batch(7));
         assert!(!log.has_next_batch(8));
 
-        set_batch_limit(&mut log, 5 * log_entry_size + 2);
-        let bound_3 = log.entries.get_range_by_batch(3);
+        log.set_batch_limit(5 * log_entry_size + 2);
+        let bound_3 = log.get_range_by_batch(3);
         assert_eq!(
             bound_3,
-            3..8,
-            "batch_index = {:?}, batch = {}, log_entry_size = {}",
-            log.entries.batch_index,
-            log.entries.batch_limit,
+            LogRange::RangeInclusive(3..=7),
+            "batch_end = {:?}, batch = {}, log_entry_size = {}",
+            log.batch_end,
+            log.batch_limit,
             log_entry_size
         );
         assert!(log.has_next_batch(5));
         assert!(!log.has_next_batch(6));
 
-        set_batch_limit(&mut log, 100 * log_entry_size);
-        let bound_4 = log.entries.get_range_by_batch(3);
+        log.set_batch_limit(100 * log_entry_size);
+        let bound_4 = log.get_range_by_batch(3);
         assert_eq!(
             bound_4,
-            3..10,
-            "batch_index = {:?}, batch = {}, log_entry_size = {}",
-            log.entries.batch_index,
-            log.entries.batch_limit,
+            LogRange::RangeInclusive(3..=9),
+            "batch_end = {:?}, batch = {}, log_entry_size = {}",
+            log.batch_end,
+            log.batch_limit,
             log_entry_size
        );
         assert!(!log.has_next_batch(1));
         assert!(!log.has_next_batch(5));
 
-        set_batch_limit(&mut log, log_entry_size - 1);
-        let bound_5 = log.entries.get_range_by_batch(3);
+        log.set_batch_limit(log_entry_size - 1);
+        let bound_5 = log.get_range_by_batch(3);
         assert_eq!(
             bound_5,
-            3..3,
-            "batch_index = {:?}, batch = {}, log_entry_size = {}",
-            log.entries.batch_index,
-            log.entries.batch_limit,
+            LogRange::RangeInclusive(3..=3),
+            "batch_end = {:?}, batch = {}, log_entry_size = {}",
+            log.batch_end,
+            log.batch_limit,
             log_entry_size
         );
         assert!(log.has_next_batch(10));
@@ -662,26 +802,9 @@
         let mut log =
             Log::<TestCommand>::new(tx, default_batch_max_size(), default_log_entries_cap());
 
-        log.restore_entries(entries);
+        log.restore_entries(entries).unwrap();
         assert_eq!(log.entries.len(), 10);
-        assert_eq!(log.entries.batch_index.len(), 11);
-        assert_eq!(log.entries.batch_index[0], 0);
-        let entry_size = log.entries.batch_index[1];
-
-        log.entries
-            .batch_index
-            .iter()
-            .enumerate()
-            .for_each(|(idx, &size)| {
-                assert_eq!(
-                    size,
-                    entry_size * idx.numeric_cast::<u64>(),
-                    "batch_index = {:?}, batch = {}, entry_size = {}",
-                    log.entries.batch_index,
-                    log.entries.batch_limit,
-                    entry_size
-                );
-            });
+        assert_eq!(log.batch_end.len(), 10);
     }
 
     #[test]
@@ -697,7 +820,95 @@
         log.last_exe = 22;
         log.compact();
         assert_eq!(log.base_index, 12);
-        assert_eq!(log.entries.front().unwrap().index, 13);
-        assert_eq!(log.entries.batch_index.len(), 19);
+        assert_eq!(log.entries.front().unwrap().inner.index, 13);
+        assert_eq!(log.batch_end.len(), 18);
+        assert!(log.entries.len() == log.batch_end.len());
+    }
+
+    #[test]
+    fn get_from_should_success_after_compact() {
+        let (log_tx, _log_rx) = mpsc::unbounded_channel();
+        let mut log = Log::<TestCommand>::new(log_tx, default_batch_max_size(), 10);
+        for i in 0..30 {
+            log.push(0, ProposeId(0, i), Arc::new(TestCommand::default()))
+                .unwrap();
+        }
+        let log_entry_size = log.entries[0].size;
+        log.set_batch_limit(2 * log_entry_size);
+        log.last_as = 22;
+        log.last_exe = 22;
+        log.compact();
+        assert_eq!(log.base_index, 12);
+        assert_eq!(log.entries.front().unwrap().inner.index, 13);
+        assert_eq!(log.batch_end.len(), 18);
+        assert_eq!(log.first_idx_in_cur_batch, 17);
+
+        let batch_1 = log.get_range_by_batch(3);
+        assert_eq!(
+            batch_1,
+            LogRange::RangeInclusive(3..=4),
+            "batch_end = {:?}, batch = {}, log_entry_size = {}",
+            log.batch_end,
+            log.batch_limit,
+            log_entry_size
+        );
+
+        let batch_2 = log.get_range_by_batch(1024);
+        assert_eq!(
+            batch_2,
+            LogRange::Range(18..18),
+            "batch_end = {:?}, batch = {}, log_entry_size = {}",
+            log.batch_end,
+            log.batch_limit,
+            log_entry_size
+        );
+        assert!(log.has_next_batch(15));
+    }
+
+    #[test]
+    fn batch_info_should_update_correctly_after_truncated() {
+        let (log_tx, _log_rx) = mpsc::unbounded_channel();
+        let mut log = Log::<TestCommand>::new(log_tx, 11, 10);
+        let mock_entries_sizes = vec![1, 5, 6, 2, 3, 4, 5];
+        let test_cmd = Arc::new(TestCommand::default());
+
+        let _entries = repeat(Arc::clone(&test_cmd))
+            .take(mock_entries_sizes.len())
+            .enumerate()
+            .map(|(idx, cmd)| {
+                Arc::new(LogEntry::new(
+                    (idx + 1).numeric_cast(),
+                    1,
+                    ProposeId(0, idx.numeric_cast()),
+                    cmd,
+                ))
+            })
+            .zip(mock_entries_sizes)
+            .map(|(entry, size)| log.push_back(entry, size))
+            .collect::<Vec<_>>();
+
+        assert_eq!(log.cur_batch_size, 9);
+        assert_eq!(log.first_idx_in_cur_batch, 5);
+
+        // case 1. truncate len > first_idx_in_cur_batch
+        // after truncate, the `entries` should be [1, 5, 6, 2, 3, 4], the `batch_end` should be [2, 3, 5, 0, 0, 0]
+        log.truncate(6);
+        assert_eq!(log.first_idx_in_cur_batch, 3);
+        assert_eq!(log.cur_batch_size, 9);
+        assert_eq!(log.batch_end, VecDeque::from(vec![2, 3, 5, 0, 0, 0]));
+
+        // case 2. truncate len = first_idx_in_cur_batch
+        // after truncate, the `entries` should be [1, 5, 6, 2, 3], the `batch_end` should be [2, 3, 5, 0, 0]
+        log.truncate(5);
+        assert_eq!(log.first_idx_in_cur_batch, 3);
+        assert_eq!(log.cur_batch_size, 5);
+        assert_eq!(log.batch_end, VecDeque::from(vec![2, 3, 5, 0, 0]));
+
+        // case 3. truncate len < first_idx_in_cur_batch
+        // after truncate, the `entries` should be [1, 5], the `batch_end` should be [0, 0]
+        log.truncate(2);
+        assert_eq!(log.first_idx_in_cur_batch, 0);
+        assert_eq!(log.cur_batch_size, 6);
+        assert_eq!(log.batch_end, VecDeque::from(vec![0, 0]));
     }
 }
diff --git a/crates/curp/src/server/raw_curp/mod.rs b/crates/curp/src/server/raw_curp/mod.rs
index a129a9c71..e3f24d22b 100644
--- a/crates/curp/src/server/raw_curp/mod.rs
+++ b/crates/curp/src/server/raw_curp/mod.rs
@@ -47,9 +47,11 @@ use self::{
     state::{CandidateState, LeaderState, State},
 };
 use super::{
-    cmd_worker::CEEventTxApi, conflict::spec_pool_new::SpeculativePool,
-    conflict::uncommitted_pool::UncommittedPool, lease_manager::LeaseManagerRef,
-    storage::StorageApi, DB,
+    cmd_worker::CEEventTxApi,
+    conflict::{spec_pool_new::SpeculativePool, uncommitted_pool::UncommittedPool},
+    lease_manager::LeaseManagerRef,
+    storage::StorageApi,
+    DB,
 };
 use crate::{
     cmd::Command,
@@ -216,7 +218,9 @@
             log_w.last_as = last_applied;
             log_w.last_exe = last_applied;
             log_w.commit_index = last_applied;
-            log_w.restore_entries(args.entries);
+            log_w
+                .restore_entries(args.entries)
+                .map_err(|e| RawCurpBuilderError::ValidationError(e.to_string()))?;
         }
 
         Ok(raw_curp)
diff --git a/crates/curp/src/server/storage/wal/segment.rs b/crates/curp/src/server/storage/wal/segment.rs
index 96166c7e4..e68ba1584 100644
--- a/crates/curp/src/server/storage/wal/segment.rs
+++ b/crates/curp/src/server/storage/wal/segment.rs
@@ -17,8 +17,6 @@ use tokio::{
 };
 use tokio_stream::StreamExt;
 
-use crate::log_entry::LogEntry;
-
 use super::{
     codec::{DataFrame, DataFrameOwned, WAL},
     error::{CorruptType, WALError},
@@ -26,6 +24,7 @@
     util::{get_checksum, parse_u64, validate_data, LockedFile},
     WAL_FILE_EXT, WAL_MAGIC, WAL_VERSION,
 };
+use crate::log_entry::LogEntry;
 
 /// The size of wal file header in bytes
 const WAL_HEADER_SIZE: usize = 56;
@@ -307,9 +306,8 @@
 
     use curp_test_utils::test_cmd::TestCommand;
 
-    use crate::log_entry::EntryData;
-
     use super::*;
+    use crate::log_entry::EntryData;
 
     #[test]
     fn gen_parse_header_is_correct() {
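
To make the `batch_end` doc-comment example in log.rs easy to sanity-check outside the codebase, here is a minimal standalone sketch of the same bookkeeping. It is an illustration, not the PR's code: `BatchIndex`, `push`, and `batch_range` are hypothetical stand-ins for `Log`, `push_back`, and `get_range_by_batch`, plain sizes stand in for serialized entries, and batch ends are stored as 0-based positions, whereas the PR stores logical indices that start at 1 (which is what makes `0` a safe "open batch" sentinel there). The out-of-bounds and size-equals-limit special cases of `get_range_by_batch` are omitted for brevity.

```rust
// Toy model of the `batch_end` bookkeeping (hypothetical names, not the PR's types).
use std::collections::VecDeque;
use std::ops::RangeInclusive;

struct BatchIndex {
    /// serialized size of each entry (stands in for `Entry::size`)
    sizes: VecDeque<u64>,
    /// `batch_end[i]` is the inclusive end of the batch starting at `i`;
    /// `0` means entry `i` still belongs to the open "current" batch
    batch_end: VecDeque<usize>,
    batch_limit: u64,
    first_idx_in_cur_batch: usize,
    cur_batch_size: u64,
}

impl BatchIndex {
    fn new(batch_limit: u64) -> Self {
        Self {
            sizes: VecDeque::new(),
            batch_end: VecDeque::new(),
            batch_limit,
            first_idx_in_cur_batch: 0,
            cur_batch_size: 0,
        }
    }

    /// Mirrors `push_back`: while the open batch reaches `batch_limit`,
    /// close it and advance `first_idx_in_cur_batch`.
    fn push(&mut self, size: u64) {
        self.sizes.push_back(size);
        self.batch_end.push_back(0); // placeholder, may be closed below
        self.cur_batch_size += size;
        while self.cur_batch_size >= self.batch_limit {
            let last = self.sizes.len() - 1;
            self.batch_end[self.first_idx_in_cur_batch] =
                if self.cur_batch_size == self.batch_limit || size > self.batch_limit {
                    last // the new entry fits exactly, or is oversized on its own
                } else {
                    last - 1 // the new entry overflows, so the batch ends just before it
                };
            self.cur_batch_size -= self.sizes[self.first_idx_in_cur_batch];
            self.first_idx_in_cur_batch += 1;
        }
    }

    /// Mirrors `get_range_by_batch`: an O(1) lookup where the old
    /// `batch_index` scheme needed a binary search.
    fn batch_range(&self, left: usize) -> RangeInclusive<usize> {
        match self.batch_end[left] {
            0 => left..=self.sizes.len() - 1, // open batch: everything up to the tail
            end => left..=end,
        }
    }
}

fn main() {
    // The doc-comment example: `batch_limit` = 11, sizes [1, 5, 6, 2, 3, 4, 2].
    let mut idx = BatchIndex::new(11);
    for size in [1, 5, 6, 2, 3, 4, 2] {
        idx.push(size);
    }
    assert_eq!(idx.batch_range(0), 0..=1); // 1 + 5 = 6, under the limit
    assert_eq!(idx.batch_range(1), 1..=2); // 5 + 6 = 11, exactly the limit
    assert_eq!(idx.batch_range(2), 2..=4); // 6 + 2 + 3 = 11
    assert_eq!(idx.first_idx_in_cur_batch, 4);
    assert_eq!(idx.cur_batch_size, 9); // 3 + 4 + 2, still open
}
```

The design choice this models: the old `batch_index` prefix-sum array answered "how large is entries[0..i]" and needed a binary search per batch fetch, while `batch_end` precomputes each batch's right bound once, at push time, amortizing the work across appends.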
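The truncate rollback is the subtlest part of the bookkeeping, so the same toy model is useful there too. The sketch below extends `BatchIndex` from the previous block (paste it into the same file; its `main` replaces the earlier one). It mirrors the shape of the PR's `truncate`: drop the tail, reopen every batch whose recorded end was truncated away, re-check any batch that now ends exactly at the new tail against `batch_limit`, and recompute the open-batch size. The replayed values follow `batch_info_should_update_correctly_after_truncated`, shifted to this sketch's 0-based ends.

```rust
impl BatchIndex {
    /// Mirrors the patch's `Log::truncate` (0-based ends instead of the
    /// PR's 1-based logical indices, so no `li_to_pi` translation).
    fn truncate(&mut self, len: usize) {
        self.sizes.truncate(len);
        self.batch_end.truncate(len);
        if len == 0 {
            self.first_idx_in_cur_batch = 0;
            self.cur_batch_size = 0;
            return;
        }
        let last_index = len - 1;
        self.first_idx_in_cur_batch = self.first_idx_in_cur_batch.min(len);
        while self.first_idx_in_cur_batch > 0 {
            let i = self.first_idx_in_cur_batch - 1;
            if self.batch_end[i] > last_index {
                // the recorded end was truncated away: reopen the batch
                self.batch_end[i] = 0;
                self.first_idx_in_cur_batch = i;
            } else if self.batch_end[i] == last_index {
                // the batch now ends exactly at the new tail: reopen it only
                // if it no longer fills `batch_limit`
                let size: u64 = self.sizes.iter().skip(i).sum();
                if size < self.batch_limit {
                    self.batch_end[i] = 0;
                    self.first_idx_in_cur_batch = i;
                } else {
                    break;
                }
            } else {
                break; // the batch ends strictly before the tail, nothing to undo
            }
        }
        // recompute the size of the open batch
        self.cur_batch_size = self.sizes.iter().skip(self.first_idx_in_cur_batch).sum();
    }
}

fn main() {
    // Same shape as `batch_info_should_update_correctly_after_truncated`.
    let mut idx = BatchIndex::new(11);
    for size in [1, 5, 6, 2, 3, 4, 5] {
        idx.push(size);
    }
    assert_eq!(idx.first_idx_in_cur_batch, 5);
    assert_eq!(idx.cur_batch_size, 9);

    idx.truncate(6); // batches that ended at the new tail no longer fill the limit
    assert_eq!(idx.batch_end, VecDeque::from(vec![1, 2, 4, 0, 0, 0]));
    assert_eq!(idx.first_idx_in_cur_batch, 3);

    idx.truncate(2); // cuts into closed batches, which are reopened
    assert_eq!(idx.batch_end, VecDeque::from(vec![0, 0]));
    assert_eq!(idx.first_idx_in_cur_batch, 0);
    assert_eq!(idx.cur_batch_size, 6);
}
```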