Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fail on unwind during reth import #12062

Merged
merged 4 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/cli/commands/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ where
.with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty
.with_max_block(max_block)
.with_fail_on_unwind(true)
.add_stages(
DefaultStages::new(
provider_factory.clone(),
Expand Down
1 change: 1 addition & 0 deletions crates/optimism/cli/src/commands/build_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ where
.with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty
.with_max_block(max_block)
.with_fail_on_unwind(true)
.add_stages(
DefaultStages::new(
provider_factory.clone(),
Expand Down
3 changes: 3 additions & 0 deletions crates/stages/api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,7 @@ pub enum PipelineError {
/// Internal error
#[error(transparent)]
Internal(#[from] RethError),
/// The pipeline encountered an unwind when `fail_on_unwind` was set to `true`.
#[error("unexpected unwind")]
UnexpectedUnwind,
}
19 changes: 17 additions & 2 deletions crates/stages/api/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct PipelineBuilder<Provider> {
/// A receiver for the current chain tip to sync to.
tip_tx: Option<watch::Sender<B256>>,
metrics_tx: Option<MetricEventsSender>,
fail_on_unwind: bool,
}

impl<Provider> PipelineBuilder<Provider> {
Expand Down Expand Up @@ -62,6 +63,12 @@ impl<Provider> PipelineBuilder<Provider> {
self
}

/// Set whether pipeline should fail on unwind.
pub const fn with_fail_on_unwind(mut self, yes: bool) -> Self {
self.fail_on_unwind = yes;
self
}

/// Builds the final [`Pipeline`] using the given database.
pub fn build<N>(
self,
Expand All @@ -72,7 +79,7 @@ impl<Provider> PipelineBuilder<Provider> {
N: ProviderNodeTypes,
ProviderFactory<N>: DatabaseProviderFactory<ProviderRW = Provider>,
{
let Self { stages, max_block, tip_tx, metrics_tx } = self;
let Self { stages, max_block, tip_tx, metrics_tx, fail_on_unwind } = self;
Pipeline {
provider_factory,
stages,
Expand All @@ -82,13 +89,20 @@ impl<Provider> PipelineBuilder<Provider> {
event_sender: Default::default(),
progress: Default::default(),
metrics_tx,
fail_on_unwind,
}
}
}

impl<Provider> Default for PipelineBuilder<Provider> {
fn default() -> Self {
Self { stages: Vec::new(), max_block: None, tip_tx: None, metrics_tx: None }
Self {
stages: Vec::new(),
max_block: None,
tip_tx: None,
metrics_tx: None,
fail_on_unwind: false,
}
}
}

Expand All @@ -97,6 +111,7 @@ impl<Provider> std::fmt::Debug for PipelineBuilder<Provider> {
f.debug_struct("PipelineBuilder")
.field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
.field("max_block", &self.max_block)
.field("fail_on_unwind", &self.fail_on_unwind)
.finish()
}
}
8 changes: 8 additions & 0 deletions crates/stages/api/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ pub struct Pipeline<N: ProviderNodeTypes> {
/// A receiver for the current chain tip to sync to.
tip_tx: Option<watch::Sender<B256>>,
metrics_tx: Option<MetricEventsSender>,
/// Whether an unwind should fail the syncing process. Should only be set when downloading
/// blocks from trusted sources and expecting them to be valid.
fail_on_unwind: bool,
}

impl<N: ProviderNodeTypes> Pipeline<N> {
Expand Down Expand Up @@ -164,6 +167,10 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
loop {
let next_action = self.run_loop().await?;

if next_action.is_unwind() && self.fail_on_unwind {
return Err(PipelineError::UnexpectedUnwind)
}

// Terminate the loop early if it's reached the maximum user
// configured block.
if next_action.should_continue() &&
Expand Down Expand Up @@ -586,6 +593,7 @@ impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
.field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
.field("max_block", &self.max_block)
.field("event_sender", &self.event_sender)
.field("fail_on_unwind", &self.fail_on_unwind)
.finish()
}
}
Expand Down
Loading