feat: Snapshotter triggers segment snapshots #5287

Merged 41 commits on Nov 14, 2023
Changes from 31 commits
eb00e6d
add snapshots_path to ChainPath
joshieDo Oct 31, 2023
a8ef221
move HighestSnapshots to primitives crate
joshieDo Oct 31, 2023
f1f32d7
add with_highest_tracker to SnapshotProvider
joshieDo Oct 31, 2023
f5cadf8
add a shared snapshot provider to providerfactory and dbprovider
joshieDo Oct 31, 2023
1acd28f
add snapshot provider to shared blockchain_db
joshieDo Oct 31, 2023
470a6b6
allow unused for now
joshieDo Oct 31, 2023
113ef23
add get_highest_snapshot to SnapshotProvider
joshieDo Oct 31, 2023
73fd00f
move highest snapshot channel inside snapshotter
joshieDo Nov 2, 2023
2fef51f
add default receiver to snapshotter
joshieDo Nov 2, 2023
33dc284
replace with with_snapshot_provider on db provider
joshieDo Nov 2, 2023
cda1ca7
use strum for SnapshotSegment, compression and filters
joshieDo Nov 3, 2023
ec89704
snapshotter takes a directory and reads from it
joshieDo Nov 3, 2023
e87c64d
default snapshot filename doesnt have configuration
joshieDo Nov 3, 2023
99c1fa5
create snapshots directory if it doesnt exist
joshieDo Nov 3, 2023
d4b5f61
add run_segment
joshieDo Nov 3, 2023
dd338e5
add type SegmentConfig
joshieDo Nov 3, 2023
00ec5f5
add docs for directory
joshieDo Nov 8, 2023
c1dd713
replace PathBuf usage
joshieDo Nov 8, 2023
3be09cd
Merge remote-tracking branch 'origin/main' into joshie/db-snap-provider
joshieDo Nov 8, 2023
96b255c
Merge branch 'joshie/db-snap-provider' into joshie/db-snap-provider2
joshieDo Nov 8, 2023
b281144
Merge branch 'joshie/db-snap-provider2' into joshie/db-snap-provider3
joshieDo Nov 8, 2023
d0699f7
Update crates/snapshot/src/snapshotter.rs
joshieDo Nov 13, 2023
892b260
Update crates/snapshot/src/snapshotter.rs
joshieDo Nov 13, 2023
756d00b
Update crates/storage/provider/src/providers/snapshot/manager.rs
joshieDo Nov 13, 2023
5891d0a
Update crates/primitives/src/snapshot/segment.rs
joshieDo Nov 13, 2023
d2c21e9
Update crates/snapshot/src/snapshotter.rs
joshieDo Nov 13, 2023
a11751f
Merge branch 'joshie/db-snap-provider' into joshie/db-snap-provider2
joshieDo Nov 13, 2023
4574c68
add fn block_range to snapshot cmd
joshieDo Nov 13, 2023
d47812a
add read_dir to primitives fs
joshieDo Nov 13, 2023
e1dca84
Merge branch 'joshie/db-snap-provider2' into joshie/db-snap-provider3
joshieDo Nov 13, 2023
95e965a
trait Segment requires Default
joshieDo Nov 13, 2023
7054744
add reth_primitives::fs::rename
joshieDo Nov 14, 2023
a42ebaa
Merge branch 'joshie/db-snap-provider2' into joshie/db-snap-provider3
joshieDo Nov 14, 2023
a865b59
use reth fs rename
joshieDo Nov 14, 2023
77094e5
clippy
joshieDo Nov 14, 2023
622159b
Update crates/primitives/src/fs.rs
joshieDo Nov 14, 2023
975b47d
fmt
joshieDo Nov 14, 2023
c9ee102
Merge remote-tracking branch 'origin/main' into joshie/db-snap-provider2
joshieDo Nov 14, 2023
2bd3205
Merge branch 'joshie/db-snap-provider2' into joshie/db-snap-provider3
joshieDo Nov 14, 2023
bceba97
Merge remote-tracking branch 'origin/main' into joshie/db-snap-provider3
joshieDo Nov 14, 2023
986abfd
add missing imports
joshieDo Nov 14, 2023
1 change: 1 addition & 0 deletions Cargo.lock


41 changes: 27 additions & 14 deletions bin/reth/src/db/snapshots/headers.rs
@@ -12,8 +12,11 @@ use reth_primitives::{
 use reth_provider::{
     providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory,
 };
-use reth_snapshot::segments::{Headers, Segment};
-use std::{path::Path, sync::Arc};
+use reth_snapshot::{segments, segments::Segment};
+use std::{
+    path::{Path, PathBuf},
+    sync::Arc,
+};

 impl Command {
     pub(crate) fn generate_headers_snapshot<DB: Database>(
@@ -23,15 +26,24 @@ impl Command {
         inclusion_filter: InclusionFilter,
         phf: PerfectHashingFunction,
     ) -> eyre::Result<()> {
-        let segment = Headers::new(
-            compression,
-            if self.with_filters {
-                Filters::WithFilters(inclusion_filter, phf)
-            } else {
-                Filters::WithoutFilters
-            },
-        );
-        segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
+        let range = self.block_range();
+        let filters = if self.with_filters {
+            Filters::WithFilters(inclusion_filter, phf)
+        } else {
+            Filters::WithoutFilters
+        };
+
+        let segment = segments::Headers::new(compression, filters);
+
+        segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;
+
+        // Default name doesn't have any configuration
+        let default_name: PathBuf = SnapshotSegment::Headers.filename(&range).into();
+        let new_name: PathBuf = SnapshotSegment::Headers
+            .filename_with_configuration(filters, compression, &range)
+            .into();
+
+        std::fs::rename(default_name, new_name)?;

         Ok(())
     }
@@ -51,12 +63,13 @@ impl Command {
             Filters::WithoutFilters
         };

-        let range = self.from..=(self.from + self.block_interval - 1);
+        let range = self.block_range();

         let mut row_indexes = range.clone().collect::<Vec<_>>();
         let mut rng = rand::thread_rng();
-        let path =
-            SnapshotSegment::Headers.filename_with_configuration(filters, compression, &range);
+        let path = SnapshotSegment::Headers
+            .filename_with_configuration(filters, compression, &range)
+            .into();
         let provider = SnapshotProvider::default();
         let jar_provider =
             provider.get_segment_provider(SnapshotSegment::Headers, self.from, Some(path))?;
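In the `generate_headers_snapshot` change above, the segment seems to be written under its default file name first and then renamed to a name that carries the filter and compression configuration. The following is a minimal sketch of that write-then-rename step using only `std::fs`. The helper `rename_to_configured` and its string arguments are hypothetical placeholders; the real names come from `SnapshotSegment::filename` and `filename_with_configuration`, whose output format is not shown in this diff.

```rust
use std::{
    fs, io,
    path::{Path, PathBuf},
};

/// Hypothetical helper: renames `dir/default_name` to `dir/configured_name`
/// and returns the new path. Both names are placeholders chosen by the caller.
fn rename_to_configured(
    dir: &Path,
    default_name: &str,
    configured_name: &str,
) -> io::Result<PathBuf> {
    let from = dir.join(default_name);
    let to = dir.join(configured_name);
    // `fs::rename` fails if `from` does not exist, so a missing snapshot file
    // surfaces as an error instead of being silently ignored.
    fs::rename(&from, &to)?;
    Ok(to)
}
```

Note that the raw `io::Error` returned here does not mention which path failed; a path-aware wrapper would make such failures easier to diagnose.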
7 changes: 6 additions & 1 deletion bin/reth/src/db/snapshots/mod.rs
@@ -7,7 +7,7 @@ use reth_primitives::{
     BlockNumber, ChainSpec, SnapshotSegment,
 };
 use reth_provider::ProviderFactory;
-use std::{path::Path, sync::Arc};
+use std::{ops::RangeInclusive, path::Path, sync::Arc};

 mod bench;
 mod headers;
@@ -130,4 +130,9 @@ impl Command {

         Ok(())
     }
+
+    /// Gives out the inclusive block range for the snapshot requested by the user.
+    fn block_range(&self) -> RangeInclusive<BlockNumber> {
+        self.from..=(self.from + self.block_interval - 1)
+    }
 }
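The `block_range` helper replaces the repeated `self.from..=(self.from + self.block_interval - 1)` expression in the snapshot commands. Below is a minimal standalone sketch of the same arithmetic, assuming `BlockNumber` is a `u64` alias. The free function form and the `checked_block_range` variant are hypothetical and only illustrate the edge case where `block_interval` is zero.

```rust
use std::ops::RangeInclusive;

/// Assumption: a `u64` alias standing in for `reth_primitives::BlockNumber`.
type BlockNumber = u64;

/// Same arithmetic as `Command::block_range`, written as a free function.
fn block_range(from: BlockNumber, block_interval: u64) -> RangeInclusive<BlockNumber> {
    from..=(from + block_interval - 1)
}

/// Hypothetical variant that returns `None` instead of overflowing when
/// `block_interval` is zero or the end of the range does not fit in a `u64`.
fn checked_block_range(
    from: BlockNumber,
    block_interval: u64,
) -> Option<RangeInclusive<BlockNumber>> {
    let end = from.checked_add(block_interval)?.checked_sub(1)?;
    (block_interval > 0).then(|| from..=end)
}
```

For example, `block_range(0, 500_000)` yields `0..=499_999`, which covers exactly 500,000 blocks.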
41 changes: 26 additions & 15 deletions bin/reth/src/db/snapshots/receipts.rs
@@ -14,7 +14,10 @@ use reth_provider::{
     ReceiptProvider, TransactionsProvider, TransactionsProviderExt,
 };
 use reth_snapshot::{segments, segments::Segment};
-use std::{path::Path, sync::Arc};
+use std::{
+    path::{Path, PathBuf},
+    sync::Arc,
+};

 impl Command {
     pub(crate) fn generate_receipts_snapshot<DB: Database>(
@@ -24,15 +27,24 @@ impl Command {
         inclusion_filter: InclusionFilter,
         phf: PerfectHashingFunction,
     ) -> eyre::Result<()> {
-        let segment = segments::Receipts::new(
-            compression,
-            if self.with_filters {
-                Filters::WithFilters(inclusion_filter, phf)
-            } else {
-                Filters::WithoutFilters
-            },
-        );
-        segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
+        let range = self.block_range();
+        let filters = if self.with_filters {
+            Filters::WithFilters(inclusion_filter, phf)
+        } else {
+            Filters::WithoutFilters
+        };
+
+        let segment = segments::Receipts::new(compression, filters);
+
+        segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;
+
+        // Default name doesn't have any configuration
+        let default_name: PathBuf = SnapshotSegment::Receipts.filename(&range).into();
+        let new_name: PathBuf = SnapshotSegment::Receipts
+            .filename_with_configuration(filters, compression, &range)
+            .into();
+
+        std::fs::rename(default_name, new_name)?;

         Ok(())
     }
@@ -62,11 +74,10 @@ impl Command {

         let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

-        let path = SnapshotSegment::Receipts.filename_with_configuration(
-            filters,
-            compression,
-            &block_range,
-        );
+        let path = SnapshotSegment::Receipts
+            .filename_with_configuration(filters, compression, &block_range)
+            .into();
+
         let provider = SnapshotProvider::default();
         let jar_provider =
             provider.get_segment_provider(SnapshotSegment::Receipts, self.from, Some(path))?;
40 changes: 25 additions & 15 deletions bin/reth/src/db/snapshots/transactions.rs
@@ -14,7 +14,10 @@ use reth_provider::{
     TransactionsProvider, TransactionsProviderExt,
 };
 use reth_snapshot::{segments, segments::Segment};
-use std::{path::Path, sync::Arc};
+use std::{
+    path::{Path, PathBuf},
+    sync::Arc,
+};

 impl Command {
     pub(crate) fn generate_transactions_snapshot<DB: Database>(
@@ -24,15 +27,24 @@ impl Command {
         inclusion_filter: InclusionFilter,
         phf: PerfectHashingFunction,
     ) -> eyre::Result<()> {
-        let segment = segments::Transactions::new(
-            compression,
-            if self.with_filters {
-                Filters::WithFilters(inclusion_filter, phf)
-            } else {
-                Filters::WithoutFilters
-            },
-        );
-        segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
+        let range = self.block_range();
+        let filters = if self.with_filters {
+            Filters::WithFilters(inclusion_filter, phf)
+        } else {
+            Filters::WithoutFilters
+        };
+
+        let segment = segments::Transactions::new(compression, filters);
+
+        segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;
+
+        // Default name doesn't have any configuration
+        let default_name: PathBuf = SnapshotSegment::Transactions.filename(&range).into();
+        let new_name: PathBuf = SnapshotSegment::Transactions
+            .filename_with_configuration(filters, compression, &range)
+            .into();
+
+        std::fs::rename(default_name, new_name)?;

         Ok(())
     }
@@ -62,11 +74,9 @@ impl Command {

         let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

-        let path = SnapshotSegment::Transactions.filename_with_configuration(
-            filters,
-            compression,
-            &block_range,
-        );
+        let path = SnapshotSegment::Transactions
+            .filename_with_configuration(filters, compression, &block_range)
+            .into();
         let provider = SnapshotProvider::default();
         let jar_provider =
             provider.get_segment_provider(SnapshotSegment::Transactions, self.from, Some(path))?;
5 changes: 5 additions & 0 deletions bin/reth/src/dirs.rs
@@ -266,6 +266,11 @@ impl<D> ChainPath<D> {
         self.0.join("db").into()
     }

+    /// Returns the path to the snapshots directory for this chain.
+    pub fn snapshots_path(&self) -> PathBuf {
+        self.0.join("snapshots").into()
+    }
+
     /// Returns the path to the reth p2p secret key for this chain.
     ///
     /// `<DIR>/<CHAIN_ID>/discovery-secret`
26 changes: 15 additions & 11 deletions bin/reth/src/node/mod.rs
@@ -298,8 +298,17 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
         // fetch the head block from the database
         let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?;

+        // configure snapshotter
+        let snapshotter = reth_snapshot::Snapshotter::new(
+            db.clone(),
+            data_dir.snapshots_path(),
+            self.chain.clone(),
+            self.chain.snapshot_block_interval,
+        )?;
+
         // setup the blockchain provider
-        let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain));
+        let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain))
+            .with_snapshots(data_dir.snapshots_path(), snapshotter.highest_snapshot_receiver());
         let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?;
         let blob_store = InMemoryBlobStore::default();
         let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain))
@@ -454,12 +463,14 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
             None
         };

-        let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None);
-
         let mut hooks = EngineHooks::new();

         let pruner_events = if let Some(prune_config) = prune_config {
-            let mut pruner = self.build_pruner(&prune_config, db.clone(), highest_snapshots_rx);
+            let mut pruner = self.build_pruner(
+                &prune_config,
+                db.clone(),
+                snapshotter.highest_snapshot_receiver(),
+            );

             let events = pruner.events();
             hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
@@ -470,13 +481,6 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
             Either::Right(stream::empty())
         };

-        let _snapshotter = reth_snapshot::Snapshotter::new(
-            db,
-            self.chain.clone(),
-            self.chain.snapshot_block_interval,
-            highest_snapshots_tx,
-        );
-
         // Configure the consensus engine
         let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
             client,
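The `node/mod.rs` change moves ownership of the highest-snapshot channel into the `Snapshotter`, so that both the provider factory and the pruner obtain their handle from `snapshotter.highest_snapshot_receiver()`. The sketch below illustrates only that ownership pattern. It deliberately swaps the `watch` channel for a plain `Arc<RwLock<..>>` so it needs nothing beyond the standard library, and every type in it (`Snapshotter`, `HighestSnapshotReceiver`, `HighestSnapshot`, `publish`) is a hypothetical stand-in, not the reth API.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the highest-snapshot state; the real type is not
/// shown in this diff.
type HighestSnapshot = Option<u64>;

/// Read-only handle, standing in for the receiver half of a watch channel.
#[derive(Clone)]
struct HighestSnapshotReceiver(Arc<RwLock<HighestSnapshot>>);

impl HighestSnapshotReceiver {
    /// Returns the most recently published value.
    fn get(&self) -> HighestSnapshot {
        *self.0.read().expect("lock poisoned")
    }
}

/// Toy snapshotter that owns the shared state and hands out read handles.
struct Snapshotter {
    highest: Arc<RwLock<HighestSnapshot>>,
}

impl Snapshotter {
    fn new() -> Self {
        Self { highest: Arc::new(RwLock::new(None)) }
    }

    /// Every consumer gets its own handle to the same underlying state.
    fn highest_snapshot_receiver(&self) -> HighestSnapshotReceiver {
        HighestSnapshotReceiver(Arc::clone(&self.highest))
    }

    /// Records that a snapshot up to `block` has been produced.
    fn publish(&self, block: u64) {
        *self.highest.write().expect("lock poisoned") = Some(block);
    }
}
```

With this shape, callers no longer create a channel pair themselves and thread the sender into the snapshotter, which is the wiring the removed `watch::channel(None)` lines used to do.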
6 changes: 6 additions & 0 deletions crates/interfaces/src/error.rs
@@ -39,6 +39,12 @@ impl From<reth_nippy_jar::NippyJarError> for RethError {
     }
 }

+impl From<reth_primitives::fs::FsPathError> for RethError {
+    fn from(err: reth_primitives::fs::FsPathError) -> Self {
+        RethError::Custom(err.to_string())
+    }
+}
+
 // We don't want these types to be too large because they're used in a lot of places.
 const _SIZE_ASSERTIONS: () = {
     // Main error.
17 changes: 16 additions & 1 deletion crates/primitives/src/fs.rs
@@ -1,6 +1,7 @@
 //! Wrapper for `std::fs` methods
 use std::{
-    fs, io,
+    fs::{self, ReadDir},
+    io,
     path::{Path, PathBuf},
 };

@@ -30,6 +31,9 @@ pub enum FsPathError {
     /// Provides additional path context for [`std::fs::remove_dir`].
     #[error("failed to remove dir {path:?}: {source}")]
     RemoveDir { source: io::Error, path: PathBuf },
+    /// Provides additional path context for [`std::fs::read_dir`].
+    #[error("failed to read dir {path:?}: {source}")]
+    ReadDir { source: io::Error, path: PathBuf },
     /// Provides additional path context for [`std::fs::File::open`].
     #[error("failed to open file {path:?}: {source}")]
     Open { source: io::Error, path: PathBuf },
@@ -77,6 +81,11 @@ impl FsPathError {
         FsPathError::RemoveDir { source, path: path.into() }
     }

+    /// Returns the complementary error variant for [`std::fs::read_dir`].
+    pub fn read_dir(source: io::Error, path: impl Into<PathBuf>) -> Self {
+        FsPathError::ReadDir { source, path: path.into() }
+    }
+
     /// Returns the complementary error variant for [`std::fs::File::open`].
     pub fn open(source: io::Error, path: impl Into<PathBuf>) -> Self {
         FsPathError::Open { source, path: path.into() }
@@ -108,3 +117,9 @@ pub fn create_dir_all(path: impl AsRef<Path>) -> Result<()> {
     let path = path.as_ref();
     fs::create_dir_all(path).map_err(|err| FsPathError::create_dir(err, path))
 }
+
+/// Wrapper for `std::fs::read_dir`
+pub fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir> {
+    let path = path.as_ref();
+    fs::read_dir(path).map_err(|err| FsPathError::read_dir(err, path))
+}
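The `read_dir` wrapper follows the same pattern as the other functions in `fs.rs`: call the `std::fs` function and attach the offending path to the error. A self-contained sketch of that pattern follows. It is an approximation, not the crate's code: the `Display` and `Error` impls are written by hand here because the sketch avoids external crates, whereas the diff uses `#[error(...)]` attributes (presumably from a derive macro), and only the `ReadDir` variant is reproduced.

```rust
use std::{
    fmt, fs, io,
    path::{Path, PathBuf},
};

/// Stand-in for the `ReadDir` variant of `FsPathError`.
#[derive(Debug)]
enum FsPathError {
    ReadDir { source: io::Error, path: PathBuf },
}

impl fmt::Display for FsPathError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Same message template as the `#[error(...)]` attribute in the diff.
            FsPathError::ReadDir { source, path } => {
                write!(f, "failed to read dir {path:?}: {source}")
            }
        }
    }
}

impl std::error::Error for FsPathError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            FsPathError::ReadDir { source, .. } => Some(source),
        }
    }
}

/// Wrapper for `std::fs::read_dir` that records the path on failure.
fn read_dir(path: impl AsRef<Path>) -> Result<fs::ReadDir, FsPathError> {
    let path = path.as_ref();
    fs::read_dir(path).map_err(|source| FsPathError::ReadDir { source, path: path.into() })
}
```

The benefit over a bare `io::Error` is that the resulting message names the directory that could not be read, which the standard error alone does not.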
8 changes: 7 additions & 1 deletion crates/primitives/src/snapshot/compression.rs
@@ -1,11 +1,17 @@
-#[derive(Debug, Copy, Clone, Default)]
+use strum::AsRefStr;
+
+#[derive(Debug, Copy, Clone, Default, AsRefStr)]
 #[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
 #[allow(missing_docs)]
 /// Snapshot compression
 pub enum Compression {
+    #[strum(serialize = "lz4")]
     Lz4,
+    #[strum(serialize = "zstd")]
     Zstd,
+    #[strum(serialize = "zstd-dict")]
     ZstdWithDictionary,
+    #[strum(serialize = "uncompressed")]
     #[default]
     Uncompressed,
 }
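Deriving `AsRefStr` with `#[strum(serialize = "...")]` gives each variant a fixed string form, which is one plausible way for `filename_with_configuration` to embed the configuration in a file name. The sketch below is a hand-written equivalent of what the derive is expected to produce for `Compression`, so it compiles without the `strum` crate. The helper `with_compression_suffix` and the file name layout it produces are hypothetical.

```rust
/// Snapshot compression, mirroring the variants in the diff above.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
enum Compression {
    Lz4,
    Zstd,
    ZstdWithDictionary,
    #[default]
    Uncompressed,
}

/// Hand-written stand-in for `#[derive(AsRefStr)]` with the
/// `#[strum(serialize = "...")]` values from the diff.
impl AsRef<str> for Compression {
    fn as_ref(&self) -> &str {
        match self {
            Compression::Lz4 => "lz4",
            Compression::Zstd => "zstd",
            Compression::ZstdWithDictionary => "zstd-dict",
            Compression::Uncompressed => "uncompressed",
        }
    }
}

/// Hypothetical helper: appends the compression name to a base file name.
fn with_compression_suffix(base: &str, compression: Compression) -> String {
    let name: &str = compression.as_ref();
    format!("{base}_{name}")
}
```

Keeping the string forms next to the enum means the file name logic does not need its own `match` over every variant.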
9 changes: 7 additions & 2 deletions crates/primitives/src/snapshot/filters.rs
@@ -1,3 +1,5 @@
+use strum::AsRefStr;
+
 #[derive(Debug, Copy, Clone)]
 /// Snapshot filters.
 pub enum Filters {
@@ -14,20 +16,23 @@ impl Filters {
     }
 }

-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, AsRefStr)]
 #[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
 /// Snapshot inclusion filter. Also see [Filters].
 pub enum InclusionFilter {
+    #[strum(serialize = "cuckoo")]
     /// Cuckoo filter
     Cuckoo,
 }

-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, AsRefStr)]
 #[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
 /// Snapshot perfect hashing function. Also see [Filters].
 pub enum PerfectHashingFunction {
+    #[strum(serialize = "fmph")]
     /// Fingerprint-Based Minimal Perfect Hash Function
     Fmph,
+    #[strum(serialize = "gofmph")]
     /// Fingerprint-Based Minimal Perfect Hash Function with Group Optimization
     GoFmph,
 }