From 886119da73753711c3f7a8865692276b1bca06b4 Mon Sep 17 00:00:00 2001 From: caglarkaya Date: Wed, 16 Oct 2024 14:03:02 +0300 Subject: [PATCH 1/2] introduce custom exex wal errors --- Cargo.lock | 1 + crates/exex/exex/Cargo.toml | 1 + crates/exex/exex/src/manager.rs | 7 +++-- crates/exex/exex/src/wal/error.rs | 32 ++++++++++++++++++++ crates/exex/exex/src/wal/mod.rs | 28 ++++++++++-------- crates/exex/exex/src/wal/storage.rs | 46 ++++++++++++++++------------- 6 files changed, 78 insertions(+), 37 deletions(-) create mode 100644 crates/exex/exex/src/wal/error.rs diff --git a/Cargo.lock b/Cargo.lock index d690cf536aa3..0e8c633e1f87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7450,6 +7450,7 @@ dependencies = [ "rmp-serde", "secp256k1", "tempfile", + "thiserror", "tokio", "tokio-util", "tracing", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 6a3815e4045b..aeacec8bd03f 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -47,6 +47,7 @@ itertools.workspace = true metrics.workspace = true parking_lot.workspace = true rmp-serde = "1.3" +thiserror.workspace = true tracing.workspace = true [dev-dependencies] diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 8c1518f3090f..36525fc2930d 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -635,6 +635,7 @@ impl Clone for ExExManagerHandle { #[cfg(test)] mod tests { use super::*; + use crate::wal::WalResult; use alloy_primitives::B256; use futures::{StreamExt, TryStreamExt}; use rand::Rng; @@ -1278,7 +1279,7 @@ mod tests { ); // WAL shouldn't contain the genesis notification, because it's finalized assert_eq!( - exex_manager.wal.iter_notifications()?.collect::>>()?, + exex_manager.wal.iter_notifications()?.collect::>>()?, [notification.clone()] ); @@ -1286,7 +1287,7 @@ mod tests { assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event assert_eq!( - exex_manager.wal.iter_notifications()?.collect::>>()?, + exex_manager.wal.iter_notifications()?.collect::>>()?, [notification.clone()] ); @@ -1300,7 +1301,7 @@ mod tests { // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a // non-canonical block assert_eq!( - exex_manager.wal.iter_notifications()?.collect::>>()?, + exex_manager.wal.iter_notifications()?.collect::>>()?, [notification] ); diff --git a/crates/exex/exex/src/wal/error.rs b/crates/exex/exex/src/wal/error.rs new file mode 100644 index 000000000000..f0adbc6b40e5 --- /dev/null +++ b/crates/exex/exex/src/wal/error.rs @@ -0,0 +1,32 @@ +//! Wal Errors + +use std::path::PathBuf; + +/// Wal Result type. +pub type WalResult = Result; + +/// Wal Error types +#[derive(Debug, thiserror::Error)] +pub enum WalError { + /// Directory operation `FsPathError` + #[error("directory operation error: {0}")] + DirFsPath(reth_fs_util::FsPathError), + /// Directory operation IO Error + #[error("directory operation error: {0}")] + DirIO(std::io::Error), + /// File operation `FsPathError` + #[error("file operation error on file with id {0}: {1}")] + FileFsPath(u32, reth_fs_util::FsPathError), + /// File operation IO Error + #[error("file operation error on file with id {0}: {1}")] + FileIO(u32, std::io::Error), + /// Parse error + #[error("failed to parse file name: {0}")] + Parse(String), + /// Notification not found error + #[error("notification {0} not found")] + FileNotFound(u32), + /// Decode error + #[error("failed to decode notification {0} from {1}: {2}")] + Decode(u32, PathBuf, rmp_serde::decode::Error), +} diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 00b0ea919ef6..9a764da125d6 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -6,6 +6,8 @@ mod storage; pub use storage::Storage; mod metrics; use metrics::Metrics; +mod error; +pub use error::{WalError, WalResult}; use std::{ path::Path, @@ -38,7 +40,7 @@ pub struct Wal { impl Wal { /// Creates a new instance of [`Wal`]. - pub fn new(directory: impl AsRef) -> eyre::Result { + pub fn new(directory: impl AsRef) -> WalResult { Ok(Self { inner: Arc::new(WalInner::new(directory)?) }) } @@ -48,7 +50,7 @@ impl Wal { } /// Commits the notification to WAL. - pub fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { + pub fn commit(&self, notification: &ExExNotification) -> WalResult<()> { self.inner.commit(notification) } @@ -56,14 +58,14 @@ impl Wal { /// /// The caller should check that all ExExes are on the canonical chain and will not need any /// blocks from the WAL below the provided block, inclusive. - pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { + pub fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> { self.inner.finalize(to_block) } /// Returns an iterator over all notifications in the WAL. pub fn iter_notifications( &self, - ) -> eyre::Result> + '_>> { + ) -> WalResult> + '_>> { self.inner.iter_notifications() } } @@ -80,7 +82,7 @@ struct WalInner { } impl WalInner { - fn new(directory: impl AsRef) -> eyre::Result { + fn new(directory: impl AsRef) -> WalResult { let mut wal = Self { next_file_id: AtomicU32::new(0), storage: Storage::new(directory)?, @@ -97,7 +99,7 @@ impl WalInner { /// Fills the block cache with the notifications from the storage. #[instrument(skip(self))] - fn fill_block_cache(&mut self) -> eyre::Result<()> { + fn fill_block_cache(&mut self) -> WalResult<()> { let Some(files_range) = self.storage.files_range()? else { return Ok(()) }; self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed); @@ -132,7 +134,7 @@ impl WalInner { reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()), committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range()) ))] - fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { + fn commit(&self, notification: &ExExNotification) -> WalResult<()> { let mut block_cache = self.block_cache.write(); let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed); @@ -147,7 +149,7 @@ impl WalInner { } #[instrument(skip(self))] - fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { + fn finalize(&self, to_block: BlockNumHash) -> WalResult<()> { let mut block_cache = self.block_cache.write(); let file_ids = block_cache.remove_before(to_block.number); @@ -182,7 +184,7 @@ impl WalInner { /// Returns an iterator over all notifications in the WAL. fn iter_notifications( &self, - ) -> eyre::Result> + '_>> { + ) -> WalResult> + '_>> { let Some(range) = self.storage.files_range()? else { return Ok(Box::new(std::iter::empty())) }; @@ -202,7 +204,7 @@ impl WalHandle { pub fn get_committed_notification_by_block_hash( &self, block_hash: &B256, - ) -> eyre::Result> { + ) -> WalResult> { let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash) else { return Ok(None) @@ -228,16 +230,16 @@ mod tests { self, random_block, random_block_range, BlockParams, BlockRangeParams, }; - use crate::wal::{cache::CachedBlock, Wal}; + use crate::wal::{cache::CachedBlock, error::WalResult, Wal}; - fn read_notifications(wal: &Wal) -> eyre::Result> { + fn read_notifications(wal: &Wal) -> WalResult> { let Some(files_range) = wal.inner.storage.files_range()? else { return Ok(Vec::new()) }; wal.inner .storage .iter_notifications(files_range) .map(|entry| Ok(entry?.2)) - .collect::>() + .collect::>() } fn sort_committed_blocks( diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index af3a590e5860..c83645624248 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -4,7 +4,7 @@ use std::{ path::{Path, PathBuf}, }; -use eyre::OptionExt; +use crate::wal::{WalError, WalResult}; use reth_exex_types::ExExNotification; use reth_tracing::tracing::debug; use tracing::instrument; @@ -22,8 +22,8 @@ pub struct Storage { impl Storage { /// Creates a new instance of [`Storage`] backed by the file at the given path and creates /// it doesn't exist. - pub(super) fn new(path: impl AsRef) -> eyre::Result { - reth_fs_util::create_dir_all(&path)?; + pub(super) fn new(path: impl AsRef) -> WalResult { + reth_fs_util::create_dir_all(&path).map_err(WalError::DirFsPath)?; Ok(Self { path: path.as_ref().to_path_buf() }) } @@ -32,11 +32,11 @@ impl Storage { self.path.join(format!("{id}.wal")) } - fn parse_filename(filename: &str) -> eyre::Result { + fn parse_filename(filename: &str) -> WalResult { filename .strip_suffix(".wal") .and_then(|s| s.parse().ok()) - .ok_or_eyre(format!("failed to parse file name: {filename}")) + .ok_or_else(|| WalError::Parse(filename.to_string())) } /// Removes notification for the given file ID from the storage. @@ -64,12 +64,12 @@ impl Storage { /// Returns the range of file IDs in the storage. /// /// If there are no files in the storage, returns `None`. - pub(super) fn files_range(&self) -> eyre::Result>> { + pub(super) fn files_range(&self) -> WalResult>> { let mut min_id = None; let mut max_id = None; - for entry in reth_fs_util::read_dir(&self.path)? { - let entry = entry?; + for entry in reth_fs_util::read_dir(&self.path).map_err(WalError::DirFsPath)? { + let entry = entry.map_err(WalError::DirIO)?; let file_name = entry.file_name(); let file_id = Self::parse_filename(&file_name.to_string_lossy())?; @@ -88,7 +88,7 @@ impl Storage { pub(super) fn remove_notifications( &self, file_ids: impl IntoIterator, - ) -> eyre::Result<(usize, u64)> { + ) -> WalResult<(usize, u64)> { let mut deleted_total = 0; let mut deleted_size = 0; @@ -105,10 +105,10 @@ impl Storage { pub(super) fn iter_notifications( &self, range: RangeInclusive, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { range.map(move |id| { let (notification, size) = - self.read_notification(id)?.ok_or_eyre("notification {id} not found")?; + self.read_notification(id)?.ok_or(WalError::FileNotFound(id))?; Ok((id, size, notification)) }) @@ -119,23 +119,26 @@ impl Storage { pub(super) fn read_notification( &self, file_id: u32, - ) -> eyre::Result> { + ) -> WalResult> { let file_path = self.file_path(file_id); debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL"); let mut file = match File::open(&file_path) { Ok(file) => file, Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), - Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()), + Err(err) => { + return Err(WalError::FileFsPath( + file_id, + reth_fs_util::FsPathError::open(err, &file_path), + )) + } }; - let size = file.metadata()?.len(); + let size = file.metadata().map_err(|err| WalError::FileIO(file_id, err))?.len(); // Deserialize using the bincode- and msgpack-compatible serde wrapper let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> = - rmp_serde::decode::from_read(&mut file).map_err(|err| { - eyre::eyre!("failed to decode notification from {file_path:?}: {err:?}") - })?; - + rmp_serde::decode::from_read(&mut file) + .map_err(|err| WalError::Decode(file_id, file_path, err))?; Ok(Some((notification.into(), size))) } @@ -149,7 +152,7 @@ impl Storage { &self, file_id: u32, notification: &ExExNotification, - ) -> eyre::Result { + ) -> WalResult { let file_path = self.file_path(file_id); debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL"); @@ -159,9 +162,10 @@ impl Storage { reth_fs_util::atomic_write_file(&file_path, |file| { rmp_serde::encode::write(file, ¬ification) - })?; + }) + .map_err(|err| WalError::FileFsPath(file_id, err))?; - Ok(file_path.metadata()?.len()) + Ok(file_path.metadata().map_err(|err| WalError::FileIO(file_id, err))?.len()) } } From b22826ccd51bd5eb16dd16cb19986df13c51008b Mon Sep 17 00:00:00 2001 From: caglarkaya Date: Thu, 17 Oct 2024 13:36:01 +0300 Subject: [PATCH 2/2] improve custom exex wal errors --- crates/exex/exex/src/wal/error.rs | 21 +++++++++------------ crates/exex/exex/src/wal/storage.rs | 20 +++++++------------- 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/crates/exex/exex/src/wal/error.rs b/crates/exex/exex/src/wal/error.rs index f0adbc6b40e5..b091890f6ff8 100644 --- a/crates/exex/exex/src/wal/error.rs +++ b/crates/exex/exex/src/wal/error.rs @@ -8,18 +8,15 @@ pub type WalResult = Result; /// Wal Error types #[derive(Debug, thiserror::Error)] pub enum WalError { - /// Directory operation `FsPathError` - #[error("directory operation error: {0}")] - DirFsPath(reth_fs_util::FsPathError), - /// Directory operation IO Error - #[error("directory operation error: {0}")] - DirIO(std::io::Error), - /// File operation `FsPathError` - #[error("file operation error on file with id {0}: {1}")] - FileFsPath(u32, reth_fs_util::FsPathError), - /// File operation IO Error - #[error("file operation error on file with id {0}: {1}")] - FileIO(u32, std::io::Error), + /// Filesystem error at the path + #[error(transparent)] + FsPathError(#[from] reth_fs_util::FsPathError), + /// Directory entry reading error + #[error("failed to get {0} directory entry: {1}")] + DirEntry(PathBuf, std::io::Error), + /// Error when reading file metadata + #[error("failed to get metadata for file {0}: {1}")] + FileMetadata(u32, std::io::Error), /// Parse error #[error("failed to parse file name: {0}")] Parse(String), diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index c83645624248..6fb22bbfbbc1 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -23,7 +23,7 @@ impl Storage { /// Creates a new instance of [`Storage`] backed by the file at the given path and creates /// it doesn't exist. pub(super) fn new(path: impl AsRef) -> WalResult { - reth_fs_util::create_dir_all(&path).map_err(WalError::DirFsPath)?; + reth_fs_util::create_dir_all(&path)?; Ok(Self { path: path.as_ref().to_path_buf() }) } @@ -68,8 +68,8 @@ impl Storage { let mut min_id = None; let mut max_id = None; - for entry in reth_fs_util::read_dir(&self.path).map_err(WalError::DirFsPath)? { - let entry = entry.map_err(WalError::DirIO)?; + for entry in reth_fs_util::read_dir(&self.path)? { + let entry = entry.map_err(|err| WalError::DirEntry(self.path.clone(), err))?; let file_name = entry.file_name(); let file_id = Self::parse_filename(&file_name.to_string_lossy())?; @@ -126,14 +126,9 @@ impl Storage { let mut file = match File::open(&file_path) { Ok(file) => file, Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), - Err(err) => { - return Err(WalError::FileFsPath( - file_id, - reth_fs_util::FsPathError::open(err, &file_path), - )) - } + Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()), }; - let size = file.metadata().map_err(|err| WalError::FileIO(file_id, err))?.len(); + let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len(); // Deserialize using the bincode- and msgpack-compatible serde wrapper let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> = @@ -162,10 +157,9 @@ impl Storage { reth_fs_util::atomic_write_file(&file_path, |file| { rmp_serde::encode::write(file, ¬ification) - }) - .map_err(|err| WalError::FileFsPath(file_id, err))?; + })?; - Ok(file_path.metadata().map_err(|err| WalError::FileIO(file_id, err))?.len()) + Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len()) } }