diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 28507c480..267859b29 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -717,7 +717,7 @@ where return Ok(()); } - let entries = self.log_store.get_log_entries(since..end).await?; + let entries = self.log_store.get_log_reader().await.get_log_entries(since..end).await?; tracing::debug!( entries = display(DisplaySlice::<_>(entries.as_slice())), "about to apply" diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 4464541d0..44c48b4a0 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -15,6 +15,7 @@ use crate::utime::UTime; use crate::EffectiveMembership; use crate::LogIdOptionExt; use crate::MembershipState; +use crate::RaftLogReader; use crate::RaftSnapshotBuilder; use crate::RaftState; use crate::RaftTypeConfig; @@ -59,7 +60,8 @@ where /// When the Raft node is first started, it will call this interface to fetch the last known /// state from stable storage. pub async fn get_initial_state(&mut self) -> Result, StorageError> { - let vote = self.log_store.read_vote().await?; + let mut log_reader = self.log_store.get_log_reader().await; + let vote = log_reader.read_vote().await?; let vote = vote.unwrap_or_default(); let mut committed = self.log_store.read_committed().await?; @@ -92,7 +94,7 @@ where tracing::info!("re-apply log {}..{} to state machine", start, end); - let entries = self.log_store.get_log_entries(start..end).await?; + let entries = log_reader.get_log_entries(start..end).await?; self.state_machine.apply(entries).await?; last_applied = committed; @@ -119,7 +121,7 @@ where last_purged_log_id.display(), last_log_id.display() ); - let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?; + let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, &mut log_reader).await?; let snapshot = self.state_machine.get_current_snapshot().await?; @@ -234,10 +236,11 @@ where let step = 64; let mut res = vec![]; + let mut log_reader = self.log_store.get_log_reader().await; while start < end { let step_start = std::cmp::max(start, end.saturating_sub(step)); - let entries = self.log_store.try_get_log_entries(step_start..end).await?; + let entries = log_reader.try_get_log_entries(step_start..end).await?; for ent in entries.iter().rev() { if let Some(mem) = ent.get_membership() { diff --git a/openraft/src/storage/log_store_ext.rs b/openraft/src/storage/log_reader_ext.rs similarity index 100% rename from openraft/src/storage/log_store_ext.rs rename to openraft/src/storage/log_reader_ext.rs diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index e26a4cdc2..d9aec2111 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -2,7 +2,7 @@ mod callback; mod helper; -mod log_store_ext; +mod log_reader_ext; mod snapshot_signature; mod v2; @@ -11,7 +11,7 @@ use std::fmt::Debug; use std::ops::RangeBounds; pub use helper::StorageHelper; -pub use log_store_ext::RaftLogReaderExt; +pub use log_reader_ext::RaftLogReaderExt; use openraft_macros::add_async_trait; use openraft_macros::since; pub use snapshot_signature::SnapshotSignature; diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index 0fbb2b9d9..126a025d4 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -35,7 +35,7 @@ use crate::Vote; /// write request before a former write request is completed. This rule applies to both `vote` and /// `log` IO. E.g., Saving a vote and appending a log entry must be serialized too. #[add_async_trait] -pub trait RaftLogStorage: RaftLogReader + OptionalSend + OptionalSync + 'static +pub trait RaftLogStorage: OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Log reader type. diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 927895c41..475125d4c 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -2,6 +2,7 @@ use std::collections::BTreeSet; use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; +use std::ops::RangeBounds; use std::time::Duration; use anyerror::AnyError; @@ -26,6 +27,8 @@ use crate::LogId; use crate::Membership; use crate::NodeId; use crate::OptionalSend; +use crate::OptionalSync; +use crate::RaftLogReader; use crate::RaftSnapshotBuilder; use crate::RaftTypeConfig; use crate::StorageError; @@ -45,6 +48,54 @@ macro_rules! btreeset { }}; } +/// Allows [`RaftLogStorage`] to access methods provided by [`RaftLogReader`] in ths test. +trait ReaderExt: RaftLogStorage +where C: RaftTypeConfig +{ + /// Proxy method to invoke [`RaftLogReaderExt::get_log_id`]. + async fn get_log_id(&mut self, log_index: u64) -> Result, StorageError> { + self.get_log_reader().await.get_log_id(log_index).await + } + + /// Proxy method to invoke [`RaftLogReaderExt::try_get_log_entry`]. + async fn try_get_log_entry(&mut self, log_index: u64) -> Result, StorageError> { + self.get_log_reader().await.try_get_log_entry(log_index).await + } + + /// Proxy method to invoke [`RaftLogReaderExt::get_log_entries`]. + async fn get_log_entries + Clone + Debug + OptionalSend + OptionalSync>( + &mut self, + range: RB, + ) -> Result, StorageError> { + self.get_log_reader().await.get_log_entries(range).await + } + + /// Proxy method to invoke [`RaftLogReader::try_get_log_entries`]. + async fn try_get_log_entries + Clone + Debug + OptionalSend>( + &mut self, + range: RB, + ) -> Result, StorageError> { + self.get_log_reader().await.try_get_log_entries(range).await + } + + /// Proxy method to invoke [`RaftLogReader::read_vote`]. + async fn read_vote(&mut self) -> Result>, StorageError> { + self.get_log_reader().await.read_vote().await + } + + /// Proxy method to invoke [`RaftLogReader::limited_get_log_entries`]. + async fn limited_get_log_entries(&mut self, start: u64, end: u64) -> Result, StorageError> { + self.get_log_reader().await.limited_get_log_entries(start, end).await + } +} + +impl ReaderExt for S +where + C: RaftTypeConfig, + S: RaftLogStorage, +{ +} + /// Test suite to ensure a `RaftStore` impl works as expected. /// /// Additional traits are required to be implemented by the store builder for testing: diff --git a/tests/tests/append_entries/t11_append_conflicts.rs b/tests/tests/append_entries/t11_append_conflicts.rs index af3f68394..48e6e5367 100644 --- a/tests/tests/append_entries/t11_append_conflicts.rs +++ b/tests/tests/append_entries/t11_append_conflicts.rs @@ -227,7 +227,7 @@ where C: RaftTypeConfig, LS: RaftLogStorage, { - let logs = log_store.get_log_entries(..).await?; + let logs = log_store.get_log_reader().await.get_log_entries(..).await?; let skip = 0; let want: Vec> = terms .iter()