diff --git a/crates/curp/src/server/raw_curp/log.rs b/crates/curp/src/server/raw_curp/log.rs index 62a133729..f7996649f 100644 --- a/crates/curp/src/server/raw_curp/log.rs +++ b/crates/curp/src/server/raw_curp/log.rs @@ -1,7 +1,7 @@ #![allow(clippy::arithmetic_side_effects)] // u64 is large enough and won't overflow use std::{ - cmp::min, + cmp::{min, Ordering}, collections::{HashMap, HashSet, VecDeque}, fmt::Debug, ops::{Bound, Range, RangeBounds, RangeInclusive}, @@ -9,7 +9,6 @@ use std::{ vec, }; -use bincode::serialized_size; use clippy_utilities::NumericCast; use itertools::Itertools; use tokio::sync::mpsc; @@ -24,6 +23,15 @@ use crate::{ LogIndex, }; +/// A `LogEntry` wrapper +#[derive(Debug, Clone)] +struct Entry { + /// The inner `LogEntry` + inner: Arc>, + /// The serialized size of the inner `LogEntry` + size: u64, +} + /// Enum representing a range of values in the log. #[derive(Debug, Eq, PartialEq)] enum LogRange { @@ -66,8 +74,8 @@ impl From> for LogRange { /// For the leader, there should never be a gap between snapshot and entries /// /// Examples: -/// This example will describe the relationship among `entry_size`, `batch_end`, `batch_limit`, `first_idx_in_cur_batch` and `cur_batch_size` -/// if `batch_limit` = 11 and `entry_size` = vec![1,5,6,2,3,4,2], then the relationship between `entry_size` and `batch_end` looks like: +/// This example will describe the relationship among `batch_end`, `batch_limit`, `first_idx_in_cur_batch` and `cur_batch_size` +/// if `batch_limit` = 11 and entry sizes is vec![1,5,6,2,3,4,2], then the relationship between entry size and `batch_end` looks like: /// ------------------------------------------- /// `entry_size[i]`| 1 | 5 | 6 | 2 | 3 | 4 | 2 | /// ---------------+---------------------------+ @@ -85,9 +93,7 @@ pub(super) struct Log { /// Log entries, should be persisted /// A VecDeque to store log entries, it will be serialized and persisted /// Note that the logical index in `LogEntry` is different from physical index - entries: VecDeque>>, - /// entry size of each item in entries - entry_size: VecDeque, + entries: VecDeque>, /// Each element `batch_end[i]` represents the right inclusive bound of a log batch whose size is less than or equal to `batch_limit` /// if you want to fetch a batch which begins at the index `i`, you can fetch it directly by `i..=batch_end[i]` batch_end: VecDeque, @@ -150,29 +156,61 @@ impl Log { /// `batch_end` will keep len elem fn truncate(&mut self, len: usize) { self.entries.truncate(len); - self.entry_size.truncate(len); self.batch_end.truncate(len); + let last_index = if len == 0 { return } else { len - 1 }; self.first_idx_in_cur_batch = min(self.first_idx_in_cur_batch, len); - #[allow(clippy::indexing_slicing)] - while self.li_to_pi(self.batch_end[self.first_idx_in_cur_batch - 1]) >= len { - self.batch_end[self.first_idx_in_cur_batch - 1] = 0; - self.first_idx_in_cur_batch -= 1; + // it's safe since we check `self.first_idx_in_cur_batch` at the very first beginning + loop { + if self.first_idx_in_cur_batch == 0 { + break; + } + let end = self.li_to_pi(self.batch_end[self.first_idx_in_cur_batch - 1]); + match end.cmp(&last_index) { + // All the `batch_end[i]` lager than `len - 1` should be reset to zero + Ordering::Greater => { + self.batch_end[self.first_idx_in_cur_batch - 1] = 0; + self.first_idx_in_cur_batch -= 1; + } + Ordering::Equal => { + // when the `end == last_index`, it means that we should compare the sum of `get_range_by_batch(self.first_idx_in_cur_batch - 1)` with `batch_limit` + // Less: indicates that it should be a part of the current batch so we should update the relevant element in `batch_end` + // Equal: indicates that it shouldn't be a part of the current batch. We terminate this loop when it happens. + // Greater: never gonna be happened + let real_batch_size: u64 = self + .entries + .range(self.get_range_by_batch(self.first_idx_in_cur_batch - 1)) + .map(|entry| entry.size) + .sum(); + if real_batch_size < self.batch_limit { + self.batch_end[self.first_idx_in_cur_batch - 1] = 0; + self.first_idx_in_cur_batch -= 1; + } else { + break; + } + } + Ordering::Less => { + break; + } + } + } + + // recalculate the `cur_batch_size` + self.cur_batch_size = 0; + for entry in self.entries.iter().skip(self.first_idx_in_cur_batch) { + self.cur_batch_size += entry.size; } } /// push a log entry into the back of queue - fn push_back(&mut self, entry: Arc>) -> Result<(), bincode::Error> { - let entry_size = serialized_size(&entry)?; - - if entry_size > self.batch_limit { + fn push_back(&mut self, inner: Arc>, size: u64) { + if size > self.batch_limit { warn!("entry_size of an entry > batch_limit, which may be too small.",); } - self.entries.push_back(entry); - self.entry_size.push_back(entry_size); + self.entries.push_back(Entry { inner, size }); self.batch_end.push_back(0); // placeholder - self.cur_batch_size += entry_size; + self.cur_batch_size += size; // it's safe to do so: // 1. The `self.first_idx_in_cur_batch` is always less than `self.batch_end.len()` @@ -185,63 +223,53 @@ impl Log { // 3. increase the `first_idx_in_cur_batch` while self.cur_batch_size >= self.batch_limit { self.batch_end[self.first_idx_in_cur_batch] = - if self.cur_batch_size == self.batch_limit || entry_size > self.batch_limit { - self.entries[self.entries.len() - 1].index + if self.cur_batch_size == self.batch_limit || size > self.batch_limit { + self.entries[self.entries.len() - 1].inner.index } else { - self.entries[self.entries.len() - 2].index + self.entries[self.entries.len() - 2].inner.index }; - self.cur_batch_size -= self.entry_size[self.first_idx_in_cur_batch]; + self.cur_batch_size -= self.entries[self.first_idx_in_cur_batch].size; self.first_idx_in_cur_batch += 1; } - Ok(()) } /// pop a log entry from the front of queue fn pop_front(&mut self) -> Option>> { - if self.entries.front().is_some() { - let front_size = *self - .entry_size - .front() - .unwrap_or_else(|| unreachable!("The entry_size cannot be empty")); - + if let Some(entry) = self.entries.pop_front() { if self.first_idx_in_cur_batch == 0 { - self.cur_batch_size -= front_size; + self.cur_batch_size -= entry.size; } else { self.first_idx_in_cur_batch -= 1; } - let _ = self .batch_end .pop_front() .unwrap_or_else(|| unreachable!("The batch_end cannot be empty")); - let _ = self - .entry_size - .pop_front() - .unwrap_or_else(|| unreachable!("The pop_front cannot be empty")); - self.entries.pop_front() + Some(entry.inner) } else { None } } /// restore log entries from Vec - fn restore(&mut self, entries: Vec>) { + fn restore(&mut self, entries: Vec>) -> Result<(), bincode::Error> { self.batch_end = VecDeque::with_capacity(entries.capacity()); self.entries = VecDeque::with_capacity(entries.capacity()); - self.entry_size = VecDeque::with_capacity(entries.capacity()); self.cur_batch_size = 0; self.first_idx_in_cur_batch = 0; for entry in entries { - let _unuse = self.push_back(Arc::from(entry)); + let entry = Arc::from(entry); + let size = bincode::serialized_size(&entry)?; + self.push_back(entry, size); } + Ok(()) } /// clear whole log entries fn clear(&mut self) { self.entries.clear(); - self.entry_size.clear(); self.batch_end.clear(); self.cur_batch_size = 0; self.first_idx_in_cur_batch = 0; @@ -254,7 +282,7 @@ impl Log { return LogRange::Range(self.batch_end.len()..self.batch_end.len()); } - if self.entry_size[left] == self.batch_limit { + if self.entries[left].size == self.batch_limit { return LogRange::RangeInclusive(left..=left); } @@ -295,7 +323,6 @@ impl Log { ) -> Self { Self { entries: VecDeque::with_capacity(entries_cap), - entry_size: VecDeque::with_capacity(entries_cap), batch_end: VecDeque::with_capacity(entries_cap), batch_limit, first_idx_in_cur_batch: 0, @@ -315,14 +342,14 @@ impl Log { pub(super) fn last_log_index(&self) -> LogIndex { self.entries .back() - .map_or(self.base_index, |entry| entry.index) + .map_or(self.base_index, |entry| entry.inner.index) } /// Get last log term pub(super) fn last_log_term(&self) -> u64 { self.entries .back() - .map_or(self.base_term, |entry| entry.term) + .map_or(self.base_term, |entry| entry.inner.term) } /// Transform logical index to physical index of `self.entries` @@ -338,7 +365,7 @@ impl Log { /// Get log entry pub(super) fn get(&self, i: LogIndex) -> Option<&Arc>> { (i > self.base_index) - .then(|| self.entries.get(self.li_to_pi(i))) + .then(|| self.entries.get(self.li_to_pi(i)).map(|entry| &entry.inner)) .flatten() } @@ -367,7 +394,7 @@ impl Log { if self .entries .get(pi) - .map_or(true, |old_entry| old_entry.term != entry.term) + .map_or(true, |old_entry| old_entry.inner.term != entry.term) { break; } @@ -375,8 +402,8 @@ impl Log { } // Record entries that need to be fallback in the truncated entries for e in self.entries.range(pi..) { - if matches!(e.entry_data, EntryData::ConfChange(_)) { - let _ig = need_fallback_indexes.insert(e.index); + if matches!(e.inner.entry_data, EntryData::ConfChange(_)) { + let _ig = need_fallback_indexes.insert(e.inner.index); } } // Truncate entries @@ -391,8 +418,10 @@ impl Log { conf_changes.push(Arc::clone(&entry)); } #[allow(clippy::expect_used)] // It's safe to expect here. - self.push_back(Arc::clone(&entry)) - .expect("log entry {entry:?} cannot be serialized"); + self.push_back( + Arc::clone(&entry), + bincode::serialized_size(&entry).expect("log entry {entry:?} cannot be serialized"), + ); self.send_persist(entry); } @@ -427,7 +456,8 @@ impl Log { ) -> Result>, bincode::Error> { let index = self.last_log_index() + 1; let entry = Arc::new(LogEntry::new(index, term, propose_id, entry)); - self.push_back(Arc::clone(&entry))?; + let size = bincode::serialized_size(&entry)?; + self.push_back(Arc::clone(&entry), size); self.send_persist(Arc::clone(&entry)); Ok(entry) } @@ -437,7 +467,7 @@ impl Log { pub(super) fn has_next_batch(&self, li: u64) -> bool { let left = self.li_to_pi(li); if let Some(&batch_end) = self.batch_end.get(left) { - batch_end != 0 || self.entry_size[left] == self.batch_limit + batch_end != 0 || self.entries[left].size == self.batch_limit } else { false } @@ -447,12 +477,19 @@ impl Log { pub(super) fn get_from(&self, li: LogIndex) -> Vec>> { let left = self.li_to_pi(li); let range = self.get_range_by_batch(left); - self.entries.range(range).cloned().collect_vec() + self.entries + .range(range) + .map(|entry| &entry.inner) + .cloned() + .collect_vec() } /// Get existing cmd ids pub(super) fn get_cmd_ids(&self) -> HashSet { - self.entries.iter().map(|entry| entry.propose_id).collect() + self.entries + .iter() + .map(|entry| entry.inner.propose_id) + .collect() } /// Get previous log entry's term and index @@ -479,10 +516,14 @@ impl Log { } /// Restore log entries, provided entries must be in order - pub(super) fn restore_entries(&mut self, entries: Vec>) { + pub(super) fn restore_entries( + &mut self, + entries: Vec>, + ) -> Result<(), bincode::Error> { // restore batch index - self.restore(entries); + self.restore(entries)?; self.compact(); + Ok(()) } /// Compact log @@ -490,18 +531,19 @@ impl Log { let Some(first_entry) = self.entries.front() else { return; }; - if self.last_as <= first_entry.index { + if self.last_as <= first_entry.inner.index { return; } - let compact_from = if self.last_as - first_entry.index >= self.entries_cap.numeric_cast() { - self.last_as - self.entries_cap.numeric_cast::() - } else { - return; - }; + let compact_from = + if self.last_as - first_entry.inner.index >= self.entries_cap.numeric_cast() { + self.last_as - self.entries_cap.numeric_cast::() + } else { + return; + }; while self .entries .front() - .map_or(false, |e| e.index <= compact_from) + .map_or(false, |entry| entry.inner.index <= compact_from) { let res = self.pop_front(); match res { @@ -542,13 +584,12 @@ impl Log { self.cur_batch_size = 0; self.first_idx_in_cur_batch = 0; self.batch_end.clear(); - self.entry_size.clear(); let prev_entries = self.entries.clone(); self.entries.clear(); - let _unused = prev_entries.into_iter().for_each(|item| { - let _u = self.push_back(item); + let _unused = prev_entries.into_iter().for_each(|entry| { + let _u = self.push_back(entry.inner, entry.size); }); } } @@ -568,7 +609,7 @@ mod tests { fn index(&self, i: usize) -> &Self::Output { let pi = self.li_to_pi(i.numeric_cast()); - &self.entries[pi] + &self.entries[pi].inner } } @@ -674,7 +715,7 @@ mod tests { .enumerate() .map(|(idx, cmd)| log.push(1, ProposeId(0, idx.numeric_cast()), cmd).unwrap()) .collect::>(); - let log_entry_size = log.entry_size[0]; + let log_entry_size = log.entries[0].size; log.set_batch_limit(3 * log_entry_size - 1); let bound_1 = log.get_range_by_batch(3); @@ -761,10 +802,9 @@ mod tests { let mut log = Log::::new(tx, default_batch_max_size(), default_log_entries_cap()); - log.restore_entries(entries); + log.restore_entries(entries).unwrap(); assert_eq!(log.entries.len(), 10); assert_eq!(log.batch_end.len(), 10); - assert_eq!(log.entry_size.len(), 10); } #[test] @@ -780,10 +820,9 @@ mod tests { log.last_exe = 22; log.compact(); assert_eq!(log.base_index, 12); - assert_eq!(log.entries.front().unwrap().index, 13); + assert_eq!(log.entries.front().unwrap().inner.index, 13); assert_eq!(log.batch_end.len(), 18); assert!(log.entries.len() == log.batch_end.len()); - assert!(log.entry_size.len() == log.entries.len()); } #[test] @@ -794,13 +833,13 @@ mod tests { log.push(0, ProposeId(0, i), Arc::new(TestCommand::default())) .unwrap(); } - let log_entry_size = log.entry_size[0]; + let log_entry_size = log.entries[0].size; log.set_batch_limit(2 * log_entry_size); log.last_as = 22; log.last_exe = 22; log.compact(); assert_eq!(log.base_index, 12); - assert_eq!(log.entries.front().unwrap().index, 13); + assert_eq!(log.entries.front().unwrap().inner.index, 13); assert_eq!(log.batch_end.len(), 18); assert_eq!(log.first_idx_in_cur_batch, 17); @@ -825,4 +864,51 @@ mod tests { ); assert!(log.has_next_batch(15)); } + + #[test] + fn batch_info_should_update_correctly_after_truncated() { + let (log_tx, _log_rx) = mpsc::unbounded_channel(); + let mut log = Log::::new(log_tx, 11, 10); + let mock_entries_sizes = vec![1, 5, 6, 2, 3, 4, 5]; + let test_cmd = Arc::new(TestCommand::default()); + + let _entries = repeat(Arc::clone(&test_cmd)) + .take(mock_entries_sizes.len()) + .enumerate() + .map(|(idx, cmd)| { + Arc::new(LogEntry::new( + (idx + 1).numeric_cast(), + 1, + ProposeId(0, idx.numeric_cast()), + cmd, + )) + }) + .zip(mock_entries_sizes) + .map(|(entry, size)| log.push_back(entry, size)) + .collect::>(); + + assert_eq!(log.cur_batch_size, 9); + assert_eq!(log.first_idx_in_cur_batch, 5); + + // case 1. truncate len > first_idx_in_cur_batch + // after truncate, the `entries` should be [1, 5, 6, 2, 3, 4], the `batch_end` should be [2, 3, 5, 0, 0, 0] + log.truncate(6); + assert_eq!(log.first_idx_in_cur_batch, 3); + assert_eq!(log.cur_batch_size, 9); + assert_eq!(log.batch_end, VecDeque::from(vec![2, 3, 5, 0, 0, 0])); + + // case 2. truncate len = first_idx_in_cur_batch + // after truncate, the `entries` should be [1, 5, 6, 2, 3], the `batch_end` should be [2, 3, 5, 0, 0] + log.truncate(5); + assert_eq!(log.first_idx_in_cur_batch, 3); + assert_eq!(log.cur_batch_size, 5); + assert_eq!(log.batch_end, VecDeque::from(vec![2, 3, 5, 0, 0])); + + // case 3. truncate len < first_idx_in_cur_batch + // after truncate, the `entries` should be [1, 5], the `batch_end` should be [0, 0] + log.truncate(2); + assert_eq!(log.first_idx_in_cur_batch, 0); + assert_eq!(log.cur_batch_size, 6); + assert_eq!(log.batch_end, VecDeque::from(vec![0, 0])); + } } diff --git a/crates/curp/src/server/raw_curp/mod.rs b/crates/curp/src/server/raw_curp/mod.rs index b8e2437b5..e3f24d22b 100644 --- a/crates/curp/src/server/raw_curp/mod.rs +++ b/crates/curp/src/server/raw_curp/mod.rs @@ -218,7 +218,9 @@ impl RawCurpBuilder { log_w.last_as = last_applied; log_w.last_exe = last_applied; log_w.commit_index = last_applied; - log_w.restore_entries(args.entries); + log_w + .restore_entries(args.entries) + .map_err(|e| RawCurpBuilderError::ValidationError(e.to_string()))?; } Ok(raw_curp) diff --git a/crates/curp/src/server/storage/wal/segment.rs b/crates/curp/src/server/storage/wal/segment.rs index 96166c7e4..e68ba1584 100644 --- a/crates/curp/src/server/storage/wal/segment.rs +++ b/crates/curp/src/server/storage/wal/segment.rs @@ -17,8 +17,6 @@ use tokio::{ }; use tokio_stream::StreamExt; -use crate::log_entry::LogEntry; - use super::{ codec::{DataFrame, DataFrameOwned, WAL}, error::{CorruptType, WALError}, @@ -26,6 +24,7 @@ use super::{ util::{get_checksum, parse_u64, validate_data, LockedFile}, WAL_FILE_EXT, WAL_MAGIC, WAL_VERSION, }; +use crate::log_entry::LogEntry; /// The size of wal file header in bytes const WAL_HEADER_SIZE: usize = 56; @@ -307,9 +306,8 @@ mod tests { use curp_test_utils::test_cmd::TestCommand; - use crate::log_entry::EntryData; - use super::*; + use crate::log_entry::EntryData; #[test] fn gen_parse_header_is_correct() {