Skip to content

Commit

Permalink
Use SubscriptionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
yaziciahmet committed Jul 23, 2024
1 parent c1ad447 commit d577518
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 27 deletions.
8 changes: 6 additions & 2 deletions crates/ethereum-rpc/src/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tracing::instrument;

use crate::gas_price::fee_history::FeeHistoryCacheConfig;
use crate::gas_price::gas_oracle::{GasPriceOracle, GasPriceOracleConfig};
use crate::subscription::SubscriptionManager;

const MAX_TRACE_BLOCK: u32 = 1000;

Expand All @@ -37,7 +38,7 @@ pub struct Ethereum<C: sov_modules_api::Context, Da: DaService> {
pub(crate) sequencer_client: Option<SequencerClient>,
pub(crate) web3_client_version: String,
pub(crate) trace_cache: Mutex<LruMap<u64, Vec<GethTrace>, ByLength>>,
pub(crate) soft_confirmation_rx: Option<broadcast::Receiver<u64>>,
pub(crate) subscription_manager: Option<SubscriptionManager>,
}

impl<C: sov_modules_api::Context, Da: DaService> Ethereum<C, Da> {
Expand All @@ -62,6 +63,9 @@ impl<C: sov_modules_api::Context, Da: DaService> Ethereum<C, Da> {

let trace_cache = Mutex::new(LruMap::new(ByLength::new(MAX_TRACE_BLOCK)));

let subscription_manager =
soft_confirmation_rx.map(|rx| SubscriptionManager::new::<C>(storage.clone(), rx));

Self {
da_service,
gas_price_oracle,
Expand All @@ -71,7 +75,7 @@ impl<C: sov_modules_api::Context, Da: DaService> Ethereum<C, Da> {
sequencer_client,
web3_client_version: current_version,
trace_cache,
soft_confirmation_rx,
subscription_manager,
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/ethereum-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ mod trace;
#[cfg(feature = "local")]
pub use citrea_evm::DevSigner;
use citrea_evm::Evm;
use subscription::handle_new_heads_subscription;
pub use ethereum::{EthRpcConfig, Ethereum};
pub use gas_price::fee_history::FeeHistoryCacheConfig;
pub use gas_price::gas_oracle::GasPriceOracleConfig;
Expand All @@ -21,6 +20,7 @@ use serde_json::json;
use sov_modules_api::utils::to_jsonrpsee_error_object;
use sov_modules_api::WorkingSet;
use sov_rollup_interface::services::da::DaService;
use subscription::handle_new_heads_subscription;
use tokio::sync::broadcast;
use trace::{debug_trace_by_block_number, handle_debug_trace_chain};
use tracing::info;
Expand Down
35 changes: 11 additions & 24 deletions crates/ethereum-rpc/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ use tokio::sync::broadcast;

use crate::ethereum::Ethereum;

pub(crate) struct SubscriptionManager<C: sov_modules_api::Context> {
storage: C::Storage,
pub(crate) struct SubscriptionManager {
new_heads_tx: broadcast::Sender<RichBlock>,
}

impl<C: sov_modules_api::Context> SubscriptionManager<C> {
pub(crate) fn new(storage: C::Storage, soft_confirmation_rx: broadcast::Receiver<u64>) -> Self {
impl SubscriptionManager {
pub(crate) fn new<C: sov_modules_api::Context>(
storage: C::Storage,
soft_confirmation_rx: broadcast::Receiver<u64>,
) -> Self {
let mut soft_confirmation_rx = soft_confirmation_rx;
let storage_c = storage.clone();
let new_heads_tx = broadcast::channel(16).0;
let new_heads_tx_c = new_heads_tx.clone();
// let new_heads_tx_c = new_heads_tx.clone();
// Spawn the task that will listen for new soft confirmation heights
// and send the corresponding ethereum block to subscribers
tokio::spawn(async move {
Expand All @@ -34,7 +34,7 @@ impl<C: sov_modules_api::Context> SubscriptionManager<C> {
continue;
}

let mut working_set = WorkingSet::<C>::new(storage_c.clone());
let mut working_set = WorkingSet::<C>::new(storage.clone());
let block = evm
.get_block_by_number(
Some(BlockNumberOrTag::Number(height)),
Expand All @@ -48,10 +48,7 @@ impl<C: sov_modules_api::Context> SubscriptionManager<C> {
let _ = new_heads_tx_c.send(block);
}
});
Self {
storage,
new_heads_tx,
}
Self { new_heads_tx }
}

pub(crate) fn subscribe_new_heads(&self) -> broadcast::Receiver<RichBlock> {
Expand All @@ -64,27 +61,17 @@ pub async fn handle_new_heads_subscription<C: sov_modules_api::Context, Da: DaSe
ethereum: Arc<Ethereum<C, Da>>,
) {
let mut rx = ethereum
.soft_confirmation_rx
.subscription_manager
.as_ref()
.unwrap()
.resubscribe();
let evm = Evm::<C>::default();
.subscribe_new_heads();
let subscription = pending.accept().await.unwrap();
tokio::spawn(async move {
loop {
let Ok(block_number) = rx.recv().await else {
let Ok(block) = rx.recv().await else {
// Connection closed
return;
};
let mut working_set = WorkingSet::<C>::new(ethereum.storage.clone());
let block = evm
.get_block_by_number(
Some(BlockNumberOrTag::Number(block_number)),
None,
&mut working_set,
)
.expect("Error querying block from evm")
.expect("Received signal but evm block is not found");

let msg = SubscriptionMessage::new(
subscription.method_name(),
Expand Down

0 comments on commit d577518

Please sign in to comment.