diff --git a/bin/reth/src/db/snapshots/headers.rs b/bin/reth/src/db/snapshots/headers.rs index b09b99ebc648..8b9ea080aac2 100644 --- a/bin/reth/src/db/snapshots/headers.rs +++ b/bin/reth/src/db/snapshots/headers.rs @@ -13,7 +13,10 @@ use reth_provider::{ providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory, }; use reth_snapshot::{segments, segments::Segment}; -use std::{path::Path, sync::Arc}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; impl Command { pub(crate) fn generate_headers_snapshot( @@ -32,7 +35,7 @@ impl Command { let segment = segments::Headers::new(compression, filters); - segment.snapshot::(provider, range.clone())?; + segment.snapshot::(provider, PathBuf::default(), range.clone())?; // Default name doesn't have any configuration reth_primitives::fs::rename( diff --git a/bin/reth/src/db/snapshots/receipts.rs b/bin/reth/src/db/snapshots/receipts.rs index b0475eeff1b5..84e47acf4ba8 100644 --- a/bin/reth/src/db/snapshots/receipts.rs +++ b/bin/reth/src/db/snapshots/receipts.rs @@ -14,7 +14,10 @@ use reth_provider::{ ReceiptProvider, TransactionsProvider, TransactionsProviderExt, }; use reth_snapshot::{segments, segments::Segment}; -use std::{path::Path, sync::Arc}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; impl Command { pub(crate) fn generate_receipts_snapshot( @@ -33,7 +36,7 @@ impl Command { let segment = segments::Receipts::new(compression, filters); - segment.snapshot::(provider, range.clone())?; + segment.snapshot::(provider, PathBuf::default(), range.clone())?; // Default name doesn't have any configuration reth_primitives::fs::rename( diff --git a/bin/reth/src/db/snapshots/transactions.rs b/bin/reth/src/db/snapshots/transactions.rs index 9d3530d40286..0b7d8b0163db 100644 --- a/bin/reth/src/db/snapshots/transactions.rs +++ b/bin/reth/src/db/snapshots/transactions.rs @@ -14,7 +14,10 @@ use reth_provider::{ TransactionsProvider, TransactionsProviderExt, }; use reth_snapshot::{segments, segments::Segment}; -use std::{path::Path, sync::Arc}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; impl Command { pub(crate) fn generate_transactions_snapshot( @@ -33,7 +36,7 @@ impl Command { let segment = segments::Transactions::new(compression, filters); - segment.snapshot::(provider, range.clone())?; + segment.snapshot::(provider, PathBuf::default(), range.clone())?; // Default name doesn't have any configuration reth_primitives::fs::rename( diff --git a/crates/primitives/src/snapshot/mod.rs b/crates/primitives/src/snapshot/mod.rs index f73ba9874d77..8c595c75c1c7 100644 --- a/crates/primitives/src/snapshot/mod.rs +++ b/crates/primitives/src/snapshot/mod.rs @@ -7,7 +7,7 @@ mod segment; use alloy_primitives::BlockNumber; pub use compression::Compression; pub use filters::{Filters, InclusionFilter, PerfectHashingFunction}; -pub use segment::{SegmentHeader, SnapshotSegment}; +pub use segment::{SegmentConfig, SegmentHeader, SnapshotSegment}; /// Default snapshot block count. pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000; diff --git a/crates/primitives/src/snapshot/segment.rs b/crates/primitives/src/snapshot/segment.rs index 309d2c4ba7fe..f016e3e85bff 100644 --- a/crates/primitives/src/snapshot/segment.rs +++ b/crates/primitives/src/snapshot/segment.rs @@ -36,11 +36,14 @@ pub enum SnapshotSegment { impl SnapshotSegment { /// Returns the default configuration of the segment. - pub const fn config(&self) -> (Filters, Compression) { - let default_config = ( - Filters::WithFilters(InclusionFilter::Cuckoo, super::PerfectHashingFunction::Fmph), - Compression::Lz4, - ); + pub const fn config(&self) -> SegmentConfig { + let default_config = SegmentConfig { + filters: Filters::WithFilters( + InclusionFilter::Cuckoo, + super::PerfectHashingFunction::Fmph, + ), + compression: Compression::Lz4, + }; match self { SnapshotSegment::Headers => default_config, @@ -133,3 +136,12 @@ impl SegmentHeader { } } } + +/// Configuration used on the segment. +#[derive(Debug, Clone, Copy)] +pub struct SegmentConfig { + /// Inclusion filters used on the segment + pub filters: Filters, + /// Compression used on the segment + pub compression: Compression, +} diff --git a/crates/snapshot/src/segments/headers.rs b/crates/snapshot/src/segments/headers.rs index 4cc3ced20470..89fc009f9fd7 100644 --- a/crates/snapshot/src/segments/headers.rs +++ b/crates/snapshot/src/segments/headers.rs @@ -5,38 +5,48 @@ use reth_db::{ }; use reth_interfaces::RethResult; use reth_primitives::{ - snapshot::{Compression, Filters}, + snapshot::{Compression, Filters, SegmentConfig}, BlockNumber, SnapshotSegment, }; use reth_provider::DatabaseProviderRO; -use std::ops::RangeInclusive; +use std::{ops::RangeInclusive, path::Path}; /// Snapshot segment responsible for [SnapshotSegment::Headers] part of data. #[derive(Debug)] pub struct Headers { - compression: Compression, - filters: Filters, + config: SegmentConfig, } impl Headers { /// Creates new instance of [Headers] snapshot segment. pub fn new(compression: Compression, filters: Filters) -> Self { - Self { compression, filters } + Self { config: SegmentConfig { compression, filters } } + } +} + +impl Default for Headers { + fn default() -> Self { + Self { config: SnapshotSegment::Headers.config() } } } impl Segment for Headers { + fn segment() -> SnapshotSegment { + SnapshotSegment::Headers + } + fn snapshot( &self, provider: &DatabaseProviderRO<'_, DB>, + directory: impl AsRef, range: RangeInclusive, ) -> RethResult<()> { let range_len = range.clone().count(); let mut jar = prepare_jar::( provider, - SnapshotSegment::Headers, - self.filters, - self.compression, + directory, + Self::segment(), + self.config, range.clone(), range_len, || { @@ -57,7 +67,7 @@ impl Segment for Headers { // Generate list of hashes for filters & PHF let mut cursor = provider.tx_ref().cursor_read::>()?; let mut hashes = None; - if self.filters.has_filters() { + if self.config.filters.has_filters() { hashes = Some( cursor .walk(Some(RawKey::from(*range.start())))? diff --git a/crates/snapshot/src/segments/mod.rs b/crates/snapshot/src/segments/mod.rs index 6293c3896c10..37727953a59e 100644 --- a/crates/snapshot/src/segments/mod.rs +++ b/crates/snapshot/src/segments/mod.rs @@ -15,7 +15,9 @@ use reth_db::{ use reth_interfaces::RethResult; use reth_nippy_jar::NippyJar; use reth_primitives::{ - snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentHeader}, + snapshot::{ + Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig, SegmentHeader, + }, BlockNumber, SnapshotSegment, }; use reth_provider::{DatabaseProviderRO, TransactionsProviderExt}; @@ -24,14 +26,19 @@ use std::{ops::RangeInclusive, path::Path}; pub(crate) type Rows = [Vec>; COLUMNS]; /// A segment represents a snapshotting of some portion of the data. -pub trait Segment { - /// Snapshot data using the provided range. +pub trait Segment: Default { + /// Snapshot data using the provided range. The `directory` parameter determines the snapshot + /// file's save location. fn snapshot( &self, provider: &DatabaseProviderRO<'_, DB>, + directory: impl AsRef, range: RangeInclusive, ) -> RethResult<()>; + /// Returns this struct's [`SnapshotSegment`]. + fn segment() -> SnapshotSegment; + /// Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000). fn dataset_for_compression>( &self, @@ -48,12 +55,13 @@ pub trait Segment { } } -/// Returns a [`NippyJar`] according to the desired configuration. +/// Returns a [`NippyJar`] according to the desired configuration. The `directory` parameter +/// determines the snapshot file's save location. pub(crate) fn prepare_jar( provider: &DatabaseProviderRO<'_, DB>, + directory: impl AsRef, segment: SnapshotSegment, - filters: Filters, - compression: Compression, + segment_config: SegmentConfig, block_range: RangeInclusive, total_rows: usize, prepare_compression: impl Fn() -> RethResult>, @@ -61,11 +69,11 @@ pub(crate) fn prepare_jar( let tx_range = provider.transaction_range_by_block_range(block_range.clone())?; let mut nippy_jar = NippyJar::new( COLUMNS, - Path::new(segment.filename(&block_range).as_str()), + &directory.as_ref().join(segment.filename(&block_range).as_str()), SegmentHeader::new(block_range, tx_range, segment), ); - nippy_jar = match compression { + nippy_jar = match segment_config.compression { Compression::Lz4 => nippy_jar.with_lz4(), Compression::Zstd => nippy_jar.with_zstd(false, 0), Compression::ZstdWithDictionary => { @@ -78,7 +86,7 @@ pub(crate) fn prepare_jar( Compression::Uncompressed => nippy_jar, }; - if let Filters::WithFilters(inclusion_filter, phf) = filters { + if let Filters::WithFilters(inclusion_filter, phf) = segment_config.filters { nippy_jar = match inclusion_filter { InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows), }; diff --git a/crates/snapshot/src/segments/receipts.rs b/crates/snapshot/src/segments/receipts.rs index 4fb2e399d115..75e8aaa8995d 100644 --- a/crates/snapshot/src/segments/receipts.rs +++ b/crates/snapshot/src/segments/receipts.rs @@ -2,30 +2,40 @@ use crate::segments::{prepare_jar, Segment}; use reth_db::{database::Database, snapshot::create_snapshot_T1, tables}; use reth_interfaces::RethResult; use reth_primitives::{ - snapshot::{Compression, Filters, SegmentHeader}, + snapshot::{Compression, Filters, SegmentConfig, SegmentHeader}, BlockNumber, SnapshotSegment, TxNumber, }; use reth_provider::{DatabaseProviderRO, TransactionsProviderExt}; -use std::ops::RangeInclusive; +use std::{ops::RangeInclusive, path::Path}; /// Snapshot segment responsible for [SnapshotSegment::Receipts] part of data. #[derive(Debug)] pub struct Receipts { - compression: Compression, - filters: Filters, + config: SegmentConfig, } impl Receipts { /// Creates new instance of [Receipts] snapshot segment. pub fn new(compression: Compression, filters: Filters) -> Self { - Self { compression, filters } + Self { config: SegmentConfig { compression, filters } } + } +} + +impl Default for Receipts { + fn default() -> Self { + Self { config: SnapshotSegment::Receipts.config() } } } impl Segment for Receipts { + fn segment() -> SnapshotSegment { + SnapshotSegment::Receipts + } + fn snapshot( &self, provider: &DatabaseProviderRO<'_, DB>, + directory: impl AsRef, block_range: RangeInclusive, ) -> RethResult<()> { let tx_range = provider.transaction_range_by_block_range(block_range.clone())?; @@ -33,9 +43,9 @@ impl Segment for Receipts { let mut jar = prepare_jar::( provider, - SnapshotSegment::Receipts, - self.filters, - self.compression, + directory, + Self::segment(), + self.config, block_range, tx_range_len, || { @@ -49,7 +59,7 @@ impl Segment for Receipts { // Generate list of hashes for filters & PHF let mut hashes = None; - if self.filters.has_filters() { + if self.config.filters.has_filters() { hashes = Some( provider .transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))? diff --git a/crates/snapshot/src/segments/transactions.rs b/crates/snapshot/src/segments/transactions.rs index 09d120c09db4..b9cccfd20d37 100644 --- a/crates/snapshot/src/segments/transactions.rs +++ b/crates/snapshot/src/segments/transactions.rs @@ -2,30 +2,40 @@ use crate::segments::{prepare_jar, Segment}; use reth_db::{database::Database, snapshot::create_snapshot_T1, tables}; use reth_interfaces::RethResult; use reth_primitives::{ - snapshot::{Compression, Filters, SegmentHeader}, + snapshot::{Compression, Filters, SegmentConfig, SegmentHeader}, BlockNumber, SnapshotSegment, TxNumber, }; use reth_provider::{DatabaseProviderRO, TransactionsProviderExt}; -use std::ops::RangeInclusive; +use std::{ops::RangeInclusive, path::Path}; /// Snapshot segment responsible for [SnapshotSegment::Transactions] part of data. #[derive(Debug)] pub struct Transactions { - compression: Compression, - filters: Filters, + config: SegmentConfig, } impl Transactions { /// Creates new instance of [Transactions] snapshot segment. pub fn new(compression: Compression, filters: Filters) -> Self { - Self { compression, filters } + Self { config: SegmentConfig { compression, filters } } + } +} + +impl Default for Transactions { + fn default() -> Self { + Self { config: SnapshotSegment::Transactions.config() } } } impl Segment for Transactions { + fn segment() -> SnapshotSegment { + SnapshotSegment::Transactions + } + fn snapshot( &self, provider: &DatabaseProviderRO<'_, DB>, + directory: impl AsRef, block_range: RangeInclusive, ) -> RethResult<()> { let tx_range = provider.transaction_range_by_block_range(block_range.clone())?; @@ -33,9 +43,9 @@ impl Segment for Transactions { let mut jar = prepare_jar::( provider, - SnapshotSegment::Transactions, - self.filters, - self.compression, + directory, + Self::segment(), + self.config, block_range, tx_range_len, || { @@ -49,7 +59,7 @@ impl Segment for Transactions { // Generate list of hashes for filters & PHF let mut hashes = None; - if self.filters.has_filters() { + if self.config.filters.has_filters() { hashes = Some( provider .transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))? diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index c8790d336adf..cd583b5e14eb 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -1,6 +1,6 @@ //! Support for snapshotting. -use crate::SnapshotterError; +use crate::{segments, segments::Segment, SnapshotterError}; use reth_db::database::Database; use reth_interfaces::{RethError, RethResult}; use reth_primitives::{ @@ -17,6 +17,10 @@ pub type SnapshotterResult = Result; /// The snapshotter type itself with the result of [Snapshotter::run] pub type SnapshotterWithResult = (Snapshotter, SnapshotterResult); +/// Snapshots are initially created in `{...}/datadir/snapshots/temp` and moved once finished. This +/// directory is cleaned up on every booting up of the node. +const TEMPORARY_SUBDIRECTORY: &str = "temp"; + /// Snapshotting routine. Main snapshotting logic happens in [Snapshotter::run]. #[derive(Debug)] pub struct Snapshotter { @@ -89,11 +93,6 @@ impl Snapshotter { chain_spec: Arc, block_interval: u64, ) -> RethResult { - // Create directory for snapshots if it doesn't exist. - if !snapshots_path.exists() { - reth_primitives::fs::create_dir_all(&snapshots_path)?; - } - let (highest_snapshots_notifier, highest_snapshots_tracker) = watch::channel(None); let mut snapshotter = Self { @@ -106,11 +105,34 @@ impl Snapshotter { block_interval, }; + snapshotter.create_directory()?; snapshotter.update_highest_snapshots_tracker()?; Ok(snapshotter) } + /// Ensures the snapshots directory and its temporary subdirectory are properly set up. + /// + /// This function performs the following actions: + /// 1. If `datadir/snapshots` does not exist, it creates it. + /// 2. Ensures `datadir/snapshots/temp` exists and is empty. + /// + /// The `temp` subdirectory is where snapshots are initially created before being + /// moved to their final location within `datadir/snapshots`. + fn create_directory(&self) -> RethResult<()> { + let temporary_path = self.snapshots_path.join(TEMPORARY_SUBDIRECTORY); + + if !self.snapshots_path.exists() { + reth_primitives::fs::create_dir_all(&self.snapshots_path)?; + } else if temporary_path.exists() { + reth_primitives::fs::remove_dir_all(&temporary_path)?; + } + + reth_primitives::fs::create_dir_all(temporary_path)?; + + Ok(()) + } + #[cfg(test)] fn set_highest_snapshots_from_targets(&mut self, targets: &SnapshotTargets) { if let Some(block_number) = &targets.headers { @@ -167,13 +189,48 @@ impl Snapshotter { debug_assert!(targets.is_multiple_of_block_interval(self.block_interval)); debug_assert!(targets.is_contiguous_to_highest_snapshots(self.highest_snapshots)); - // TODO(alexey): snapshot logic + self.run_segment::( + targets.receipts.as_ref().map(|(range, _)| range.clone()), + )?; + + self.run_segment::( + targets.transactions.as_ref().map(|(range, _)| range.clone()), + )?; + + self.run_segment::(targets.headers.clone())?; self.update_highest_snapshots_tracker()?; Ok(targets) } + /// Run the snapshotter for one segment. + /// + /// It first builds the snapshot in a **temporary directory** inside the snapshots directory. If + /// for some reason the node is terminated during the snapshot process, it will be cleaned + /// up on boot (on [`Snapshotter::new`]) and the snapshot process restarted from scratch for + /// this block range and segment. + /// + /// If it succeeds, then we move the snapshot file from the temporary directory to its main one. + fn run_segment( + &self, + block_range: Option>, + ) -> RethResult<()> { + if let Some(block_range) = block_range { + let temp = self.snapshots_path.join(TEMPORARY_SUBDIRECTORY); + let filename = S::segment().filename(&block_range); + + S::default().snapshot::( + &self.provider_factory.provider()?, + temp.clone(), + block_range.clone(), + )?; + + reth_primitives::fs::rename(temp.join(&filename), self.snapshots_path.join(filename))?; + } + Ok(()) + } + /// Returns a snapshot targets at the provided finalized block number, respecting the block /// interval. The target is determined by the check against last snapshots. pub fn get_snapshot_targets(