From 98ab9ac26fa99675aff43bbb2de8a55a47e870d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sat, 6 Jul 2024 10:31:21 +0800 Subject: [PATCH] Refactor: Remove `RaftLogReader` trait dependency from `RaftLogStorage` This commit removes the dependency of the `RaftLogReader` trait from the `RaftLogStorage` interface in Openraft. `trait RaftLogStorage: RaftLogReader + ...` becomes `trait RaftLogStorage: ...`. This change reflects the infrequent need for reading log entries directly from storage, as it primarily occurs when applying logs to the state machine. Inside Openraft, logs can now be read by acquiring a reader via `RaftLogStorage.get_log_reader().await.xxx()`, rather than requiring `RaftLogStorage` to implement the `RaftLogReader` trait. Upgrade(non-breaking) tip: Implementations are advised (though it is not mandatory) to remove the `impl RaftLogReader for YourRaftLogStore` from their codebase. - Fix: #1118 --- openraft/src/core/raft_core.rs | 2 +- openraft/src/storage/helper.rs | 11 ++-- .../{log_store_ext.rs => log_reader_ext.rs} | 0 openraft/src/storage/mod.rs | 4 +- openraft/src/storage/v2.rs | 2 +- openraft/src/testing/suite.rs | 51 +++++++++++++++++++ .../append_entries/t11_append_conflicts.rs | 2 +- 7 files changed, 63 insertions(+), 9 deletions(-) rename openraft/src/storage/{log_store_ext.rs => log_reader_ext.rs} (100%) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 28507c480..267859b29 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -717,7 +717,7 @@ where return Ok(()); } - let entries = self.log_store.get_log_entries(since..end).await?; + let entries = self.log_store.get_log_reader().await.get_log_entries(since..end).await?; tracing::debug!( entries = display(DisplaySlice::<_>(entries.as_slice())), "about to apply" diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 4464541d0..44c48b4a0 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -15,6 +15,7 @@ use crate::utime::UTime; use crate::EffectiveMembership; use crate::LogIdOptionExt; use crate::MembershipState; +use crate::RaftLogReader; use crate::RaftSnapshotBuilder; use crate::RaftState; use crate::RaftTypeConfig; @@ -59,7 +60,8 @@ where /// When the Raft node is first started, it will call this interface to fetch the last known /// state from stable storage. pub async fn get_initial_state(&mut self) -> Result, StorageError> { - let vote = self.log_store.read_vote().await?; + let mut log_reader = self.log_store.get_log_reader().await; + let vote = log_reader.read_vote().await?; let vote = vote.unwrap_or_default(); let mut committed = self.log_store.read_committed().await?; @@ -92,7 +94,7 @@ where tracing::info!("re-apply log {}..{} to state machine", start, end); - let entries = self.log_store.get_log_entries(start..end).await?; + let entries = log_reader.get_log_entries(start..end).await?; self.state_machine.apply(entries).await?; last_applied = committed; @@ -119,7 +121,7 @@ where last_purged_log_id.display(), last_log_id.display() ); - let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?; + let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, &mut log_reader).await?; let snapshot = self.state_machine.get_current_snapshot().await?; @@ -234,10 +236,11 @@ where let step = 64; let mut res = vec![]; + let mut log_reader = self.log_store.get_log_reader().await; while start < end { let step_start = std::cmp::max(start, end.saturating_sub(step)); - let entries = self.log_store.try_get_log_entries(step_start..end).await?; + let entries = log_reader.try_get_log_entries(step_start..end).await?; for ent in entries.iter().rev() { if let Some(mem) = ent.get_membership() { diff --git a/openraft/src/storage/log_store_ext.rs b/openraft/src/storage/log_reader_ext.rs similarity index 100% rename from openraft/src/storage/log_store_ext.rs rename to openraft/src/storage/log_reader_ext.rs diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index e26a4cdc2..d9aec2111 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -2,7 +2,7 @@ mod callback; mod helper; -mod log_store_ext; +mod log_reader_ext; mod snapshot_signature; mod v2; @@ -11,7 +11,7 @@ use std::fmt::Debug; use std::ops::RangeBounds; pub use helper::StorageHelper; -pub use log_store_ext::RaftLogReaderExt; +pub use log_reader_ext::RaftLogReaderExt; use openraft_macros::add_async_trait; use openraft_macros::since; pub use snapshot_signature::SnapshotSignature; diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index 0fbb2b9d9..126a025d4 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -35,7 +35,7 @@ use crate::Vote; /// write request before a former write request is completed. This rule applies to both `vote` and /// `log` IO. E.g., Saving a vote and appending a log entry must be serialized too. #[add_async_trait] -pub trait RaftLogStorage: RaftLogReader + OptionalSend + OptionalSync + 'static +pub trait RaftLogStorage: OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Log reader type. diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 927895c41..475125d4c 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -2,6 +2,7 @@ use std::collections::BTreeSet; use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; +use std::ops::RangeBounds; use std::time::Duration; use anyerror::AnyError; @@ -26,6 +27,8 @@ use crate::LogId; use crate::Membership; use crate::NodeId; use crate::OptionalSend; +use crate::OptionalSync; +use crate::RaftLogReader; use crate::RaftSnapshotBuilder; use crate::RaftTypeConfig; use crate::StorageError; @@ -45,6 +48,54 @@ macro_rules! btreeset { }}; } +/// Allows [`RaftLogStorage`] to access methods provided by [`RaftLogReader`] in ths test. +trait ReaderExt: RaftLogStorage +where C: RaftTypeConfig +{ + /// Proxy method to invoke [`RaftLogReaderExt::get_log_id`]. + async fn get_log_id(&mut self, log_index: u64) -> Result, StorageError> { + self.get_log_reader().await.get_log_id(log_index).await + } + + /// Proxy method to invoke [`RaftLogReaderExt::try_get_log_entry`]. + async fn try_get_log_entry(&mut self, log_index: u64) -> Result, StorageError> { + self.get_log_reader().await.try_get_log_entry(log_index).await + } + + /// Proxy method to invoke [`RaftLogReaderExt::get_log_entries`]. + async fn get_log_entries + Clone + Debug + OptionalSend + OptionalSync>( + &mut self, + range: RB, + ) -> Result, StorageError> { + self.get_log_reader().await.get_log_entries(range).await + } + + /// Proxy method to invoke [`RaftLogReader::try_get_log_entries`]. + async fn try_get_log_entries + Clone + Debug + OptionalSend>( + &mut self, + range: RB, + ) -> Result, StorageError> { + self.get_log_reader().await.try_get_log_entries(range).await + } + + /// Proxy method to invoke [`RaftLogReader::read_vote`]. + async fn read_vote(&mut self) -> Result>, StorageError> { + self.get_log_reader().await.read_vote().await + } + + /// Proxy method to invoke [`RaftLogReader::limited_get_log_entries`]. + async fn limited_get_log_entries(&mut self, start: u64, end: u64) -> Result, StorageError> { + self.get_log_reader().await.limited_get_log_entries(start, end).await + } +} + +impl ReaderExt for S +where + C: RaftTypeConfig, + S: RaftLogStorage, +{ +} + /// Test suite to ensure a `RaftStore` impl works as expected. /// /// Additional traits are required to be implemented by the store builder for testing: diff --git a/tests/tests/append_entries/t11_append_conflicts.rs b/tests/tests/append_entries/t11_append_conflicts.rs index af3f68394..48e6e5367 100644 --- a/tests/tests/append_entries/t11_append_conflicts.rs +++ b/tests/tests/append_entries/t11_append_conflicts.rs @@ -227,7 +227,7 @@ where C: RaftTypeConfig, LS: RaftLogStorage, { - let logs = log_store.get_log_entries(..).await?; + let logs = log_store.get_log_reader().await.get_log_entries(..).await?; let skip = 0; let want: Vec> = terms .iter()