Skip to content

Commit

Permalink
Refactor: Remove RaftLogReader trait dependency from RaftLogStorage
Browse files Browse the repository at this point in the history
This commit removes the dependency of the `RaftLogReader` trait from the
`RaftLogStorage` interface in Openraft.

`trait RaftLogStorage<C>: RaftLogReader<C> + ...` becomes
`trait RaftLogStorage<C>: ...`.

This change reflects the infrequent need for reading log entries
directly from storage, as it primarily occurs when applying logs to the
state machine.

Inside Openraft, logs can now be read by acquiring a reader via
`RaftLogStorage.get_log_reader().await.xxx()`, rather than requiring
`RaftLogStorage` to implement the `RaftLogReader` trait.

Upgrade(non-breaking) tip:

Implementations are advised (though it is not mandatory) to remove the
`impl RaftLogReader for YourRaftLogStore` from their codebase.

- Fix: databendlabs#1118
  • Loading branch information
drmingdrmer committed Jul 6, 2024
1 parent b60e7dc commit 98ab9ac
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 9 deletions.
2 changes: 1 addition & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ where
return Ok(());
}

let entries = self.log_store.get_log_entries(since..end).await?;
let entries = self.log_store.get_log_reader().await.get_log_entries(since..end).await?;
tracing::debug!(
entries = display(DisplaySlice::<_>(entries.as_slice())),
"about to apply"
Expand Down
11 changes: 7 additions & 4 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::utime::UTime;
use crate::EffectiveMembership;
use crate::LogIdOptionExt;
use crate::MembershipState;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
use crate::RaftState;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -59,7 +60,8 @@ where
/// When the Raft node is first started, it will call this interface to fetch the last known
/// state from stable storage.
pub async fn get_initial_state(&mut self) -> Result<RaftState<C>, StorageError<C>> {
let vote = self.log_store.read_vote().await?;
let mut log_reader = self.log_store.get_log_reader().await;
let vote = log_reader.read_vote().await?;
let vote = vote.unwrap_or_default();

let mut committed = self.log_store.read_committed().await?;
Expand Down Expand Up @@ -92,7 +94,7 @@ where

tracing::info!("re-apply log {}..{} to state machine", start, end);

let entries = self.log_store.get_log_entries(start..end).await?;
let entries = log_reader.get_log_entries(start..end).await?;
self.state_machine.apply(entries).await?;

last_applied = committed;
Expand All @@ -119,7 +121,7 @@ where
last_purged_log_id.display(),
last_log_id.display()
);
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?;
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, &mut log_reader).await?;

let snapshot = self.state_machine.get_current_snapshot().await?;

Expand Down Expand Up @@ -234,10 +236,11 @@ where
let step = 64;

let mut res = vec![];
let mut log_reader = self.log_store.get_log_reader().await;

while start < end {
let step_start = std::cmp::max(start, end.saturating_sub(step));
let entries = self.log_store.try_get_log_entries(step_start..end).await?;
let entries = log_reader.try_get_log_entries(step_start..end).await?;

for ent in entries.iter().rev() {
if let Some(mem) = ent.get_membership() {
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

mod callback;
mod helper;
mod log_store_ext;
mod log_reader_ext;
mod snapshot_signature;
mod v2;

Expand All @@ -11,7 +11,7 @@ use std::fmt::Debug;
use std::ops::RangeBounds;

pub use helper::StorageHelper;
pub use log_store_ext::RaftLogReaderExt;
pub use log_reader_ext::RaftLogReaderExt;
use openraft_macros::add_async_trait;
use openraft_macros::since;
pub use snapshot_signature::SnapshotSignature;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/storage/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::Vote;
/// write request before a former write request is completed. This rule applies to both `vote` and
/// `log` IO. E.g., Saving a vote and appending a log entry must be serialized too.
#[add_async_trait]
pub trait RaftLogStorage<C>: RaftLogReader<C> + OptionalSend + OptionalSync + 'static
pub trait RaftLogStorage<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Log reader type.
Expand Down
51 changes: 51 additions & 0 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::BTreeSet;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::ops::RangeBounds;
use std::time::Duration;

use anyerror::AnyError;
Expand All @@ -26,6 +27,8 @@ use crate::LogId;
use crate::Membership;
use crate::NodeId;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
use crate::RaftTypeConfig;
use crate::StorageError;
Expand All @@ -45,6 +48,54 @@ macro_rules! btreeset {
}};
}

/// Allows [`RaftLogStorage`] to access methods provided by [`RaftLogReader`] in ths test.
trait ReaderExt<C>: RaftLogStorage<C>
where C: RaftTypeConfig
{
/// Proxy method to invoke [`RaftLogReaderExt::get_log_id`].
async fn get_log_id(&mut self, log_index: u64) -> Result<LogId<C::NodeId>, StorageError<C>> {
self.get_log_reader().await.get_log_id(log_index).await
}

/// Proxy method to invoke [`RaftLogReaderExt::try_get_log_entry`].
async fn try_get_log_entry(&mut self, log_index: u64) -> Result<Option<C::Entry>, StorageError<C>> {
self.get_log_reader().await.try_get_log_entry(log_index).await
}

/// Proxy method to invoke [`RaftLogReaderExt::get_log_entries`].
async fn get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C>> {
self.get_log_reader().await.get_log_entries(range).await
}

/// Proxy method to invoke [`RaftLogReader::try_get_log_entries`].
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C>> {
self.get_log_reader().await.try_get_log_entries(range).await
}

/// Proxy method to invoke [`RaftLogReader::read_vote`].
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C>> {
self.get_log_reader().await.read_vote().await
}

/// Proxy method to invoke [`RaftLogReader::limited_get_log_entries`].
async fn limited_get_log_entries(&mut self, start: u64, end: u64) -> Result<Vec<C::Entry>, StorageError<C>> {
self.get_log_reader().await.limited_get_log_entries(start, end).await
}
}

impl<C, S> ReaderExt<C> for S
where
C: RaftTypeConfig,
S: RaftLogStorage<C>,
{
}

/// Test suite to ensure a `RaftStore` impl works as expected.
///
/// Additional traits are required to be implemented by the store builder for testing:
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/append_entries/t11_append_conflicts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ where
C: RaftTypeConfig,
LS: RaftLogStorage<C>,
{
let logs = log_store.get_log_entries(..).await?;
let logs = log_store.get_log_reader().await.get_log_entries(..).await?;
let skip = 0;
let want: Vec<Entry<openraft_memstore::TypeConfig>> = terms
.iter()
Expand Down

0 comments on commit 98ab9ac

Please sign in to comment.