diff --git a/crates/ethereum-rpc/src/ethereum.rs b/crates/ethereum-rpc/src/ethereum.rs index 4db636be3..f9e129cc4 100644 --- a/crates/ethereum-rpc/src/ethereum.rs +++ b/crates/ethereum-rpc/src/ethereum.rs @@ -16,6 +16,7 @@ use tracing::instrument; use crate::gas_price::fee_history::FeeHistoryCacheConfig; use crate::gas_price::gas_oracle::{GasPriceOracle, GasPriceOracleConfig}; +use crate::subscription::SubscriptionManager; const MAX_TRACE_BLOCK: u32 = 1000; @@ -37,7 +38,7 @@ pub struct Ethereum { pub(crate) sequencer_client: Option, pub(crate) web3_client_version: String, pub(crate) trace_cache: Mutex, ByLength>>, - pub(crate) soft_confirmation_rx: Option>, + pub(crate) subscription_manager: Option, } impl Ethereum { @@ -62,6 +63,9 @@ impl Ethereum { let trace_cache = Mutex::new(LruMap::new(ByLength::new(MAX_TRACE_BLOCK))); + let subscription_manager = + soft_confirmation_rx.map(|rx| SubscriptionManager::new::(storage.clone(), rx)); + Self { da_service, gas_price_oracle, @@ -71,7 +75,7 @@ impl Ethereum { sequencer_client, web3_client_version: current_version, trace_cache, - soft_confirmation_rx, + subscription_manager, } } diff --git a/crates/ethereum-rpc/src/lib.rs b/crates/ethereum-rpc/src/lib.rs index f57d547a9..2e2b1e15c 100644 --- a/crates/ethereum-rpc/src/lib.rs +++ b/crates/ethereum-rpc/src/lib.rs @@ -6,7 +6,6 @@ mod trace; #[cfg(feature = "local")] pub use citrea_evm::DevSigner; use citrea_evm::Evm; -use subscription::handle_new_heads_subscription; pub use ethereum::{EthRpcConfig, Ethereum}; pub use gas_price::fee_history::FeeHistoryCacheConfig; pub use gas_price::gas_oracle::GasPriceOracleConfig; @@ -21,6 +20,7 @@ use serde_json::json; use sov_modules_api::utils::to_jsonrpsee_error_object; use sov_modules_api::WorkingSet; use sov_rollup_interface::services::da::DaService; +use subscription::handle_new_heads_subscription; use tokio::sync::broadcast; use trace::{debug_trace_by_block_number, handle_debug_trace_chain}; use tracing::info; diff --git a/crates/ethereum-rpc/src/subscription.rs b/crates/ethereum-rpc/src/subscription.rs index 3d028e93c..812d34643 100644 --- a/crates/ethereum-rpc/src/subscription.rs +++ b/crates/ethereum-rpc/src/subscription.rs @@ -9,18 +9,18 @@ use tokio::sync::broadcast; use crate::ethereum::Ethereum; -pub(crate) struct SubscriptionManager { - storage: C::Storage, +pub(crate) struct SubscriptionManager { new_heads_tx: broadcast::Sender, } -impl SubscriptionManager { - pub(crate) fn new(storage: C::Storage, soft_confirmation_rx: broadcast::Receiver) -> Self { +impl SubscriptionManager { + pub(crate) fn new( + storage: C::Storage, + soft_confirmation_rx: broadcast::Receiver, + ) -> Self { let mut soft_confirmation_rx = soft_confirmation_rx; - let storage_c = storage.clone(); let new_heads_tx = broadcast::channel(16).0; let new_heads_tx_c = new_heads_tx.clone(); - // let new_heads_tx_c = new_heads_tx.clone(); // Spawn the task that will listen for new soft confirmation heights // and send the corresponding ethereum block to subscribers tokio::spawn(async move { @@ -34,7 +34,7 @@ impl SubscriptionManager { continue; } - let mut working_set = WorkingSet::::new(storage_c.clone()); + let mut working_set = WorkingSet::::new(storage.clone()); let block = evm .get_block_by_number( Some(BlockNumberOrTag::Number(height)), @@ -48,10 +48,7 @@ impl SubscriptionManager { let _ = new_heads_tx_c.send(block); } }); - Self { - storage, - new_heads_tx, - } + Self { new_heads_tx } } pub(crate) fn subscribe_new_heads(&self) -> broadcast::Receiver { @@ -64,27 +61,17 @@ pub async fn handle_new_heads_subscription>, ) { let mut rx = ethereum - .soft_confirmation_rx + .subscription_manager .as_ref() .unwrap() - .resubscribe(); - let evm = Evm::::default(); + .subscribe_new_heads(); let subscription = pending.accept().await.unwrap(); tokio::spawn(async move { loop { - let Ok(block_number) = rx.recv().await else { + let Ok(block) = rx.recv().await else { // Connection closed return; }; - let mut working_set = WorkingSet::::new(ethereum.storage.clone()); - let block = evm - .get_block_by_number( - Some(BlockNumberOrTag::Number(block_number)), - None, - &mut working_set, - ) - .expect("Error querying block from evm") - .expect("Received signal but evm block is not found"); let msg = SubscriptionMessage::new( subscription.method_name(),