Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce custom exex wal errors #11789

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ itertools.workspace = true
metrics.workspace = true
parking_lot.workspace = true
rmp-serde = "1.3"
thiserror.workspace = true
tracing.workspace = true

[dev-dependencies]
Expand Down
7 changes: 4 additions & 3 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ impl Clone for ExExManagerHandle {
#[cfg(test)]
mod tests {
use super::*;
use crate::wal::WalResult;
use alloy_primitives::B256;
use futures::{StreamExt, TryStreamExt};
use rand::Rng;
Expand Down Expand Up @@ -1278,15 +1279,15 @@ mod tests {
);
// WAL shouldn't contain the genesis notification, because it's finalized
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
[notification.clone()]
);

finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
[notification.clone()]
);

Expand All @@ -1300,7 +1301,7 @@ mod tests {
// WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
// non-canonical block
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
[notification]
);

Expand Down
29 changes: 29 additions & 0 deletions crates/exex/exex/src/wal/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//! Wal Errors

use std::path::PathBuf;

/// Wal Result type.
pub type WalResult<T> = Result<T, WalError>;

/// Wal Error types
#[derive(Debug, thiserror::Error)]
pub enum WalError {
/// Filesystem error at the path
#[error(transparent)]
FsPathError(#[from] reth_fs_util::FsPathError),
/// Directory entry reading error
#[error("failed to get {0} directory entry: {1}")]
DirEntry(PathBuf, std::io::Error),
/// Error when reading file metadata
#[error("failed to get metadata for file {0}: {1}")]
FileMetadata(u32, std::io::Error),
/// Parse error
#[error("failed to parse file name: {0}")]
Parse(String),
/// Notification not found error
#[error("notification {0} not found")]
FileNotFound(u32),
/// Decode error
#[error("failed to decode notification {0} from {1}: {2}")]
Decode(u32, PathBuf, rmp_serde::decode::Error),
}
28 changes: 15 additions & 13 deletions crates/exex/exex/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ mod storage;
pub use storage::Storage;
mod metrics;
use metrics::Metrics;
mod error;
pub use error::{WalError, WalResult};

use std::{
path::Path,
Expand Down Expand Up @@ -38,7 +40,7 @@ pub struct Wal {

impl Wal {
/// Creates a new instance of [`Wal`].
pub fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
pub fn new(directory: impl AsRef<Path>) -> WalResult<Self> {
Ok(Self { inner: Arc::new(WalInner::new(directory)?) })
}

Expand All @@ -48,22 +50,22 @@ impl Wal {
}

/// Commits the notification to WAL.
pub fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> {
pub fn commit(&self, notification: &ExExNotification) -> WalResult<()> {
self.inner.commit(notification)
}

/// Finalizes the WAL up to the given canonical block, inclusive.
///
/// The caller should check that all ExExes are on the canonical chain and will not need any
/// blocks from the WAL below the provided block, inclusive.
pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
pub fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> {
self.inner.finalize(to_block)
}

/// Returns an iterator over all notifications in the WAL.
pub fn iter_notifications(
&self,
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification>> + '_>> {
) -> WalResult<Box<dyn Iterator<Item = WalResult<ExExNotification>> + '_>> {
self.inner.iter_notifications()
}
}
Expand All @@ -80,7 +82,7 @@ struct WalInner {
}

impl WalInner {
fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
fn new(directory: impl AsRef<Path>) -> WalResult<Self> {
let mut wal = Self {
next_file_id: AtomicU32::new(0),
storage: Storage::new(directory)?,
Expand All @@ -97,7 +99,7 @@ impl WalInner {

/// Fills the block cache with the notifications from the storage.
#[instrument(skip(self))]
fn fill_block_cache(&mut self) -> eyre::Result<()> {
fn fill_block_cache(&mut self) -> WalResult<()> {
let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);

Expand Down Expand Up @@ -132,7 +134,7 @@ impl WalInner {
reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
))]
fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> {
fn commit(&self, notification: &ExExNotification) -> WalResult<()> {
let mut block_cache = self.block_cache.write();

let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
Expand All @@ -147,7 +149,7 @@ impl WalInner {
}

#[instrument(skip(self))]
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> {
let mut block_cache = self.block_cache.write();
let file_ids = block_cache.remove_before(to_block.number);

Expand Down Expand Up @@ -182,7 +184,7 @@ impl WalInner {
/// Returns an iterator over all notifications in the WAL.
fn iter_notifications(
&self,
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification>> + '_>> {
) -> WalResult<Box<dyn Iterator<Item = WalResult<ExExNotification>> + '_>> {
let Some(range) = self.storage.files_range()? else {
return Ok(Box::new(std::iter::empty()))
};
Expand All @@ -202,7 +204,7 @@ impl WalHandle {
pub fn get_committed_notification_by_block_hash(
&self,
block_hash: &B256,
) -> eyre::Result<Option<ExExNotification>> {
) -> WalResult<Option<ExExNotification>> {
let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash)
else {
return Ok(None)
Expand All @@ -228,16 +230,16 @@ mod tests {
self, random_block, random_block_range, BlockParams, BlockRangeParams,
};

use crate::wal::{cache::CachedBlock, Wal};
use crate::wal::{cache::CachedBlock, error::WalResult, Wal};

fn read_notifications(wal: &Wal) -> eyre::Result<Vec<ExExNotification>> {
fn read_notifications(wal: &Wal) -> WalResult<Vec<ExExNotification>> {
let Some(files_range) = wal.inner.storage.files_range()? else { return Ok(Vec::new()) };

wal.inner
.storage
.iter_notifications(files_range)
.map(|entry| Ok(entry?.2))
.collect::<eyre::Result<_>>()
.collect::<WalResult<_>>()
}

fn sort_committed_blocks(
Expand Down
32 changes: 15 additions & 17 deletions crates/exex/exex/src/wal/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
path::{Path, PathBuf},
};

use eyre::OptionExt;
use crate::wal::{WalError, WalResult};
use reth_exex_types::ExExNotification;
use reth_tracing::tracing::debug;
use tracing::instrument;
Expand All @@ -24,7 +24,7 @@ pub struct Storage {
impl Storage {
/// Creates a new instance of [`Storage`] backed by the file at the given path and creates
/// it doesn't exist.
pub(super) fn new(path: impl AsRef<Path>) -> eyre::Result<Self> {
pub(super) fn new(path: impl AsRef<Path>) -> WalResult<Self> {
reth_fs_util::create_dir_all(&path)?;

Ok(Self { path: path.as_ref().to_path_buf() })
Expand All @@ -34,11 +34,11 @@ impl Storage {
self.path.join(format!("{id}.{FILE_EXTENSION}"))
}

fn parse_filename(filename: &str) -> eyre::Result<u32> {
fn parse_filename(filename: &str) -> WalResult<u32> {
filename
.strip_suffix(".wal")
.and_then(|s| s.parse().ok())
.ok_or_eyre(format!("failed to parse file name: {filename}"))
.ok_or_else(|| WalError::Parse(filename.to_string()))
}

/// Removes notification for the given file ID from the storage.
Expand Down Expand Up @@ -66,12 +66,12 @@ impl Storage {
/// Returns the range of file IDs in the storage.
///
/// If there are no files in the storage, returns `None`.
pub(super) fn files_range(&self) -> eyre::Result<Option<RangeInclusive<u32>>> {
pub(super) fn files_range(&self) -> WalResult<Option<RangeInclusive<u32>>> {
let mut min_id = None;
let mut max_id = None;

for entry in reth_fs_util::read_dir(&self.path)? {
let entry = entry?;
let entry = entry.map_err(|err| WalError::DirEntry(self.path.clone(), err))?;

if entry.path().extension() == Some(FILE_EXTENSION.as_ref()) {
let file_name = entry.file_name();
Expand All @@ -93,7 +93,7 @@ impl Storage {
pub(super) fn remove_notifications(
&self,
file_ids: impl IntoIterator<Item = u32>,
) -> eyre::Result<(usize, u64)> {
) -> WalResult<(usize, u64)> {
let mut deleted_total = 0;
let mut deleted_size = 0;

Expand All @@ -110,10 +110,10 @@ impl Storage {
pub(super) fn iter_notifications(
&self,
range: RangeInclusive<u32>,
) -> impl Iterator<Item = eyre::Result<(u32, u64, ExExNotification)>> + '_ {
) -> impl Iterator<Item = WalResult<(u32, u64, ExExNotification)>> + '_ {
range.map(move |id| {
let (notification, size) =
self.read_notification(id)?.ok_or_eyre("notification {id} not found")?;
self.read_notification(id)?.ok_or(WalError::FileNotFound(id))?;

Ok((id, size, notification))
})
Expand All @@ -124,7 +124,7 @@ impl Storage {
pub(super) fn read_notification(
&self,
file_id: u32,
) -> eyre::Result<Option<(ExExNotification, u64)>> {
) -> WalResult<Option<(ExExNotification, u64)>> {
let file_path = self.file_path(file_id);
debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL");

Expand All @@ -133,14 +133,12 @@ impl Storage {
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()),
};
let size = file.metadata()?.len();
let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len();

// Deserialize using the bincode- and msgpack-compatible serde wrapper
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> =
rmp_serde::decode::from_read(&mut file).map_err(|err| {
eyre::eyre!("failed to decode notification from {file_path:?}: {err:?}")
})?;

rmp_serde::decode::from_read(&mut file)
.map_err(|err| WalError::Decode(file_id, file_path, err))?;
Ok(Some((notification.into(), size)))
}

Expand All @@ -154,7 +152,7 @@ impl Storage {
&self,
file_id: u32,
notification: &ExExNotification,
) -> eyre::Result<u64> {
) -> WalResult<u64> {
let file_path = self.file_path(file_id);
debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");

Expand All @@ -166,7 +164,7 @@ impl Storage {
rmp_serde::encode::write(file, &notification)
})?;

Ok(file_path.metadata()?.len())
Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len())
}
}

Expand Down
Loading