Skip to content

Commit

Permalink
chore: make NodeEvent generic over NodePrimitives (#13534)
Browse files Browse the repository at this point in the history
  • Loading branch information
klkvr authored Dec 23, 2024
1 parent af1c9b7 commit 934fd1f
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 56 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use reth_network_api::NetworkInfo;
use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient};
use reth_node_api::NodeTypesWithDBAdapter;
use reth_node_ethereum::EthExecutorProvider;
use reth_node_events::node::NodeEvent;
use reth_provider::{
providers::ProviderNodeTypes, ChainSpecProvider, ProviderFactory, StageCheckpointReader,
};
Expand Down Expand Up @@ -211,7 +212,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
reth_node_events::node::handle_events(
Some(Box::new(network)),
latest_block_number,
pipeline.events().map(Into::into),
pipeline.events().map(Into::<NodeEvent<N::Primitives>>::into),
),
);

Expand Down
2 changes: 1 addition & 1 deletion crates/cli/commands/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub fn build_import_pipeline<N, C, E>(
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
disable_exec: bool,
executor: E,
) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent>)>
) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent<N::Primitives>>)>
where
N: ProviderNodeTypes + CliNodeTypes,
C: Consensus + 'static,
Expand Down
4 changes: 2 additions & 2 deletions crates/node/builder/src/launch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use reth_node_core::{
dirs::{ChainPath, DataDirPath},
exit::NodeExitFuture,
};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node, node::NodeEvent};
use reth_provider::providers::{BlockchainProvider, NodeTypesForTree};
use reth_rpc::eth::RpcNodeCore;
use reth_tasks::TaskExecutor;
Expand Down Expand Up @@ -292,7 +292,7 @@ where
info!(target: "reth::cli", "Consensus engine initialized");

let events = stream_select!(
pipeline_events.map(Into::into),
pipeline_events.map(Into::<NodeEvent<Types::Primitives>>::into),
if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
Either::Left(
ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
Expand Down
1 change: 1 addition & 0 deletions crates/node/events/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ tracing.workspace = true
# misc
pin-project.workspace = true
humantime.workspace = true
derive_more.workspace = true
77 changes: 25 additions & 52 deletions crates/node/events/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! Support for handling events emitted by node components.
use crate::cl::ConsensusLayerHealthEvent;
use alloy_consensus::constants::GWEI_TO_WEI;
use alloy_consensus::{constants::GWEI_TO_WEI, BlockHeader};
use alloy_primitives::{BlockNumber, B256};
use alloy_rpc_types_engine::ForkchoiceState;
use futures::Stream;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
use reth_engine_primitives::ForkchoiceStatus;
use reth_network_api::PeersInfo;
use reth_primitives_traits::{format_gas, format_gas_throughput};
use reth_primitives_traits::{format_gas, format_gas_throughput, BlockBody, NodePrimitives};
use reth_prune_types::PrunerEvent;
use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
use reth_static_file_types::StaticFileProducerEvent;
Expand Down Expand Up @@ -211,7 +211,10 @@ impl NodeState {
}
}

fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) {
fn handle_consensus_engine_event<N: NodePrimitives>(
&mut self,
event: BeaconConsensusEngineEvent<N>,
) {
match event {
BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
Expand Down Expand Up @@ -248,28 +251,28 @@ impl NodeState {
}
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
info!(
number=block.number,
number=block.number(),
hash=?block.hash(),
peers=self.num_connected_peers(),
txs=block.body.transactions.len(),
gas=%format_gas(block.header.gas_used),
gas_throughput=%format_gas_throughput(block.header.gas_used, elapsed),
full=%format!("{:.1}%", block.header.gas_used as f64 * 100.0 / block.header.gas_limit as f64),
base_fee=%format!("{:.2}gwei", block.header.base_fee_per_gas.unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
blobs=block.header.blob_gas_used.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
excess_blobs=block.header.excess_blob_gas.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
txs=block.body.transactions().len(),
gas=%format_gas(block.header.gas_used()),
gas_throughput=%format_gas_throughput(block.header.gas_used(), elapsed),
full=%format!("{:.1}%", block.header.gas_used() as f64 * 100.0 / block.header.gas_limit() as f64),
base_fee=%format!("{:.2}gwei", block.header.base_fee_per_gas().unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
blobs=block.header.blob_gas_used().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
excess_blobs=block.header.excess_blob_gas().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
?elapsed,
"Block added to canonical chain"
);
}
BeaconConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
self.latest_block = Some(head.number);
self.latest_block_time = Some(head.timestamp);
self.latest_block = Some(head.number());
self.latest_block_time = Some(head.timestamp());

info!(number=head.number, hash=?head.hash(), ?elapsed, "Canonical chain committed");
info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
}
BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed) => {
info!(number=block.number, hash=?block.hash(), ?elapsed, "Block added to fork chain");
info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
}
}
}
Expand Down Expand Up @@ -350,12 +353,12 @@ struct CurrentStage {
}

/// A node event.
#[derive(Debug)]
pub enum NodeEvent {
#[derive(Debug, derive_more::From)]
pub enum NodeEvent<N: NodePrimitives> {
/// A sync pipeline event.
Pipeline(PipelineEvent),
/// A consensus engine event.
ConsensusEngine(BeaconConsensusEngineEvent),
ConsensusEngine(BeaconConsensusEngineEvent<N>),
/// A Consensus Layer health event.
ConsensusLayerHealth(ConsensusLayerHealthEvent),
/// A pruner event
Expand All @@ -367,44 +370,14 @@ pub enum NodeEvent {
Other(String),
}

impl From<PipelineEvent> for NodeEvent {
fn from(event: PipelineEvent) -> Self {
Self::Pipeline(event)
}
}

impl From<BeaconConsensusEngineEvent> for NodeEvent {
fn from(event: BeaconConsensusEngineEvent) -> Self {
Self::ConsensusEngine(event)
}
}

impl From<ConsensusLayerHealthEvent> for NodeEvent {
fn from(event: ConsensusLayerHealthEvent) -> Self {
Self::ConsensusLayerHealth(event)
}
}

impl From<PrunerEvent> for NodeEvent {
fn from(event: PrunerEvent) -> Self {
Self::Pruner(event)
}
}

impl From<StaticFileProducerEvent> for NodeEvent {
fn from(event: StaticFileProducerEvent) -> Self {
Self::StaticFileProducer(event)
}
}

/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events<E>(
pub async fn handle_events<E, N: NodePrimitives>(
peers_info: Option<Box<dyn PeersInfo>>,
latest_block_number: Option<BlockNumber>,
events: E,
) where
E: Stream<Item = NodeEvent> + Unpin,
E: Stream<Item = NodeEvent<N>> + Unpin,
{
let state = NodeState::new(peers_info, latest_block_number);

Expand All @@ -426,9 +399,9 @@ struct EventHandler<E> {
info_interval: Interval,
}

impl<E> Future for EventHandler<E>
impl<E, N: NodePrimitives> Future for EventHandler<E>
where
E: Stream<Item = NodeEvent> + Unpin,
E: Stream<Item = NodeEvent<N>> + Unpin,
{
type Output = ();

Expand Down

0 comments on commit 934fd1f

Please sign in to comment.