From 934fd1f7f07c42ea49b92fb15694209ee0b9530f Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Tue, 24 Dec 2024 03:31:59 +0400 Subject: [PATCH] chore: make `NodeEvent` generic over `NodePrimitives` (#13534) --- Cargo.lock | 1 + bin/reth/src/commands/debug_cmd/execution.rs | 3 +- crates/cli/commands/src/import.rs | 2 +- crates/node/builder/src/launch/mod.rs | 4 +- crates/node/events/Cargo.toml | 1 + crates/node/events/src/node.rs | 77 +++++++------------- 6 files changed, 32 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ccf19e52d36d..c8a73da0d7b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8315,6 +8315,7 @@ dependencies = [ "alloy-eips", "alloy-primitives", "alloy-rpc-types-engine", + "derive_more", "futures", "humantime", "pin-project", diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index efe4a2f7c221..d4ffbb9fe3a6 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -24,6 +24,7 @@ use reth_network_api::NetworkInfo; use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient}; use reth_node_api::NodeTypesWithDBAdapter; use reth_node_ethereum::EthExecutorProvider; +use reth_node_events::node::NodeEvent; use reth_provider::{ providers::ProviderNodeTypes, ChainSpecProvider, ProviderFactory, StageCheckpointReader, }; @@ -211,7 +212,7 @@ impl> Command { reth_node_events::node::handle_events( Some(Box::new(network)), latest_block_number, - pipeline.events().map(Into::into), + pipeline.events().map(Into::>::into), ), ); diff --git a/crates/cli/commands/src/import.rs b/crates/cli/commands/src/import.rs index dc99ae7f98d0..eb314b4e8a43 100644 --- a/crates/cli/commands/src/import.rs +++ b/crates/cli/commands/src/import.rs @@ -165,7 +165,7 @@ pub fn build_import_pipeline( static_file_producer: StaticFileProducer>, disable_exec: bool, executor: E, -) -> eyre::Result<(Pipeline, impl Stream)> +) -> eyre::Result<(Pipeline, impl Stream>)> where N: ProviderNodeTypes + CliNodeTypes, C: Consensus + 'static, diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 87d026873697..c6a00a6eec8c 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -34,7 +34,7 @@ use reth_node_core::{ dirs::{ChainPath, DataDirPath}, exit::NodeExitFuture, }; -use reth_node_events::{cl::ConsensusLayerHealthEvents, node}; +use reth_node_events::{cl::ConsensusLayerHealthEvents, node, node::NodeEvent}; use reth_provider::providers::{BlockchainProvider, NodeTypesForTree}; use reth_rpc::eth::RpcNodeCore; use reth_tasks::TaskExecutor; @@ -292,7 +292,7 @@ where info!(target: "reth::cli", "Consensus engine initialized"); let events = stream_select!( - pipeline_events.map(Into::into), + pipeline_events.map(Into::>::into), if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { Either::Left( ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone())) diff --git a/crates/node/events/Cargo.toml b/crates/node/events/Cargo.toml index 03f3ab172883..9629aecef9a2 100644 --- a/crates/node/events/Cargo.toml +++ b/crates/node/events/Cargo.toml @@ -38,3 +38,4 @@ tracing.workspace = true # misc pin-project.workspace = true humantime.workspace = true +derive_more.workspace = true diff --git a/crates/node/events/src/node.rs b/crates/node/events/src/node.rs index 86f1ea507ac5..07aa0bb0da95 100644 --- a/crates/node/events/src/node.rs +++ b/crates/node/events/src/node.rs @@ -1,14 +1,14 @@ //! Support for handling events emitted by node components. use crate::cl::ConsensusLayerHealthEvent; -use alloy_consensus::constants::GWEI_TO_WEI; +use alloy_consensus::{constants::GWEI_TO_WEI, BlockHeader}; use alloy_primitives::{BlockNumber, B256}; use alloy_rpc_types_engine::ForkchoiceState; use futures::Stream; use reth_beacon_consensus::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress}; use reth_engine_primitives::ForkchoiceStatus; use reth_network_api::PeersInfo; -use reth_primitives_traits::{format_gas, format_gas_throughput}; +use reth_primitives_traits::{format_gas, format_gas_throughput, BlockBody, NodePrimitives}; use reth_prune_types::PrunerEvent; use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId}; use reth_static_file_types::StaticFileProducerEvent; @@ -211,7 +211,10 @@ impl NodeState { } } - fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) { + fn handle_consensus_engine_event( + &mut self, + event: BeaconConsensusEngineEvent, + ) { match event { BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => { let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } = @@ -248,28 +251,28 @@ impl NodeState { } BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => { info!( - number=block.number, + number=block.number(), hash=?block.hash(), peers=self.num_connected_peers(), - txs=block.body.transactions.len(), - gas=%format_gas(block.header.gas_used), - gas_throughput=%format_gas_throughput(block.header.gas_used, elapsed), - full=%format!("{:.1}%", block.header.gas_used as f64 * 100.0 / block.header.gas_limit as f64), - base_fee=%format!("{:.2}gwei", block.header.base_fee_per_gas.unwrap_or(0) as f64 / GWEI_TO_WEI as f64), - blobs=block.header.blob_gas_used.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB, - excess_blobs=block.header.excess_blob_gas.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB, + txs=block.body.transactions().len(), + gas=%format_gas(block.header.gas_used()), + gas_throughput=%format_gas_throughput(block.header.gas_used(), elapsed), + full=%format!("{:.1}%", block.header.gas_used() as f64 * 100.0 / block.header.gas_limit() as f64), + base_fee=%format!("{:.2}gwei", block.header.base_fee_per_gas().unwrap_or(0) as f64 / GWEI_TO_WEI as f64), + blobs=block.header.blob_gas_used().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB, + excess_blobs=block.header.excess_blob_gas().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB, ?elapsed, "Block added to canonical chain" ); } BeaconConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => { - self.latest_block = Some(head.number); - self.latest_block_time = Some(head.timestamp); + self.latest_block = Some(head.number()); + self.latest_block_time = Some(head.timestamp()); - info!(number=head.number, hash=?head.hash(), ?elapsed, "Canonical chain committed"); + info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed"); } BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed) => { - info!(number=block.number, hash=?block.hash(), ?elapsed, "Block added to fork chain"); + info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain"); } } } @@ -350,12 +353,12 @@ struct CurrentStage { } /// A node event. -#[derive(Debug)] -pub enum NodeEvent { +#[derive(Debug, derive_more::From)] +pub enum NodeEvent { /// A sync pipeline event. Pipeline(PipelineEvent), /// A consensus engine event. - ConsensusEngine(BeaconConsensusEngineEvent), + ConsensusEngine(BeaconConsensusEngineEvent), /// A Consensus Layer health event. ConsensusLayerHealth(ConsensusLayerHealthEvent), /// A pruner event @@ -367,44 +370,14 @@ pub enum NodeEvent { Other(String), } -impl From for NodeEvent { - fn from(event: PipelineEvent) -> Self { - Self::Pipeline(event) - } -} - -impl From for NodeEvent { - fn from(event: BeaconConsensusEngineEvent) -> Self { - Self::ConsensusEngine(event) - } -} - -impl From for NodeEvent { - fn from(event: ConsensusLayerHealthEvent) -> Self { - Self::ConsensusLayerHealth(event) - } -} - -impl From for NodeEvent { - fn from(event: PrunerEvent) -> Self { - Self::Pruner(event) - } -} - -impl From for NodeEvent { - fn from(event: StaticFileProducerEvent) -> Self { - Self::StaticFileProducer(event) - } -} - /// Displays relevant information to the user from components of the node, and periodically /// displays the high-level status of the node. -pub async fn handle_events( +pub async fn handle_events( peers_info: Option>, latest_block_number: Option, events: E, ) where - E: Stream + Unpin, + E: Stream> + Unpin, { let state = NodeState::new(peers_info, latest_block_number); @@ -426,9 +399,9 @@ struct EventHandler { info_interval: Interval, } -impl Future for EventHandler +impl Future for EventHandler where - E: Stream + Unpin, + E: Stream> + Unpin, { type Output = ();