From f5ff5aa02d16a47ed387d5940dcb009907d4a07e Mon Sep 17 00:00:00 2001
From: GFX9
Date: Wed, 27 Mar 2024 21:54:36 +0800
Subject: [PATCH] fix: prevent batch_index overflow in raw_curp

Closes: #368, #800

Signed-off-by: Phoeniix Zhao
---
 crates/curp/src/server/curp_node.rs    |   6 +-
 crates/curp/src/server/mod.rs          |   6 +-
 crates/curp/src/server/raw_curp/log.rs | 365 +++++++++++++++----------
 crates/curp/src/server/raw_curp/mod.rs |   8 +-
 4 files changed, 241 insertions(+), 144 deletions(-)

diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs
index 51866774f..c4818da9a 100644
--- a/crates/curp/src/server/curp_node.rs
+++ b/crates/curp/src/server/curp_node.rs
@@ -28,8 +28,10 @@ use utils::{
 use super::{
     cmd_board::{CmdBoardRef, CommandBoard},
     cmd_worker::{conflict_checked_mpmc, start_cmd_workers},
-    conflict::spec_pool_new::{SpObject, SpeculativePool},
-    conflict::uncommitted_pool::{UcpObject, UncommittedPool},
+    conflict::{
+        spec_pool_new::{SpObject, SpeculativePool},
+        uncommitted_pool::{UcpObject, UncommittedPool},
+    },
     gc::gc_cmd_board,
     lease_manager::LeaseManager,
     raw_curp::{AppendEntries, RawCurp, Vote},
diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs
index b6fca3a99..8ee55a599 100644
--- a/crates/curp/src/server/mod.rs
+++ b/crates/curp/src/server/mod.rs
@@ -10,8 +10,10 @@ use utils::ClientTlsConfig;
 use utils::{config::CurpConfig, task_manager::TaskManager, tracing::Extract};
 
 use self::curp_node::CurpNode;
-pub use self::raw_curp::RawCurp;
-pub use self::{conflict::spec_pool_new::SpObject, conflict::uncommitted_pool::UcpObject};
+pub use self::{
+    conflict::{spec_pool_new::SpObject, uncommitted_pool::UcpObject},
+    raw_curp::RawCurp,
+};
 use crate::{
     cmd::{Command, CommandExecutor},
     members::{ClusterInfo, ServerId},
diff --git a/crates/curp/src/server/raw_curp/log.rs b/crates/curp/src/server/raw_curp/log.rs
index 4aee089d3..d120bdbf8 100644
--- a/crates/curp/src/server/raw_curp/log.rs
+++ b/crates/curp/src/server/raw_curp/log.rs
@@ -3,7 +3,7 @@
 use std::{
     collections::{HashMap, HashSet, VecDeque},
     fmt::Debug,
-    ops::Range,
+    ops::{Range, RangeBounds, RangeInclusive, Bound},
     sync::Arc,
     vec,
 };
@@ -12,7 +12,7 @@
 use bincode::serialized_size;
 use clippy_utilities::{NumericCast, OverflowArithmetic};
 use itertools::Itertools;
 use tokio::sync::mpsc;
-use tracing::error;
+use tracing::{error, warn};
 use crate::{
     cmd::Command,
@@ -23,13 +23,62 @@ use crate::{
     LogIndex,
 };
 
+/// Enum representing a range of values in the log.
+#[derive(Debug, Eq, PartialEq)]
+enum LogRange<T> {
+    /// Represents a range using the `Range` type.
+    Range(Range<T>),
+    /// Represents a range using the `RangeInclusive` type.
+    RangeInclusive(RangeInclusive<T>),
+}
+
+impl<T> RangeBounds<T> for LogRange<T> {
+    fn start_bound(&self) -> Bound<&T> {
+        match *self {
+            Self::Range(ref range) => range.start_bound(),
+            Self::RangeInclusive(ref range) => range.start_bound(),
+        }
+    }
+
+    fn end_bound(&self) -> Bound<&T> {
+        match *self {
+            Self::Range(ref range) => range.end_bound(),
+            Self::RangeInclusive(ref range) => range.end_bound(),
+        }
+    }
+}
+
+impl<T> From<Range<T>> for LogRange<T> {
+    fn from(range: Range<T>) -> Self {
+        Self::Range(range)
+    }
+}
+
+impl<T> From<RangeInclusive<T>> for LogRange<T> {
+    fn from(range: RangeInclusive<T>) -> Self {
+        Self::RangeInclusive(range)
+    }
+}
+
 /// Curp logs
 /// There exists a fake log entry 0 whose term equals 0
 /// For the leader, there should never be a gap between snapshot and entries
 pub(super) struct Log<C: Command> {
     /// Log entries, should be persisted
+    /// A VecDeque to store log entries, it will be serialized and persisted
     /// Note that the logical index in `LogEntry` is different from physical index
-    entries: LogEntryVecDeque<C>,
+    entries: VecDeque<Arc<LogEntry<C>>>,
+    /// entry size of each item in entries
+    entry_size: VecDeque<u64>,
+    /// the right index of the batch (offset)
+    /// batch_range: [i, li_to_pi(batch_index[i])]
+    batch_index: VecDeque<LogIndex>,
+    /// the first entry idx of the current batch window
+    first_entry_at_cur_batch: usize,
+    /// the current batch window size
+    cur_batch_size: u64,
+    /// Batch size limit
+    batch_limit: u64,
     /// The last log index that has been compacted
     pub(super) base_index: LogIndex,
     /// The last log term that has been compacted
@@ -77,30 +126,7 @@ impl<C: Command> FallbackContext<C> {
     }
 }
 
-/// That's a struct to store log entries and calculate batch of log
-#[derive(Debug)]
-struct LogEntryVecDeque<C> {
-    /// A VecDeque to store log entries, it will be serialized and persisted
-    entries: VecDeque<Arc<LogEntry<C>>>,
-    /// The sum of serialized size of previous log entries
-    /// batch_index[i+1] = batch_index[i] + size(entries[i])
-    batch_index: VecDeque<u64>,
-    /// Batch size limit
-    batch_limit: u64,
-}
-
-impl<C: Command> LogEntryVecDeque<C> {
-    /// return a log entries with cap
-    fn new(cap: usize, batch_limit: u64) -> Self {
-        let mut batch_index = VecDeque::with_capacity(cap.overflow_add(1));
-        batch_index.push_back(0);
-        Self {
-            entries: VecDeque::with_capacity(cap),
-            batch_index,
-            batch_limit,
-        }
-    }
-
+impl<C: Command> Log<C> {
     /// Shortens the log entries, keeping the first `len` elements and dropping
     /// the rest.
     /// `batch_index` will keep len+1 elem
@@ -113,19 +139,54 @@ impl<C: Command> Log<C> {
     fn push_back(&mut self, entry: Arc<LogEntry<C>>) -> Result<(), bincode::Error> {
         let entry_size = serialized_size(&entry)?;
+        if entry_size > self.batch_limit {
+            warn!("entry_size of an entry > batch_limit, which may be too small.",);
+        }
+
         self.entries.push_back(entry);
-        let Some(&pre_entries_size) = self.batch_index.back() else {
-            unreachable!("batch_index cannot be None")
-        };
-        self.batch_index
-            .push_back(pre_entries_size.overflow_add(entry_size));
+        self.entry_size.push_back(entry_size);
+        self.batch_index.push_back(0); // placeholder
+        self.cur_batch_size += entry_size;
+
+        // it's safe to do so:
+        // 1. The `self.first_entry_at_cur_batch` is always less than `self.batch_index.len()`
+        // 2. The `self.entries.len()` is always larger than or equal to 1
+        // 3. When the condition `self.cur_batch_size > self.batch_limit && entry < self.batch_limit` is met, the `self.entries.len()` is always larger than or equal to 2
+        #[allow(clippy::indexing_slicing)]
+        while self.cur_batch_size >= self.batch_limit {
+            self.batch_index[self.first_entry_at_cur_batch] =
+                if self.cur_batch_size == self.batch_limit || entry_size > self.batch_limit {
+                    self.entries[self.entries.len() - 1].index
+                } else {
+                    self.entries[self.entries.len() - 2].index
+                };
+            self.cur_batch_size -= self.entry_size[self.first_entry_at_cur_batch];
+            self.first_entry_at_cur_batch += 1;
+        }
         Ok(())
     }
 
     /// pop a log entry from the front of queue
     fn pop_front(&mut self) -> Option<Arc<LogEntry<C>>> {
         if self.entries.front().is_some() {
-            _ = self.batch_index.pop_front();
+            let front_size = *self
+                .entry_size
+                .front()
+                .unwrap_or_else(|| unreachable!("The entry_size cannot be empty"));
+
+            if self.first_entry_at_cur_batch == 0 {
+                self.cur_batch_size -= front_size;
+            } else {
+                self.first_entry_at_cur_batch -= 1;
+            }
+
+            let _ = self
+                .batch_index
+                .pop_front()
+                .unwrap_or_else(|| unreachable!("The batch_index cannot be empty"));
+            let _ = self
+                .entry_size
+                .pop_front()
+                .unwrap_or_else(|| unreachable!("The pop_front cannot be empty"));
             self.entries.pop_front()
         } else {
             None
@@ -134,63 +195,45 @@ impl<C: Command> Log<C> {
 
     /// restore log entries from Vec
     fn restore(&mut self, entries: Vec<LogEntry<C>>) {
-        let mut batch_index = VecDeque::with_capacity(entries.capacity());
-        batch_index.push_back(0);
-        for entry in &entries {
-            #[allow(clippy::expect_used)]
-            let entry_size =
-                serialized_size(entry).expect("log entry {entry:?} cannot be serialized");
-            if let Some(cur_size) = batch_index.back() {
-                batch_index.push_back(cur_size.overflow_add(entry_size));
-            }
-        }
+        self.batch_index = VecDeque::with_capacity(entries.capacity());
+        self.entries = VecDeque::with_capacity(entries.capacity());
+        self.entry_size = VecDeque::with_capacity(entries.capacity());
 
-        self.entries = entries.into_iter().map(Arc::new).collect();
-        self.batch_index = batch_index;
+        self.cur_batch_size = 0;
+        self.first_entry_at_cur_batch = 0;
+
+        for entry in entries {
+            let _unuse = self.push_back(Arc::from(entry));
+        }
     }
 
     /// clear whole log entries
     fn clear(&mut self) {
         self.entries.clear();
+        self.entry_size.clear();
         self.batch_index.clear();
-        self.batch_index.push_back(0);
+        self.cur_batch_size = 0;
+        self.first_entry_at_cur_batch = 0;
     }
 
     /// Get the range [left, right) of the log entry, whose size should be equal or smaller than `batch_limit`
-    fn get_range_by_batch(&self, left: usize) -> Range<usize> {
-        #[allow(clippy::indexing_slicing)]
-        let target = self.batch_index[left].overflow_add(self.batch_limit);
-        // remove the fake index 0 in `batch_index`
-        match self.batch_index.binary_search(&target) {
-            Ok(right) => left..right,
-            Err(right) => left..right - 1,
+    #[allow(clippy::indexing_slicing)] // it's safe to do so since we validate `left` at very begin place
+    fn get_range_by_batch(&self, left: usize) -> LogRange<usize> {
+        if left >= self.batch_index.len() {
+            return LogRange::Range(self.batch_index.len()..self.batch_index.len());
         }
-    }
 
-    /// Get a range of log entry
-    fn get_from(&self, left: usize) -> Vec<Arc<LogEntry<C>>> {
-        let range = self.get_range_by_batch(left);
-        self.entries.range(range).cloned().collect_vec()
-    }
-
-    /// check whether the log entry range [li,..) exceeds the batch limit or not
-    fn has_next_batch(&self, left: usize) -> bool {
-        if let (Some(&cur_size), Some(&last_size)) =
-            (self.batch_index.get(left), self.batch_index.back())
-        {
-            let target_size = cur_size.overflow_add(self.batch_limit);
-            target_size <= last_size
-        } else {
-            false
+        if self.entry_size[left] == self.batch_limit {
+            return LogRange::RangeInclusive(left..=left);
         }
-    }
-}
 
-impl<C: Command> std::ops::Deref for LogEntryVecDeque<C> {
-    type Target = VecDeque<Arc<LogEntry<C>>>;
+        let right = if self.batch_index[left] == 0 {
+            self.entries.len() - 1
+        } else {
+            self.li_to_pi(self.batch_index[left])
+        };
 
-    fn deref(&self) -> &Self::Target {
-        &self.entries
+        LogRange::RangeInclusive(left..=right)
     }
 }
@@ -220,15 +263,20 @@ impl<C: Command> Log<C> {
         entries_cap: usize,
     ) -> Self {
         Self {
-            entries: LogEntryVecDeque::new(entries_cap, batch_limit),
+            entries: VecDeque::with_capacity(entries_cap),
+            entry_size: VecDeque::with_capacity(entries_cap),
+            batch_index: VecDeque::with_capacity(entries_cap),
+            first_entry_at_cur_batch: 0,
+            cur_batch_size: 0,
+            batch_limit,
             commit_index: 0,
             base_index: 0,
             base_term: 0,
             last_as: 0,
             last_exe: 0,
             log_tx,
-            entries_cap,
             fallback_contexts: HashMap::new(),
+            entries_cap,
         }
     }
@@ -301,7 +349,7 @@ impl<C: Command> Log<C> {
             }
         }
         // Truncate entries
-        self.entries.truncate(pi);
+        self.truncate(pi);
         // Push the remaining entries and record the conf change entries
         for entry in entries
             .into_iter()
@@ -312,8 +360,7 @@ impl<C: Command> Log<C> {
                 conf_changes.push(Arc::clone(&entry));
             }
             #[allow(clippy::expect_used)] // It's safe to expect here.
-            self.entries
-                .push_back(Arc::clone(&entry))
+            self.push_back(Arc::clone(&entry))
                 .expect("log entry {entry:?} cannot be serialized");
 
             self.send_persist(entry);
@@ -349,21 +396,27 @@ impl<C: Command> Log<C> {
     ) -> Result<Arc<LogEntry<C>>, bincode::Error> {
         let index = self.last_log_index() + 1;
         let entry = Arc::new(LogEntry::new(index, term, propose_id, entry));
-        self.entries.push_back(Arc::clone(&entry))?;
+        self.push_back(Arc::clone(&entry))?;
         self.send_persist(Arc::clone(&entry));
         Ok(entry)
     }
 
     /// check whether the log entry range [li,..) exceeds the batch limit or not
+    #[allow(clippy::indexing_slicing)] // it's safe to do so since the length of `batch_index` is always same as `entry_size`
     pub(super) fn has_next_batch(&self, li: u64) -> bool {
-        let idx = self.li_to_pi(li);
-        self.entries.has_next_batch(idx)
+        let left = self.li_to_pi(li);
+        if let Some(&batch_end) = self.batch_index.get(left) {
+            batch_end != 0 || self.entry_size[left] == self.batch_limit
+        } else {
+            false
+        }
     }
 
     /// Get a range of log entry
     pub(super) fn get_from(&self, li: LogIndex) -> Vec<Arc<LogEntry<C>>> {
-        let left_bound = self.li_to_pi(li);
-        self.entries.get_from(left_bound)
+        let left = self.li_to_pi(li);
+        let range = self.get_range_by_batch(left);
+        self.entries.range(range).cloned().collect_vec()
     }
 
     /// Get existing cmd ids
@@ -391,13 +444,13 @@ impl<C: Command> Log<C> {
         self.last_as = meta.last_included_index;
         self.last_exe = meta.last_included_index;
         self.commit_index = meta.last_included_index;
-        self.entries.clear();
+        self.clear();
     }
 
     /// Restore log entries, provided entries must be in order
     pub(super) fn restore_entries(&mut self, entries: Vec<LogEntry<C>>) {
         // restore batch index
-        self.entries.restore(entries);
+        self.restore(entries);
         self.compact();
     }
@@ -419,7 +472,8 @@ impl<C: Command> Log<C> {
             .front()
             .map_or(false, |e| e.index <= compact_from)
         {
-            match self.entries.pop_front() {
+            let res = self.pop_front();
+            match res {
                 Some(entry) => {
                     self.base_index = entry.index;
                     self.base_term = entry.term;
@@ -448,6 +502,24 @@ impl<C: Command> Log<C> {
             false
         });
     }
+
+    #[cfg(test)]
+    /// set batch limit and reconstruct `batch_index`
+    pub(super) fn set_batch_limit(&mut self, batch_limit: u64) {
+        #![allow(clippy::indexing_slicing)]
+        self.batch_limit = batch_limit;
+        self.cur_batch_size = 0;
+        self.first_entry_at_cur_batch = 0;
+        self.batch_index.clear();
+        self.entry_size.clear();
+
+        let prev_entries = self.entries.clone();
+        self.entries.clear();
+
+        let _unused = prev_entries.into_iter().for_each(|item| {
+            let _u = self.push_back(item);
+        });
+    }
 }
 
 #[cfg(test)]
@@ -469,10 +541,6 @@ mod tests {
         }
     }
 
-    fn set_batch_limit(log: &mut Log<TestCommand>, batch_limit: u64) {
-        log.entries.batch_limit = batch_limit;
-    }
-
    #[test]
    fn test_log_up_to_date() {
        let (log_tx, _log_rx) = mpsc::unbounded_channel();
@@ -575,68 +643,68 @@ mod tests {
             .enumerate()
             .map(|(idx, cmd)| log.push(1, ProposeId(0, idx.numeric_cast()), cmd).unwrap())
             .collect::<Vec<_>>();
-        let log_entry_size = log.entries.batch_index[1];
+        let log_entry_size = log.entry_size[0];
 
-        set_batch_limit(&mut log, 3 * log_entry_size - 1);
-        let bound_1 = log.entries.get_range_by_batch(3);
+        log.set_batch_limit(3 * log_entry_size - 1);
+        let bound_1 = log.get_range_by_batch(3);
         assert_eq!(
             bound_1,
-            3..5,
+            LogRange::RangeInclusive(3..=4),
             "batch_index = {:?}, batch = {}, log_entry_size = {}",
-            log.entries.batch_index,
-            log.entries.batch_limit,
+            log.batch_index,
+            log.batch_limit,
             log_entry_size
         );
         assert!(log.has_next_batch(8));
         assert!(!log.has_next_batch(9));
 
-        set_batch_limit(&mut log, 4 * log_entry_size);
-        let bound_2 = log.entries.get_range_by_batch(3);
+        log.set_batch_limit(4 * log_entry_size);
+        let bound_2 = log.get_range_by_batch(3);
         assert_eq!(
             bound_2,
-            3..7,
+            LogRange::RangeInclusive(3..=6),
             "batch_index = {:?}, batch = {}, log_entry_size = {}",
-            log.entries.batch_index,
-            log.entries.batch_limit,
+            log.batch_index,
+            log.batch_limit,
             log_entry_size
         );
         assert!(log.has_next_batch(7));
         assert!(!log.has_next_batch(8));
 
-        set_batch_limit(&mut log, 5 * log_entry_size + 2);
-        let bound_3 = log.entries.get_range_by_batch(3);
+        log.set_batch_limit(5 * log_entry_size + 2);
+        let bound_3 = log.get_range_by_batch(3);
         assert_eq!(
             bound_3,
-            3..8,
+            LogRange::RangeInclusive(3..=7),
             "batch_index = {:?}, batch = {}, log_entry_size = {}",
-            log.entries.batch_index,
-            log.entries.batch_limit,
+            log.batch_index,
+            log.batch_limit,
            log_entry_size
         );
         assert!(log.has_next_batch(5));
         assert!(!log.has_next_batch(6));
 
-        set_batch_limit(&mut log, 100 * log_entry_size);
-        let bound_4 = log.entries.get_range_by_batch(3);
+        log.set_batch_limit(100 * log_entry_size);
+        let bound_4 = log.get_range_by_batch(3);
         assert_eq!(
             bound_4,
-            3..10,
+            LogRange::RangeInclusive(3..=9),
             "batch_index = {:?}, batch = {}, log_entry_size = {}",
-            log.entries.batch_index,
-            log.entries.batch_limit,
+            log.batch_index,
+            log.batch_limit,
             log_entry_size
         );
         assert!(!log.has_next_batch(1));
         assert!(!log.has_next_batch(5));
 
-        set_batch_limit(&mut log, log_entry_size - 1);
-        let bound_5 = log.entries.get_range_by_batch(3);
+        log.set_batch_limit(log_entry_size - 1);
+        let bound_5 = log.get_range_by_batch(3);
         assert_eq!(
             bound_5,
-            3..3,
+            LogRange::RangeInclusive(3..=3),
             "batch_index = {:?}, batch = {}, log_entry_size = {}",
-            log.entries.batch_index,
-            log.entries.batch_limit,
+            log.batch_index,
+            log.batch_limit,
             log_entry_size
         );
         assert!(log.has_next_batch(10));
@@ -664,24 +732,8 @@ mod tests {
         log.restore_entries(entries);
 
         assert_eq!(log.entries.len(), 10);
-        assert_eq!(log.entries.batch_index.len(), 11);
-        assert_eq!(log.entries.batch_index[0], 0);
-        let entry_size = log.entries.batch_index[1];
-
-        log.entries
-            .batch_index
-            .iter()
-            .enumerate()
-            .for_each(|(idx, &size)| {
-                assert_eq!(
-                    size,
-                    entry_size * idx.numeric_cast::<u64>(),
-                    "batch_index = {:?}, batch = {}, entry_size = {}",
-                    log.entries.batch_index,
-                    log.entries.batch_limit,
-                    entry_size
-                );
-            });
+        assert_eq!(log.batch_index.len(), 10);
+        assert_eq!(log.entry_size.len(), 10);
     }
 
     #[test]
@@ -698,6 +750,45 @@ mod tests {
         log.compact();
         assert_eq!(log.base_index, 12);
         assert_eq!(log.entries.front().unwrap().index, 13);
-        assert_eq!(log.entries.batch_index.len(), 19);
+        assert_eq!(log.batch_index.len(), 18);
+    }
+
+    #[test]
+    fn get_from_should_success_after_compact() {
+        let (log_tx, _log_rx) = mpsc::unbounded_channel();
+        let mut log = Log::<TestCommand>::new(log_tx, default_batch_max_size(), 10);
+        for i in 0..30 {
+            log.push(0, ProposeId(0, i), Arc::new(TestCommand::default()))
+                .unwrap();
+        }
+        let log_entry_size = log.entry_size[0];
+        log.set_batch_limit(2 * log_entry_size);
+        log.last_as = 22;
+        log.last_exe = 22;
+        log.compact();
+        assert_eq!(log.base_index, 12);
+        assert_eq!(log.entries.front().unwrap().index, 13);
+        assert_eq!(log.batch_index.len(), 18);
+
+        let batch_1 = log.get_range_by_batch(3);
+        assert_eq!(
+            batch_1,
+            LogRange::RangeInclusive(3..=4),
+            "batch_index = {:?}, batch = {}, log_entry_size = {}",
+            log.batch_index,
+            log.batch_limit,
+            log_entry_size
+        );
+
+        let batch_2 = log.get_range_by_batch(1024);
+        assert_eq!(
+            batch_2,
+            LogRange::Range(18..18),
+            "batch_index = {:?}, batch = {}, log_entry_size = {}",
+            log.batch_index,
+            log.batch_limit,
+            log_entry_size
+        );
+        assert!(log.has_next_batch(15));
     }
 }
diff --git a/crates/curp/src/server/raw_curp/mod.rs b/crates/curp/src/server/raw_curp/mod.rs
index a129a9c71..b8e2437b5 100644
--- a/crates/curp/src/server/raw_curp/mod.rs
+++ b/crates/curp/src/server/raw_curp/mod.rs
@@ -47,9 +47,11 @@ use self::{
     state::{CandidateState, LeaderState, State},
 };
 use super::{
-    cmd_worker::CEEventTxApi, conflict::spec_pool_new::SpeculativePool,
-    conflict::uncommitted_pool::UncommittedPool, lease_manager::LeaseManagerRef,
-    storage::StorageApi, DB,
+    cmd_worker::CEEventTxApi,
+    conflict::{spec_pool_new::SpeculativePool, uncommitted_pool::UncommittedPool},
+    lease_manager::LeaseManagerRef,
+    storage::StorageApi,
+    DB,
 };
 use crate::{
     cmd::Command,