From 13eed63d684252bc5d60e8c154c48d737773f7c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Yaz=C4=B1c=C4=B1?= <75089142+yaziciahmet@users.noreply.github.com> Date: Thu, 25 Jul 2024 20:12:09 +0200 Subject: [PATCH] Realtime Subscription RPCs (#916) * Separate rpc Ethereum context into its file * Separate debug trace * Carry subscriptions at the end * Create eth subscription handler signature * Pass tx to ledger db * Pass down the tx to rpc context * Send height after commit * Update new head handler fn signature * Finalize handling * Add test * unwrap to expect * Remove notifying from ledger db * Send new l2 block notif in sequencer and fullnode * Add to prover as well * Make tx optional in rpc * Use params sequence * Rename * Fix doc * Replace tx with rx in rpc * Rename file * IMplement SubscriptionManager * Use SubscriptionManager * Minor changes * Implemented logs subscription * Add log subscription test * Simplify logs subscription flow * Disable debug_subscribe if subscriptions disabled * Update configs * Rename * Add new conf params * UPdate log_matches_filter * recreate working set * Replace disable with enable * Rename config --- bin/citrea/src/eth.rs | 3 + bin/citrea/src/rollup/bitcoin.rs | 3 + bin/citrea/src/rollup/mock.rs | 3 + bin/citrea/src/rollup/mod.rs | 36 +- bin/citrea/src/test_rpc.rs | 2 + bin/citrea/tests/evm/mod.rs | 1 + bin/citrea/tests/evm/subscription.rs | 285 +++++++++ bin/citrea/tests/test_client/mod.rs | 48 +- bin/citrea/tests/test_helpers/mod.rs | 2 + crates/ethereum-rpc/src/ethereum.rs | 115 ++++ crates/ethereum-rpc/src/lib.rs | 567 ++++-------------- crates/ethereum-rpc/src/subscription.rs | 136 +++++ crates/ethereum-rpc/src/trace.rs | 309 ++++++++++ crates/evm/src/query.rs | 20 +- crates/evm/src/rpc_helpers/filter.rs | 5 +- crates/evm/src/rpc_helpers/responses.rs | 10 +- .../evm/src/smart_contracts/logs_contract.rs | 19 +- crates/evm/src/smart_contracts/mod.rs | 2 +- crates/fullnode/src/runner.rs | 10 +- .../tests/runner_initialization_tests.rs | 4 + crates/fullnode/tests/runner_reorg_tests.rs | 4 + crates/prover/src/runner.rs | 10 +- crates/sequencer/src/sequencer.rs | 10 +- .../full-node/db/sov-db/src/ledger_db/mod.rs | 5 +- .../full-node/sov-stf-runner/src/config.rs | 20 + .../sov-modules-rollup-blueprint/src/lib.rs | 2 + .../bitcoin-regtest/prover_rollup_config.toml | 1 + .../bitcoin-regtest/rollup_config.toml | 2 + .../sequencer_rollup_config.toml | 2 + resources/configs/devnet/rollup_config.toml | 2 + .../mock-dockerized/rollup_config.toml | 2 + resources/configs/mock/rollup_config.toml | 2 + .../configs/mock/sequencer_rollup_config.toml | 2 + 33 files changed, 1150 insertions(+), 494 deletions(-) create mode 100644 bin/citrea/tests/evm/subscription.rs create mode 100644 crates/ethereum-rpc/src/ethereum.rs create mode 100644 crates/ethereum-rpc/src/subscription.rs create mode 100644 crates/ethereum-rpc/src/trace.rs diff --git a/bin/citrea/src/eth.rs b/bin/citrea/src/eth.rs index c09f568f7..289154c2b 100644 --- a/bin/citrea/src/eth.rs +++ b/bin/citrea/src/eth.rs @@ -6,6 +6,7 @@ use sov_modules_api::default_context::DefaultContext; use sov_prover_storage_manager::SnapshotManager; use sov_rollup_interface::services::da::DaService; use sov_state::ProverStorage; +use tokio::sync::broadcast; // register ethereum methods. pub(crate) fn register_ethereum( @@ -13,6 +14,7 @@ pub(crate) fn register_ethereum( storage: ProverStorage, methods: &mut jsonrpsee::RpcModule<()>, sequencer_client_url: Option, + soft_confirmation_rx: Option>, ) -> Result<(), anyhow::Error> { let eth_rpc_config = { let eth_signer = eth_dev_signer(); @@ -28,6 +30,7 @@ pub(crate) fn register_ethereum( eth_rpc_config, storage, sequencer_client_url, + soft_confirmation_rx, ); methods .merge(ethereum_rpc) diff --git a/bin/citrea/src/rollup/bitcoin.rs b/bin/citrea/src/rollup/bitcoin.rs index 3326add5d..0f35a67c2 100644 --- a/bin/citrea/src/rollup/bitcoin.rs +++ b/bin/citrea/src/rollup/bitcoin.rs @@ -18,6 +18,7 @@ use sov_rollup_interface::da::DaVerifier; use sov_rollup_interface::zk::{Zkvm, ZkvmHost}; use sov_state::{DefaultStorageSpec, Storage, ZkStorage}; use sov_stf_runner::{FullNodeConfig, ProverConfig}; +use tokio::sync::broadcast; use tracing::instrument; use crate::CitreaRollupBlueprint; @@ -61,6 +62,7 @@ impl RollupBlueprint for BitcoinRollup { ledger_db: &LedgerDB, da_service: &Self::DaService, sequencer_client_url: Option, + soft_confirmation_rx: Option>, ) -> Result, anyhow::Error> { // unused inside register RPC let sov_sequencer = Address::new([0; 32]); @@ -77,6 +79,7 @@ impl RollupBlueprint for BitcoinRollup { storage.clone(), &mut rpc_methods, sequencer_client_url, + soft_confirmation_rx, )?; Ok(rpc_methods) diff --git a/bin/citrea/src/rollup/mock.rs b/bin/citrea/src/rollup/mock.rs index b0991f0ea..463f81805 100644 --- a/bin/citrea/src/rollup/mock.rs +++ b/bin/citrea/src/rollup/mock.rs @@ -14,6 +14,7 @@ use sov_prover_storage_manager::ProverStorageManager; use sov_rollup_interface::zk::{Zkvm, ZkvmHost}; use sov_state::{DefaultStorageSpec, Storage, ZkStorage}; use sov_stf_runner::{FullNodeConfig, ProverConfig}; +use tokio::sync::broadcast; use crate::CitreaRollupBlueprint; @@ -55,6 +56,7 @@ impl RollupBlueprint for MockDemoRollup { ledger_db: &LedgerDB, da_service: &Self::DaService, sequencer_client_url: Option, + soft_confirmation_rx: Option>, ) -> Result, anyhow::Error> { // TODO set the sequencer address let sequencer = Address::new([0; 32]); @@ -71,6 +73,7 @@ impl RollupBlueprint for MockDemoRollup { storage.clone(), &mut rpc_methods, sequencer_client_url, + soft_confirmation_rx, )?; Ok(rpc_methods) diff --git a/bin/citrea/src/rollup/mod.rs b/bin/citrea/src/rollup/mod.rs index 83238e722..a36085ebb 100644 --- a/bin/citrea/src/rollup/mod.rs +++ b/bin/citrea/src/rollup/mod.rs @@ -10,6 +10,7 @@ use sov_modules_rollup_blueprint::RollupBlueprint; use sov_modules_stf_blueprint::{Runtime as RuntimeTrait, StfBlueprint}; use sov_state::storage::NativeStorage; use sov_stf_runner::{FullNodeConfig, InitVariant, ProverConfig}; +use tokio::sync::broadcast; use tracing::instrument; mod bitcoin; mod mock; @@ -43,9 +44,21 @@ pub trait CitreaRollupBlueprint: RollupBlueprint { let mut storage_manager = self.create_storage_manager(&rollup_config)?; let prover_storage = storage_manager.create_finalized_storage()?; + let (soft_confirmation_tx, soft_confirmation_rx) = broadcast::channel(10); + // If subscriptions disabled, pass None + let soft_confirmation_rx = if rollup_config.rpc.enable_subscriptions { + Some(soft_confirmation_rx) + } else { + None + }; // TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218) - let rpc_methods = - self.create_rpc_methods(&prover_storage, &ledger_db, &da_service, None)?; + let rpc_methods = self.create_rpc_methods( + &prover_storage, + &ledger_db, + &da_service, + None, + soft_confirmation_rx, + )?; let native_stf = StfBlueprint::new(); @@ -75,6 +88,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint { rollup_config.public_keys, ledger_db, rollup_config.rpc, + soft_confirmation_tx, ) .unwrap(); @@ -110,12 +124,20 @@ pub trait CitreaRollupBlueprint: RollupBlueprint { let prover_storage = storage_manager.create_finalized_storage()?; let runner_config = rollup_config.runner.expect("Runner config is missing"); + let (soft_confirmation_tx, soft_confirmation_rx) = broadcast::channel(10); + // If subscriptions disabled, pass None + let soft_confirmation_rx = if rollup_config.rpc.enable_subscriptions { + Some(soft_confirmation_rx) + } else { + None + }; // TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218) let rpc_methods = self.create_rpc_methods( &prover_storage, &ledger_db, &da_service, Some(runner_config.sequencer_client_url.clone()), + soft_confirmation_rx, )?; let native_stf = StfBlueprint::new(); @@ -149,6 +171,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint { init_variant, code_commitment, rollup_config.sync_blocks_count, + soft_confirmation_tx, )?; Ok(FullNode { @@ -187,6 +210,13 @@ pub trait CitreaRollupBlueprint: RollupBlueprint { let mut storage_manager = self.create_storage_manager(&rollup_config)?; let prover_storage = storage_manager.create_finalized_storage()?; + let (soft_confirmation_tx, soft_confirmation_rx) = broadcast::channel(10); + // If subscriptions disabled, pass None + let soft_confirmation_rx = if rollup_config.rpc.enable_subscriptions { + Some(soft_confirmation_rx) + } else { + None + }; let runner_config = rollup_config.runner.expect("Runner config is missing"); // TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218) let rpc_methods = self.create_rpc_methods( @@ -194,6 +224,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint { &ledger_db, &da_service, Some(runner_config.sequencer_client_url.clone()), + soft_confirmation_rx, )?; let native_stf = StfBlueprint::new(); @@ -229,6 +260,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint { Some(prover_config), code_commitment, rollup_config.sync_blocks_count, + soft_confirmation_tx, )?; Ok(Prover { diff --git a/bin/citrea/src/test_rpc.rs b/bin/citrea/src/test_rpc.rs index a68cf781b..cead7b8cf 100644 --- a/bin/citrea/src/test_rpc.rs +++ b/bin/citrea/src/test_rpc.rs @@ -99,6 +99,8 @@ fn test_helper( max_request_body_size: 10 * 1024 * 1024, max_response_body_size: 10 * 1024 * 1024, batch_requests_limit: 50, + enable_subscriptions: true, + max_subscriptions_per_connection: 100, }; queries_test_runner(test_queries, rpc_config).await; diff --git a/bin/citrea/tests/evm/mod.rs b/bin/citrea/tests/evm/mod.rs index 1e368fb4c..322dd82bb 100644 --- a/bin/citrea/tests/evm/mod.rs +++ b/bin/citrea/tests/evm/mod.rs @@ -20,6 +20,7 @@ use crate::{ mod archival_state; mod gas_price; +mod subscription; mod tracing; #[tokio::test(flavor = "multi_thread")] diff --git a/bin/citrea/tests/evm/subscription.rs b/bin/citrea/tests/evm/subscription.rs new file mode 100644 index 000000000..821cff622 --- /dev/null +++ b/bin/citrea/tests/evm/subscription.rs @@ -0,0 +1,285 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use alloy_primitives::FixedBytes; +use alloy_sol_types::SolEvent; +use citrea_evm::smart_contracts::{AnotherLogEvent, LogEvent, LogsContract, TestContract}; +use citrea_evm::{Filter, LogResponse}; +// use citrea::initialize_logging; +use citrea_stf::genesis_config::GenesisPaths; +use reth_primitives::{keccak256, Address}; +use tokio::time::sleep; + +use crate::evm::make_test_client; +use crate::test_client::TestClient; +use crate::test_helpers::{start_rollup, tempdir_with_children, wait_for_l2_block, NodeMode}; +use crate::{ + DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, DEFAULT_MIN_SOFT_CONFIRMATIONS_PER_COMMITMENT, + TEST_DATA_GENESIS_PATH, +}; + +#[tokio::test(flavor = "multi_thread")] +async fn test_eth_subscriptions() -> Result<(), Box> { + let storage_dir = tempdir_with_children(&["DA", "sequencer"]); + let da_db_dir = storage_dir.path().join("DA").to_path_buf(); + let sequencer_db_dir = storage_dir.path().join("sequencer").to_path_buf(); + + let (port_tx, port_rx) = tokio::sync::oneshot::channel(); + let da_db_dir_cloned = da_db_dir.clone(); + let seq_task = tokio::spawn(async { + // Don't provide a prover since the EVM is not currently provable + start_rollup( + port_tx, + GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH), + None, + NodeMode::SequencerNode, + sequencer_db_dir, + da_db_dir_cloned, + DEFAULT_MIN_SOFT_CONFIRMATIONS_PER_COMMITMENT, + true, + None, + None, + Some(true), + DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, + ) + .await; + }); + + // Wait for rollup task to start: + let port = port_rx.await.unwrap(); + + let test_client = make_test_client(port).await; + + test_client.send_publish_batch_request().await; + wait_for_l2_block(&test_client, 1, None).await; + + // Spawn newHeads subscriber + let new_block_rx = test_client.subscribe_new_heads().await; + let last_received_block = Arc::new(Mutex::new(None)); + let last_received_block_clone = last_received_block.clone(); + tokio::spawn(async move { + loop { + let Ok(block) = new_block_rx.recv() else { + return; + }; + *(last_received_block_clone.lock().unwrap()) = Some(block); + } + }); + + // Produce an empty block and receive it from subscription + { + test_client.send_publish_batch_request().await; + wait_for_l2_block(&test_client, 2, None).await; + // Sleep in case of subscription delay + sleep(Duration::from_millis(100)).await; + + let block = last_received_block.lock().unwrap(); + let block = block.as_ref().unwrap(); + assert_eq!(block.header.number, Some(2)); + assert!(block.transactions.is_empty()); + } + + // Produce a block with 1 send transaction and receive it from subscription + { + let pending_tx = test_client + .send_eth(Address::random(), None, None, None, 10000) + .await + .unwrap(); + let tx_hash = *pending_tx.tx_hash(); + + test_client.send_publish_batch_request().await; + wait_for_l2_block(&test_client, 3, None).await; + // Sleep in case of subscription delay + sleep(Duration::from_millis(100)).await; + + let block = last_received_block.lock().unwrap(); + let block = block.as_ref().unwrap(); + assert_eq!(block.header.number, Some(3)); + assert_eq!(block.transactions.len(), 1); + assert_eq!(block.transactions.hashes().last().unwrap().clone(), tx_hash); + } + + // Deploy 2 LogsContract + let (logs_contract1, logs_contract_address1, logs_contract2, logs_contract_address2) = { + let logs_contract1 = LogsContract::default(); + let deploy_logs_contract_req1 = test_client + .deploy_contract(logs_contract1.byte_code(), None) + .await?; + let logs_contract2 = LogsContract::default(); + let deploy_logs_contract_req2 = test_client + .deploy_contract(logs_contract2.byte_code(), None) + .await?; + + test_client.send_publish_batch_request().await; + + let logs_contract_address1 = deploy_logs_contract_req1 + .get_receipt() + .await? + .contract_address + .unwrap(); + let logs_contract_address2 = deploy_logs_contract_req2 + .get_receipt() + .await? + .contract_address + .unwrap(); + + ( + logs_contract1, + logs_contract_address1, + logs_contract2, + logs_contract_address2, + ) + }; + + // Spawn logs subscriber with no filter + let logs_by_tx_no_filter = spawn_logs_subscriber(&test_client, Filter::default()).await; + // Spawn logs subscriber with logs_contract_address1 filter + let mut filter = Filter::default(); + filter.address.0.insert(logs_contract_address1); + let logs_by_tx_address1_filter = spawn_logs_subscriber(&test_client, filter).await; + // Spawn logs subscriber with logs_contract_address2 filter and a topic + let mut filter = Filter::default(); + filter.address.0.insert(logs_contract_address2); + filter.topics[0].0.insert(AnotherLogEvent::SIGNATURE_HASH); + let logs_by_tx_address2_filter = spawn_logs_subscriber(&test_client, filter).await; + + // Call logs_contract1 and logs_contract2 contracts once and observe that + // each log subscription receives the respective events + { + // Send transaction to 1st contract + let test_log_msg: String = "DRAGONBALLZ".into(); + let pending_tx1 = test_client + .contract_transaction( + logs_contract_address1, + logs_contract1.publish_event(test_log_msg.clone()), + None, + ) + .await; + let tx_hash1 = *pending_tx1.tx_hash(); + // Send transaction to 2nd contract + let pending_tx2 = test_client + .contract_transaction( + logs_contract_address2, + logs_contract2.publish_event(test_log_msg.clone()), + None, + ) + .await; + let tx_hash2 = *pending_tx2.tx_hash(); + + // Wait for them to be mined + test_client.send_publish_batch_request().await; + wait_for_l2_block(&test_client, 5, None).await; + // Sleep in case of subscription delay + sleep(Duration::from_millis(100)).await; + + // Observe that we received a block and it contains 2 transactions + let block = last_received_block.lock().unwrap(); + let block = block.as_ref().unwrap(); + let mut tx_hashes = block.transactions.hashes(); + assert_eq!(block.header.number, Some(5)); + assert_eq!(block.transactions.len(), 2); + assert_eq!(tx_hashes.next().unwrap().clone(), tx_hash1); + assert_eq!(tx_hashes.next().unwrap().clone(), tx_hash2); + + { + // Observe that no filter logs subscription received all 4 events + let logs_by_tx_no_filter = logs_by_tx_no_filter.lock().unwrap(); + let (log_payload1, another_log_payload1) = + parse_log_contract_logs(logs_by_tx_no_filter.get(&tx_hash1).unwrap()); + let (log_payload2, another_log_payload2) = + parse_log_contract_logs(logs_by_tx_no_filter.get(&tx_hash2).unwrap()); + + // Verify tx1 events payload + assert_eq!(log_payload1.address, logs_contract_address1); + assert_eq!(log_payload1.sender, test_client.from_addr); + assert_eq!(log_payload1.contractAddress, logs_contract_address1); + assert_eq!(log_payload1.senderMessage, keccak256(test_log_msg.clone())); + assert_eq!(log_payload1.message, "Hello World!"); + assert_eq!(another_log_payload1.contractAddress, logs_contract_address1); + + // Verify tx2 events payload + assert_eq!(log_payload2.address, logs_contract_address2); + assert_eq!(log_payload2.sender, test_client.from_addr); + assert_eq!(log_payload2.contractAddress, logs_contract_address2); + assert_eq!(log_payload2.senderMessage, keccak256(test_log_msg.clone())); + assert_eq!(log_payload2.message, "Hello World!"); + assert_eq!(another_log_payload2.contractAddress, logs_contract_address2); + } + + { + // Observe that address1 filtered subscription received only 2 events from contract1 + let logs_by_tx_address1_filter = logs_by_tx_address1_filter.lock().unwrap(); + assert!(logs_by_tx_address1_filter.get(&tx_hash2).is_none()); + + let (log_payload1, another_log_payload1) = + parse_log_contract_logs(logs_by_tx_address1_filter.get(&tx_hash1).unwrap()); + + // Verify tx1 events payload + assert_eq!(log_payload1.address, logs_contract_address1); + assert_eq!(log_payload1.sender, test_client.from_addr); + assert_eq!(log_payload1.contractAddress, logs_contract_address1); + assert_eq!(log_payload1.senderMessage, keccak256(test_log_msg.clone())); + assert_eq!(log_payload1.message, "Hello World!"); + assert_eq!(another_log_payload1.contractAddress, logs_contract_address1); + } + + { + // Observe that address1 and topic filtered subscription received only 1 event from contract1 + let logs_by_tx_address2_filter = logs_by_tx_address2_filter.lock().unwrap(); + assert!(logs_by_tx_address2_filter.get(&tx_hash1).is_none()); + + let logs = logs_by_tx_address2_filter.get(&tx_hash2).unwrap(); + assert_eq!(logs.len(), 1); + + let log: alloy_primitives::Log = logs[0].clone().try_into().unwrap(); + let another_log_payload = LogsContract::decode_another_log_event(&log).unwrap(); + + // Verify tx1 events payload + assert_eq!(another_log_payload.contractAddress, logs_contract_address2); + } + } + + seq_task.abort(); + Ok(()) +} + +async fn spawn_logs_subscriber( + client: &TestClient, + filter: Filter, +) -> Arc, Vec>>> { + let logs_rx = client.subscribe_logs(filter).await; + let logs_by_tx = Arc::new(Mutex::new(HashMap::new())); + let logs_by_tx_c = logs_by_tx.clone(); + tokio::spawn(async move { + loop { + let Ok(log) = logs_rx.recv() else { + return; + }; + let mut logs_by_tx_c = logs_by_tx_c.lock().unwrap(); + let logs = logs_by_tx_c + .entry(log.transaction_hash.unwrap()) + .or_insert(vec![]); + logs.push(log); + } + }); + + logs_by_tx +} + +fn parse_log_contract_logs( + logs: &[LogResponse], +) -> ( + alloy_primitives::Log, + alloy_primitives::Log, +) { + assert_eq!(logs.len(), 2); + + let log1: alloy_primitives::Log = logs[0].clone().try_into().unwrap(); + let log2: alloy_primitives::Log = logs[1].clone().try_into().unwrap(); + + let log_payload = LogsContract::decode_log_event(&log1).unwrap(); + let another_log_payload = LogsContract::decode_another_log_event(&log2).unwrap(); + + (log_payload, another_log_payload) +} diff --git a/bin/citrea/tests/test_client/mod.rs b/bin/citrea/tests/test_client/mod.rs index 98f0cd2e8..11c173e25 100644 --- a/bin/citrea/tests/test_client/mod.rs +++ b/bin/citrea/tests/test_client/mod.rs @@ -1,5 +1,6 @@ use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::mpsc; use std::time::Duration; use alloy::providers::network::{Ethereum, EthereumSigner}; @@ -7,7 +8,7 @@ use alloy::providers::{PendingTransactionBuilder, Provider as AlloyProvider, Pro use alloy::rpc::types::eth::{Block, Transaction, TransactionReceipt, TransactionRequest}; use alloy::signers::wallet::LocalWallet; use alloy::transports::http::{Http, HyperClient}; -use citrea_evm::LogResponse; +use citrea_evm::{Filter, LogResponse}; use ethereum_rpc::CitreaStatus; use jsonrpsee::core::client::{ClientT, SubscriptionClientT}; use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; @@ -16,6 +17,7 @@ use jsonrpsee::ws_client::{PingConfig, WsClient, WsClientBuilder}; use reth_primitives::{Address, BlockId, BlockNumberOrTag, Bytes, TxHash, TxKind, B256, U256, U64}; // use reth_rpc_types::TransactionReceipt; use reth_rpc_types::trace::geth::{GethDebugTracingOptions, GethTrace}; +use reth_rpc_types::RichBlock; use sequencer_client::GetSoftBatchResponse; use sov_rollup_interface::rpc::{ LastVerifiedProofResponse, ProofResponse, SequencerCommitmentResponse, SoftBatchResponse, @@ -691,6 +693,50 @@ impl TestClient { traces.into_iter().flatten().collect() } + pub(crate) async fn subscribe_new_heads(&self) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(); + let mut subscription = self + .ws_client + .subscribe("eth_subscribe", rpc_params!["newHeads"], "eth_unsubscribe") + .await + .unwrap(); + + tokio::spawn(async move { + loop { + let Some(Ok(block)) = subscription.next().await else { + return; + }; + tx.send(block).unwrap(); + } + }); + + rx + } + + pub(crate) async fn subscribe_logs(&self, filter: Filter) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(); + let mut subscription = self + .ws_client + .subscribe( + "eth_subscribe", + rpc_params!["logs", filter], + "eth_unsubscribe", + ) + .await + .unwrap(); + + tokio::spawn(async move { + loop { + let Some(Ok(log)) = subscription.next().await else { + return; + }; + tx.send(log).unwrap(); + } + }); + + rx + } + pub(crate) async fn eth_block_number(&self) -> u64 { let block_number: U256 = self .http_client diff --git a/bin/citrea/tests/test_helpers/mod.rs b/bin/citrea/tests/test_helpers/mod.rs index b546bb0b4..f9225298e 100644 --- a/bin/citrea/tests/test_helpers/mod.rs +++ b/bin/citrea/tests/test_helpers/mod.rs @@ -143,6 +143,8 @@ pub fn create_default_rollup_config( max_request_body_size: 10 * 1024 * 1024, max_response_body_size: 10 * 1024 * 1024, batch_requests_limit: 50, + enable_subscriptions: true, + max_subscriptions_per_connection: 100, }, runner: match node_mode { NodeMode::FullNode(socket_addr) | NodeMode::Prover(socket_addr) => Some(RunnerConfig { diff --git a/crates/ethereum-rpc/src/ethereum.rs b/crates/ethereum-rpc/src/ethereum.rs new file mode 100644 index 000000000..f9e129cc4 --- /dev/null +++ b/crates/ethereum-rpc/src/ethereum.rs @@ -0,0 +1,115 @@ +use std::sync::Mutex; + +#[cfg(feature = "local")] +use citrea_evm::DevSigner; +use citrea_evm::Evm; +use reth_primitives::U256; +use reth_rpc_types::trace::geth::GethTrace; +use rustc_version_runtime::version; +use schnellru::{ByLength, LruMap}; +use sequencer_client::SequencerClient; +use sov_modules_api::WorkingSet; +use sov_rollup_interface::services::da::DaService; +use sov_rollup_interface::CITREA_VERSION; +use tokio::sync::broadcast; +use tracing::instrument; + +use crate::gas_price::fee_history::FeeHistoryCacheConfig; +use crate::gas_price::gas_oracle::{GasPriceOracle, GasPriceOracleConfig}; +use crate::subscription::SubscriptionManager; + +const MAX_TRACE_BLOCK: u32 = 1000; + +#[derive(Clone)] +pub struct EthRpcConfig { + pub gas_price_oracle_config: GasPriceOracleConfig, + pub fee_history_cache_config: FeeHistoryCacheConfig, + #[cfg(feature = "local")] + pub eth_signer: DevSigner, +} + +pub struct Ethereum { + #[allow(dead_code)] + pub(crate) da_service: Da, + pub(crate) gas_price_oracle: GasPriceOracle, + #[cfg(feature = "local")] + pub(crate) eth_signer: DevSigner, + pub(crate) storage: C::Storage, + pub(crate) sequencer_client: Option, + pub(crate) web3_client_version: String, + pub(crate) trace_cache: Mutex, ByLength>>, + pub(crate) subscription_manager: Option, +} + +impl Ethereum { + pub(crate) fn new( + da_service: Da, + gas_price_oracle_config: GasPriceOracleConfig, + fee_history_cache_config: FeeHistoryCacheConfig, + #[cfg(feature = "local")] eth_signer: DevSigner, + storage: C::Storage, + sequencer_client: Option, + soft_confirmation_rx: Option>, + ) -> Self { + let evm = Evm::::default(); + let gas_price_oracle = + GasPriceOracle::new(evm, gas_price_oracle_config, fee_history_cache_config); + + let rollup = "citrea"; + let arch = std::env::consts::ARCH; + let rustc_v = version(); + + let current_version = format!("{}/{}/{}/rust-{}", rollup, CITREA_VERSION, arch, rustc_v); + + let trace_cache = Mutex::new(LruMap::new(ByLength::new(MAX_TRACE_BLOCK))); + + let subscription_manager = + soft_confirmation_rx.map(|rx| SubscriptionManager::new::(storage.clone(), rx)); + + Self { + da_service, + gas_price_oracle, + #[cfg(feature = "local")] + eth_signer, + storage, + sequencer_client, + web3_client_version: current_version, + trace_cache, + subscription_manager, + } + } + + #[instrument(level = "trace", skip_all)] + pub(crate) async fn max_fee_per_gas(&self, working_set: &mut WorkingSet) -> (U256, U256) { + let suggested_tip = self + .gas_price_oracle + .suggest_tip_cap(working_set) + .await + .unwrap(); + + let evm = Evm::::default(); + let base_fee = evm + .get_block_by_number(None, None, working_set) + .unwrap() + .unwrap() + .header + .base_fee_per_gas + .unwrap_or_default(); + + (U256::from(base_fee), U256::from(suggested_tip)) + } + + // fn make_raw_tx( + // &self, + // raw_tx: RlpEvmTransaction, + // ) -> Result<(B256, Vec), jsonrpsee::core::RegisterMethodError> { + // let signed_transaction: RethTransactionSignedNoHash = raw_tx.clone().try_into()?; + + // let tx_hash = signed_transaction.hash(); + + // let tx = CallMessage { txs: vec![raw_tx] }; + // let message = as EncodeCall>>::encode_call(tx); + + // Ok((B256::from(tx_hash), message)) + // } +} diff --git a/crates/ethereum-rpc/src/lib.rs b/crates/ethereum-rpc/src/lib.rs index 433f12305..5ad63eade 100644 --- a/crates/ethereum-rpc/src/lib.rs +++ b/crates/ethereum-rpc/src/lib.rs @@ -1,43 +1,29 @@ +mod ethereum; mod gas_price; - -use std::collections::BTreeMap; -use std::process::Command; -use std::sync::{Arc, Mutex}; +mod subscription; +mod trace; #[cfg(feature = "local")] pub use citrea_evm::DevSigner; -use citrea_evm::Evm; +use citrea_evm::{Evm, Filter}; +pub use ethereum::{EthRpcConfig, Ethereum}; pub use gas_price::fee_history::FeeHistoryCacheConfig; -use gas_price::gas_oracle::GasPriceOracle; pub use gas_price::gas_oracle::GasPriceOracleConfig; -use jsonrpsee::types::{ErrorObjectOwned, ParamsSequence}; -use jsonrpsee::{PendingSubscriptionSink, RpcModule, SubscriptionMessage}; +use jsonrpsee::types::ErrorObjectOwned; +use jsonrpsee::RpcModule; use reth_primitives::{keccak256, BlockNumberOrTag, Bytes, B256, U256}; use reth_rpc::eth::error::EthApiError; -use reth_rpc_types::trace::geth::{ - CallConfig, CallFrame, FourByteFrame, GethDebugBuiltInTracerType, GethDebugTracerConfig, - GethDebugTracerType, GethDebugTracingOptions, GethTrace, NoopFrame, -}; +use reth_rpc_types::trace::geth::{GethDebugTracingOptions, GethTrace}; use reth_rpc_types::{FeeHistory, Index}; -use rustc_version_runtime::version; -use schnellru::{ByLength, LruMap}; use sequencer_client::SequencerClient; use serde_json::json; use sov_modules_api::utils::to_jsonrpsee_error_object; -use sov_modules_api::{Context, WorkingSet}; +use sov_modules_api::WorkingSet; use sov_rollup_interface::services::da::DaService; -use sov_rollup_interface::CITREA_VERSION; -use tracing::{error, info, instrument}; - -const MAX_TRACE_BLOCK: u32 = 1000; - -#[derive(Clone)] -pub struct EthRpcConfig { - pub gas_price_oracle_config: GasPriceOracleConfig, - pub fee_history_cache_config: FeeHistoryCacheConfig, - #[cfg(feature = "local")] - pub eth_signer: DevSigner, -} +use subscription::{handle_logs_subscription, handle_new_heads_subscription}; +use tokio::sync::broadcast; +use trace::{debug_trace_by_block_number, handle_debug_trace_chain}; +use tracing::info; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)] #[serde(rename_all = "camelCase")] @@ -58,6 +44,7 @@ pub fn get_ethereum_rpc( eth_rpc_config: EthRpcConfig, storage: C::Storage, sequencer_client_url: Option, + soft_confirmation_rx: Option>, ) -> RpcModule> { // Unpack config let EthRpcConfig { @@ -69,6 +56,7 @@ pub fn get_ethereum_rpc( // If the node does not have a sequencer client, then it is the sequencer. let is_sequencer = sequencer_client_url.is_none(); + let enable_subscriptions = soft_confirmation_rx.is_some(); // If the running node is a full node rpc context should also have sequencer client so that it can send txs to sequencer let mut rpc = RpcModule::new(Ethereum::new( @@ -79,100 +67,19 @@ pub fn get_ethereum_rpc( eth_signer, storage, sequencer_client_url.map(SequencerClient::new), + soft_confirmation_rx, )); - register_rpc_methods(&mut rpc, is_sequencer).expect("Failed to register ethereum RPC methods"); + register_rpc_methods(&mut rpc, is_sequencer, enable_subscriptions) + .expect("Failed to register ethereum RPC methods"); rpc } -pub struct Ethereum { - #[allow(dead_code)] - da_service: Da, - gas_price_oracle: GasPriceOracle, - #[cfg(feature = "local")] - eth_signer: DevSigner, - storage: C::Storage, - sequencer_client: Option, - web3_client_version: String, - trace_cache: Mutex, ByLength>>, -} - -impl Ethereum { - fn new( - da_service: Da, - gas_price_oracle_config: GasPriceOracleConfig, - fee_history_cache_config: FeeHistoryCacheConfig, - #[cfg(feature = "local")] eth_signer: DevSigner, - storage: C::Storage, - sequencer_client: Option, - ) -> Self { - let evm = Evm::::default(); - let gas_price_oracle = - GasPriceOracle::new(evm, gas_price_oracle_config, fee_history_cache_config); - - let rollup = "citrea"; - let arch = std::env::consts::ARCH; - let rustc_v = version(); - - let current_version = format!("{}/{}/{}/rust-{}", rollup, CITREA_VERSION, arch, rustc_v); - - let trace_cache = Mutex::new(LruMap::new(ByLength::new(MAX_TRACE_BLOCK))); - - Self { - da_service, - gas_price_oracle, - #[cfg(feature = "local")] - eth_signer, - storage, - sequencer_client, - web3_client_version: current_version, - trace_cache, - } - } -} - -impl Ethereum { - #[instrument(level = "trace", skip_all)] - async fn max_fee_per_gas(&self, working_set: &mut WorkingSet) -> (U256, U256) { - let suggested_tip = self - .gas_price_oracle - .suggest_tip_cap(working_set) - .await - .unwrap(); - - let evm = Evm::::default(); - let base_fee = evm - .get_block_by_number(None, None, working_set) - .unwrap() - .unwrap() - .header - .base_fee_per_gas - .unwrap_or_default(); - - (U256::from(base_fee), U256::from(suggested_tip)) - } -} - -// impl Ethereum { -// fn make_raw_tx( -// &self, -// raw_tx: RlpEvmTransaction, -// ) -> Result<(B256, Vec), jsonrpsee::core::RegisterMethodError> { -// let signed_transaction: RethTransactionSignedNoHash = raw_tx.clone().try_into()?; - -// let tx_hash = signed_transaction.hash(); - -// let tx = CallMessage { txs: vec![raw_tx] }; -// let message = as EncodeCall>>::encode_call(tx); - -// Ok((B256::from(tx_hash), message)) -// } -// } - fn register_rpc_methods( rpc: &mut RpcModule>, // Checks wether the running node is a sequencer or not, if it is not a sequencer it should also have methods like eth_sendRawTransaction here. is_sequencer: bool, + enable_subscriptions: bool, ) -> Result<(), jsonrpsee::core::RegisterMethodError> { rpc.register_async_method("web3_clientVersion", |_, ethereum| async move { info!("eth module: web3_clientVersion"); @@ -577,34 +484,6 @@ fn register_rpc_methods( }, )?; - rpc.register_subscription( - "debug_subscribe", - "debug_subscription", - "debug_unsubscribe", - |parameters, pending, ethereum| async move { - let mut params = parameters.sequence(); - - let topic: String = match params.next() { - Ok(v) => v, - Err(err) => { - pending.reject(err).await; - return Ok(()); - } - }; - match topic.as_str() { - "traceChain" => handle_debug_trace_chain(params, pending, ethereum).await, - _ => { - pending - .reject(EthApiError::Unsupported("Unsupported subscription topic")) - .await; - return Ok(()); - } - }; - - Ok(()) - }, - )?; - rpc.register_async_method("txpool_content", |_, _| async move { info!("eth module: txpool_content"); @@ -767,6 +646,89 @@ fn register_rpc_methods( )?; } + if enable_subscriptions { + rpc.register_subscription( + "debug_subscribe", + "debug_subscription", + "debug_unsubscribe", + |parameters, pending, ethereum| async move { + let mut params = parameters.sequence(); + + let topic: String = match params.next() { + Ok(v) => v, + Err(err) => { + pending.reject(err).await; + return Ok(()); + } + }; + match topic.as_str() { + "traceChain" => handle_debug_trace_chain(params, pending, ethereum).await, + _ => { + pending + .reject(EthApiError::Unsupported("Unsupported subscription topic")) + .await; + return Ok(()); + } + }; + + Ok(()) + }, + )?; + + rpc.register_subscription( + "eth_subscribe", + "eth_subscription", + "eth_unsubscribe", + |parameters, pending, ethereum| async move { + let mut params = parameters.sequence(); + + let topic: String = match params.next() { + Ok(v) => v, + Err(err) => { + pending.reject(err).await; + return Ok(()); + } + }; + match topic.as_str() { + "newHeads" => { + let subscription = pending.accept().await.unwrap(); + let rx = ethereum + .subscription_manager + .as_ref() + .unwrap() + .subscribe_new_heads(); + handle_new_heads_subscription(subscription, rx).await + } + "logs" => { + let filter: Filter = match params.next() { + Ok(v) => v, + Err(err) => { + pending.reject(err).await; + return Ok(()); + } + }; + let subscription = pending.accept().await.unwrap(); + let rx = ethereum + .subscription_manager + .as_ref() + .unwrap() + .subscribe_logs() + .await; + handle_logs_subscription(subscription, rx, filter).await + } + _ => { + pending + .reject(EthApiError::Unsupported("Unsupported subscription topic")) + .await; + return Ok(()); + } + }; + + Ok(()) + }, + )?; + } + Ok(()) } @@ -803,328 +765,3 @@ fn register_rpc_methods( // (call_request, gas_price, max_fee_per_gas) // } - -pub fn get_latest_git_tag() -> Result { - let latest_tag_commit = Command::new("git") - .args(["rev-list", "--tags", "--max-count=1"]) - .output() - .map_err(|e| to_jsonrpsee_error_object("FULL_NODE_ERROR", e))?; - - if !latest_tag_commit.status.success() { - return Err(to_jsonrpsee_error_object( - "Failure", - "Failed to get version", - )); - } - - let latest_tag_commit = String::from_utf8_lossy(&latest_tag_commit.stdout) - .trim() - .to_string(); - - let latest_tag = Command::new("git") - .args(["describe", "--tags", &latest_tag_commit]) - .output() - .map_err(|e| to_jsonrpsee_error_object("FULL_NODE_ERROR", e))?; - - if !latest_tag.status.success() { - return Err(to_jsonrpsee_error_object( - "Failure", - "Failed to get version", - )); - } - - Ok(String::from_utf8_lossy(&latest_tag.stdout) - .trim() - .to_string()) -} - -fn apply_call_config(call_frame: CallFrame, call_config: CallConfig) -> CallFrame { - // let only_top_call = call_config.only_top_call.unwrap_or(); - let mut new_call_frame = call_frame.clone(); - if let Some(true) = call_config.only_top_call { - new_call_frame.calls = vec![]; - } - if !call_config.with_log.unwrap_or(false) { - remove_logs_from_call_frame(&mut vec![new_call_frame.clone()]); - } - new_call_frame -} - -fn remove_logs_from_call_frame(call_frame: &mut Vec) { - for frame in call_frame { - frame.logs = vec![]; - remove_logs_from_call_frame(&mut frame.calls); - } -} - -async fn handle_debug_trace_chain( - mut params: ParamsSequence<'_>, - pending: PendingSubscriptionSink, - ethereum: Arc>, -) { - let start_block: BlockNumberOrTag = match params.next() { - Ok(v) => v, - Err(err) => { - pending.reject(err).await; - return; - } - }; - let end_block: BlockNumberOrTag = match params.next() { - Ok(v) => v, - Err(err) => { - pending.reject(err).await; - return; - } - }; - - // start block is exclusive, hence latest is not supported - let BlockNumberOrTag::Number(start_block) = start_block else { - pending.reject(EthApiError::Unsupported( - "Latest, earliest, pending, safe and finalized are not supported for traceChain start block", - )).await; - return; - }; - - let mut working_set = WorkingSet::::new(ethereum.storage.clone()); - let evm = Evm::::default(); - let latest_block_number: u64 = evm - .block_number(&mut working_set) - .expect("Expected at least one block") - .saturating_to(); - let end_block = match end_block { - BlockNumberOrTag::Number(end_block) => { - if end_block > latest_block_number { - pending.reject(EthApiError::UnknownBlockNumber).await; - return; - } - end_block - } - BlockNumberOrTag::Latest => latest_block_number, - _ => { - pending.reject(EthApiError::Unsupported( - "Earliest, pending, safe and finalized are not supported for traceChain end block", - )).await; - return; - } - }; - - if start_block >= end_block { - pending.reject(EthApiError::InvalidBlockRange).await; - return; - } - - let opts: Option = match params.optional_next() { - Ok(v) => v, - Err(err) => { - pending.reject(err).await; - return; - } - }; - - let subscription = pending.accept().await.unwrap(); - tokio::spawn(async move { - for block_number in start_block + 1..=end_block { - let mut working_set = WorkingSet::::new(ethereum.storage.clone()); - let traces = debug_trace_by_block_number( - block_number, - None, - ðereum, - &evm, - &mut working_set, - opts.clone(), - ); - match traces { - Ok(traces) => { - let msg = SubscriptionMessage::new( - subscription.method_name(), - subscription.subscription_id(), - &traces, - ) - .unwrap(); - let Ok(_) = subscription.send(msg).await else { - return; - }; - } - Err(err) => { - error!( - "Failed to get traces of block {} in traceChain: {}", - block_number, err - ); - - let msg = SubscriptionMessage::new( - subscription.method_name(), - subscription.subscription_id(), - &"Internal error", - ) - .unwrap(); - let _ = subscription.send(msg).await; - return; - } - }; - } - }); -} - -fn debug_trace_by_block_number( - block_number: u64, - trace_idx: Option, - ethereum: &Ethereum, - evm: &Evm, - working_set: &mut WorkingSet, - opts: Option, -) -> Result, ErrorObjectOwned> { - // If opts is None or if opts.tracer is None, then do not check cache or insert cache, just perform the operation - if opts.as_ref().map_or(true, |o| o.tracer.is_none()) { - let traces = - evm.trace_block_transactions_by_number(block_number, opts, trace_idx, working_set)?; - return match trace_idx { - Some(idx) => Ok(vec![traces[idx].clone()]), - None => Ok(traces), - }; - } - - let requested_opts = opts.unwrap(); - let tracer_type = requested_opts.tracer.unwrap(); - let tracer_config = requested_opts.tracer_config; - - if let Some(traces) = ethereum.trace_cache.lock().unwrap().get(&block_number) { - // If traces are found in cache convert them to specified opts and then return - let traces = match trace_idx { - Some(idx) => vec![traces[idx].clone()], - None => traces.to_vec(), - }; - let traces = - get_traces_with_requested_tracer_and_config(traces, tracer_type, tracer_config)?; - return Ok(traces); - } - - let cache_options = create_trace_cache_opts(); - let traces = evm.trace_block_transactions_by_number( - block_number, - Some(cache_options), - None, - working_set, - )?; - ethereum - .trace_cache - .lock() - .unwrap() - .insert(block_number, traces.clone()); - - // Convert the traces to the requested tracer and config - let traces = match trace_idx { - Some(idx) => vec![traces[idx].clone()], - None => traces, - }; - let traces = get_traces_with_requested_tracer_and_config(traces, tracer_type, tracer_config)?; - - Ok(traces) -} - -fn get_traces_with_requested_tracer_and_config( - traces: Vec, - tracer: GethDebugTracerType, - tracer_config: GethDebugTracerConfig, -) -> Result, EthApiError> { - // This can be only CallConfig or PreStateConfig if it is not CallConfig return Error for now - - let mut new_traces = vec![]; - match tracer { - GethDebugTracerType::BuiltInTracer(builtin_tracer) => { - match builtin_tracer { - GethDebugBuiltInTracerType::CallTracer => { - // Apply the call config to the traces - let call_config = - GethDebugTracerConfig::into_call_config(tracer_config).unwrap_or_default(); - // if call config is the same in the cache then do not process again and return early - match call_config { - CallConfig { - only_top_call: None, - with_log: Some(true), - } - | CallConfig { - only_top_call: Some(false), - with_log: Some(true), - } => { - return Ok(traces); - } - _ => { - traces.into_iter().for_each(|trace| { - if let GethTrace::CallTracer(call_frame) = trace { - let new_call_frame = - apply_call_config(call_frame.clone(), call_config); - new_traces.push(GethTrace::CallTracer(new_call_frame)); - } - }); - } - } - Ok(new_traces) - } - GethDebugBuiltInTracerType::FourByteTracer => { - traces.into_iter().for_each(|trace| { - if let GethTrace::CallTracer(call_frame) = trace { - let four_byte_frame = - convert_call_trace_into_4byte_frame(vec![call_frame]); - new_traces.push(GethTrace::FourByteTracer(four_byte_frame)); - } - }); - Ok(new_traces) - } - GethDebugBuiltInTracerType::NoopTracer => { - Ok(vec![GethTrace::NoopTracer(NoopFrame::default())]) - } - _ => Err(EthApiError::Unsupported("This tracer is not supported")), - } - } - GethDebugTracerType::JsTracer(_code) => { - // This also requires DatabaseRef trait - // Implement after readonly state is implemented - Err(EthApiError::Unsupported("JsTracer")) - } - } -} - -pub fn convert_call_trace_into_4byte_frame(call_frames: Vec) -> FourByteFrame { - FourByteFrame(convert_call_trace_into_4byte_map( - call_frames, - BTreeMap::new(), - )) -} - -fn convert_call_trace_into_4byte_map( - call_frames: Vec, - mut four_byte_map: BTreeMap, -) -> BTreeMap { - // For each input in each call - // get the first 4 bytes, get the size of the input - // the key is : "-" - // value is the occurence of the key - for call_frame in call_frames { - let input = call_frame.input; - // If this is a function call (function selector is 4 bytes long) - if input.len() >= 4 { - let input_size = input.0.len() - 4; - let four_byte = &input.to_string()[2..10]; // Ignore the 0x - let key = format!("{}-{}", four_byte, input_size); - let count = four_byte_map.entry(key).or_insert(0); - *count += 1; - } - four_byte_map = convert_call_trace_into_4byte_map(call_frame.calls, four_byte_map); - } - four_byte_map -} - -fn create_trace_cache_opts() -> GethDebugTracingOptions { - // Get the traces with call tracer onlytopcall false and withlog true and always cache this way - let mut call_config_map = serde_json::Map::new(); - call_config_map.insert("only_top_call".to_string(), serde_json::Value::Bool(false)); - call_config_map.insert("with_log".to_string(), serde_json::Value::Bool(true)); - let call_config = serde_json::Value::Object(call_config_map); - GethDebugTracingOptions { - tracer: Some(GethDebugTracerType::BuiltInTracer( - GethDebugBuiltInTracerType::CallTracer, - )), - tracer_config: GethDebugTracerConfig(call_config), - ..Default::default() - } -} diff --git a/crates/ethereum-rpc/src/subscription.rs b/crates/ethereum-rpc/src/subscription.rs new file mode 100644 index 000000000..bd414db17 --- /dev/null +++ b/crates/ethereum-rpc/src/subscription.rs @@ -0,0 +1,136 @@ +use citrea_evm::{log_matches_filter, Evm, Filter, LogResponse}; +use jsonrpsee::{SubscriptionMessage, SubscriptionSink}; +use reth_rpc_types::{BlockNumberOrTag, RichBlock}; +use sov_modules_api::WorkingSet; +use tokio::sync::broadcast; + +pub(crate) struct SubscriptionManager { + new_heads_tx: broadcast::Sender, + logs_tx: broadcast::Sender>, +} + +impl SubscriptionManager { + pub(crate) fn new( + storage: C::Storage, + soft_confirmation_rx: broadcast::Receiver, + ) -> Self { + let new_heads_tx = broadcast::channel(16).0; + let logs_tx = broadcast::channel(16).0; + let manager = Self { + new_heads_tx: new_heads_tx.clone(), + logs_tx: logs_tx.clone(), + }; + + let mut soft_confirmation_rx = soft_confirmation_rx; + // Spawn the task that will listen for new soft confirmation heights + // and send the corresponding ethereum block to subscribers + tokio::spawn(async move { + let evm = Evm::::default(); + loop { + let Ok(height) = soft_confirmation_rx.recv().await else { + return; + }; + + if new_heads_tx.receiver_count() != 0 { + let mut working_set = WorkingSet::::new(storage.clone()); + let block = evm + .get_block_by_number( + Some(BlockNumberOrTag::Number(height)), + None, + &mut working_set, + ) + .expect("Error querying block from evm") + .expect("Received signal but evm block is not found"); + + // Only possible error is no receiver + let _ = new_heads_tx.send(block.clone()); + } + + if logs_tx.receiver_count() != 0 { + let mut working_set = WorkingSet::::new(storage.clone()); + let logs = evm + .get_logs_in_block_range( + &mut working_set, + &Filter::default(), + height, + height, + ) + .expect("Error getting logs in block range"); + + // Only possible error is no receiver + let _ = logs_tx.send(logs); + } + } + }); + + manager + } + + pub(crate) fn subscribe_new_heads(&self) -> broadcast::Receiver { + self.new_heads_tx.subscribe() + } + + pub(crate) async fn subscribe_logs(&self) -> broadcast::Receiver> { + self.logs_tx.subscribe() + } +} + +pub async fn handle_new_heads_subscription( + subscription: SubscriptionSink, + mut rx: broadcast::Receiver, +) { + tokio::spawn(async move { + loop { + let Ok(block) = rx.recv().await else { + // Connection closed + return; + }; + + let msg = SubscriptionMessage::new( + subscription.method_name(), + subscription.subscription_id(), + &block, + ) + .unwrap(); + let Ok(_) = subscription.send(msg).await else { + // Connection closed + return; + }; + } + }); +} + +pub async fn handle_logs_subscription( + subscription: SubscriptionSink, + mut rx: broadcast::Receiver>, + filter: Filter, +) { + tokio::spawn(async move { + loop { + let Ok(logs) = rx.recv().await else { + // Connection closed + return; + }; + + for log in logs { + if log_matches_filter( + &log.clone().try_into().unwrap(), + &filter, + log.block_hash.as_ref().unwrap(), + &log.block_number.as_ref().unwrap().to::(), + ) { + let msg = SubscriptionMessage::new( + subscription.method_name(), + subscription.subscription_id(), + &log, + ) + .unwrap(); + let Ok(_) = subscription.send(msg).await else { + // Connection closed + return; + }; + } + } + } + }); +} diff --git a/crates/ethereum-rpc/src/trace.rs b/crates/ethereum-rpc/src/trace.rs new file mode 100644 index 000000000..098e23294 --- /dev/null +++ b/crates/ethereum-rpc/src/trace.rs @@ -0,0 +1,309 @@ +use std::collections::BTreeMap; +use std::sync::Arc; + +#[cfg(feature = "local")] +use citrea_evm::Evm; +use jsonrpsee::types::{ErrorObjectOwned, ParamsSequence}; +use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage}; +use reth_primitives::BlockNumberOrTag; +use reth_rpc::eth::error::EthApiError; +use reth_rpc_types::trace::geth::{ + CallConfig, CallFrame, FourByteFrame, GethDebugBuiltInTracerType, GethDebugTracerConfig, + GethDebugTracerType, GethDebugTracingOptions, GethTrace, NoopFrame, +}; +use sov_modules_api::WorkingSet; +use sov_rollup_interface::services::da::DaService; +use tracing::error; + +use crate::ethereum::Ethereum; + +pub async fn handle_debug_trace_chain( + mut params: ParamsSequence<'_>, + pending: PendingSubscriptionSink, + ethereum: Arc>, +) { + let start_block: BlockNumberOrTag = match params.next() { + Ok(v) => v, + Err(err) => { + pending.reject(err).await; + return; + } + }; + let end_block: BlockNumberOrTag = match params.next() { + Ok(v) => v, + Err(err) => { + pending.reject(err).await; + return; + } + }; + + // start block is exclusive, hence latest is not supported + let BlockNumberOrTag::Number(start_block) = start_block else { + pending.reject(EthApiError::Unsupported( + "Latest, earliest, pending, safe and finalized are not supported for traceChain start block", + )).await; + return; + }; + + let mut working_set = WorkingSet::::new(ethereum.storage.clone()); + let evm = Evm::::default(); + let latest_block_number: u64 = evm + .block_number(&mut working_set) + .expect("Expected at least one block") + .saturating_to(); + let end_block = match end_block { + BlockNumberOrTag::Number(end_block) => { + if end_block > latest_block_number { + pending.reject(EthApiError::UnknownBlockNumber).await; + return; + } + end_block + } + BlockNumberOrTag::Latest => latest_block_number, + _ => { + pending.reject(EthApiError::Unsupported( + "Earliest, pending, safe and finalized are not supported for traceChain end block", + )).await; + return; + } + }; + + if start_block >= end_block { + pending.reject(EthApiError::InvalidBlockRange).await; + return; + } + + let opts: Option = match params.optional_next() { + Ok(v) => v, + Err(err) => { + pending.reject(err).await; + return; + } + }; + + let subscription = pending.accept().await.unwrap(); + tokio::spawn(async move { + for block_number in start_block + 1..=end_block { + let mut working_set = WorkingSet::::new(ethereum.storage.clone()); + let traces = debug_trace_by_block_number( + block_number, + None, + ðereum, + &evm, + &mut working_set, + opts.clone(), + ); + match traces { + Ok(traces) => { + let msg = SubscriptionMessage::new( + subscription.method_name(), + subscription.subscription_id(), + &traces, + ) + .unwrap(); + let Ok(_) = subscription.send(msg).await else { + return; + }; + } + Err(err) => { + error!( + "Failed to get traces of block {} in traceChain: {}", + block_number, err + ); + + let msg = SubscriptionMessage::new( + subscription.method_name(), + subscription.subscription_id(), + &"Internal error", + ) + .unwrap(); + let _ = subscription.send(msg).await; + return; + } + }; + } + }); +} + +pub fn debug_trace_by_block_number( + block_number: u64, + trace_idx: Option, + ethereum: &Ethereum, + evm: &Evm, + working_set: &mut WorkingSet, + opts: Option, +) -> Result, ErrorObjectOwned> { + // If opts is None or if opts.tracer is None, then do not check cache or insert cache, just perform the operation + if opts.as_ref().map_or(true, |o| o.tracer.is_none()) { + let traces = + evm.trace_block_transactions_by_number(block_number, opts, trace_idx, working_set)?; + return match trace_idx { + Some(idx) => Ok(vec![traces[idx].clone()]), + None => Ok(traces), + }; + } + + let requested_opts = opts.unwrap(); + let tracer_type = requested_opts.tracer.unwrap(); + let tracer_config = requested_opts.tracer_config; + + if let Some(traces) = ethereum.trace_cache.lock().unwrap().get(&block_number) { + // If traces are found in cache convert them to specified opts and then return + let traces = match trace_idx { + Some(idx) => vec![traces[idx].clone()], + None => traces.to_vec(), + }; + let traces = + get_traces_with_requested_tracer_and_config(traces, tracer_type, tracer_config)?; + return Ok(traces); + } + + let cache_options = create_trace_cache_opts(); + let traces = evm.trace_block_transactions_by_number( + block_number, + Some(cache_options), + None, + working_set, + )?; + ethereum + .trace_cache + .lock() + .unwrap() + .insert(block_number, traces.clone()); + + // Convert the traces to the requested tracer and config + let traces = match trace_idx { + Some(idx) => vec![traces[idx].clone()], + None => traces, + }; + let traces = get_traces_with_requested_tracer_and_config(traces, tracer_type, tracer_config)?; + + Ok(traces) +} + +fn apply_call_config(call_frame: CallFrame, call_config: CallConfig) -> CallFrame { + // let only_top_call = call_config.only_top_call.unwrap_or(); + let mut new_call_frame = call_frame.clone(); + if let Some(true) = call_config.only_top_call { + new_call_frame.calls = vec![]; + } + if !call_config.with_log.unwrap_or(false) { + remove_logs_from_call_frame(&mut vec![new_call_frame.clone()]); + } + new_call_frame +} + +fn remove_logs_from_call_frame(call_frame: &mut Vec) { + for frame in call_frame { + frame.logs = vec![]; + remove_logs_from_call_frame(&mut frame.calls); + } +} + +fn get_traces_with_requested_tracer_and_config( + traces: Vec, + tracer: GethDebugTracerType, + tracer_config: GethDebugTracerConfig, +) -> Result, EthApiError> { + // This can be only CallConfig or PreStateConfig if it is not CallConfig return Error for now + + let mut new_traces = vec![]; + match tracer { + GethDebugTracerType::BuiltInTracer(builtin_tracer) => { + match builtin_tracer { + GethDebugBuiltInTracerType::CallTracer => { + // Apply the call config to the traces + let call_config = + GethDebugTracerConfig::into_call_config(tracer_config).unwrap_or_default(); + // if call config is the same in the cache then do not process again and return early + match call_config { + CallConfig { + only_top_call: None, + with_log: Some(true), + } + | CallConfig { + only_top_call: Some(false), + with_log: Some(true), + } => { + return Ok(traces); + } + _ => { + traces.into_iter().for_each(|trace| { + if let GethTrace::CallTracer(call_frame) = trace { + let new_call_frame = + apply_call_config(call_frame.clone(), call_config); + new_traces.push(GethTrace::CallTracer(new_call_frame)); + } + }); + } + } + Ok(new_traces) + } + GethDebugBuiltInTracerType::FourByteTracer => { + traces.into_iter().for_each(|trace| { + if let GethTrace::CallTracer(call_frame) = trace { + let four_byte_frame = + convert_call_trace_into_4byte_frame(vec![call_frame]); + new_traces.push(GethTrace::FourByteTracer(four_byte_frame)); + } + }); + Ok(new_traces) + } + GethDebugBuiltInTracerType::NoopTracer => { + Ok(vec![GethTrace::NoopTracer(NoopFrame::default())]) + } + _ => Err(EthApiError::Unsupported("This tracer is not supported")), + } + } + GethDebugTracerType::JsTracer(_code) => { + // This also requires DatabaseRef trait + // Implement after readonly state is implemented + Err(EthApiError::Unsupported("JsTracer")) + } + } +} + +fn convert_call_trace_into_4byte_frame(call_frames: Vec) -> FourByteFrame { + FourByteFrame(convert_call_trace_into_4byte_map( + call_frames, + BTreeMap::new(), + )) +} + +fn convert_call_trace_into_4byte_map( + call_frames: Vec, + mut four_byte_map: BTreeMap, +) -> BTreeMap { + // For each input in each call + // get the first 4 bytes, get the size of the input + // the key is : "-" + // value is the occurence of the key + for call_frame in call_frames { + let input = call_frame.input; + // If this is a function call (function selector is 4 bytes long) + if input.len() >= 4 { + let input_size = input.0.len() - 4; + let four_byte = &input.to_string()[2..10]; // Ignore the 0x + let key = format!("{}-{}", four_byte, input_size); + let count = four_byte_map.entry(key).or_insert(0); + *count += 1; + } + four_byte_map = convert_call_trace_into_4byte_map(call_frame.calls, four_byte_map); + } + four_byte_map +} + +fn create_trace_cache_opts() -> GethDebugTracingOptions { + // Get the traces with call tracer onlytopcall false and withlog true and always cache this way + let mut call_config_map = serde_json::Map::new(); + call_config_map.insert("only_top_call".to_string(), serde_json::Value::Bool(false)); + call_config_map.insert("with_log".to_string(), serde_json::Value::Bool(true)); + let call_config = serde_json::Value::Object(call_config_map); + GethDebugTracingOptions { + tracer: Some(GethDebugTracerType::BuiltInTracer( + GethDebugBuiltInTracerType::CallTracer, + )), + tracer_config: GethDebugTracerConfig(call_config), + ..Default::default() + } +} diff --git a/crates/evm/src/query.rs b/crates/evm/src/query.rs index fba72086c..1a2bf94f6 100644 --- a/crates/evm/src/query.rs +++ b/crates/evm/src/query.rs @@ -1408,7 +1408,7 @@ impl Evm { /// Returns an error if: /// - underlying database error /// - amount of matches exceeds configured limit - fn get_logs_in_block_range( + pub fn get_logs_in_block_range( &self, working_set: &mut WorkingSet, filter: &Filter, @@ -1434,11 +1434,10 @@ impl Evm { BlockRangeInclusiveIter::new(from_block_number..=to_block_number, max_headers_range) { for idx in from..=to { - let block = match self.blocks.get( - // Index from +1 or just from? - (idx) as usize, - &mut working_set.accessory_state(), - ) { + let block = match self + .blocks + .get((idx) as usize, &mut working_set.accessory_state()) + { Some(block) => block, None => { return Err(FilterError::EthAPIError( @@ -1484,7 +1483,6 @@ impl Evm { // TAG - true when the log was removed, due to a chain reorganization. false if its a valid log. let removed = false; - let topics = filter.topics.clone(); let tx_range = block.transactions; for i in tx_range { @@ -1499,13 +1497,7 @@ impl Evm { let logs = receipt.receipt.logs; for log in logs.into_iter() { - if log_matches_filter( - &log, - filter, - &topics, - &block.header.hash(), - &block.header.number, - ) { + if log_matches_filter(&log, filter, &block.header.hash(), &block.header.number) { let log = LogResponse { address: log.address, topics: log.topics().to_vec(), diff --git a/crates/evm/src/rpc_helpers/filter.rs b/crates/evm/src/rpc_helpers/filter.rs index 4979f7a53..511fb8589 100644 --- a/crates/evm/src/rpc_helpers/filter.rs +++ b/crates/evm/src/rpc_helpers/filter.rs @@ -629,16 +629,15 @@ where // https://github.com/paradigmxyz/reth/blob/main/crates/rpc/rpc/src/eth/logs_utils.rs#L56 /// Returns true if the log matches the filter and should be included -pub(crate) fn log_matches_filter( +pub fn log_matches_filter( log: &reth_primitives::Log, filter: &Filter, - topics: &[FilterSet; 4], block_hash: &BlockHash, block_number: &u64, ) -> bool { if !filter.filter_block_range(block_number) || !filter.filter_block_hash(block_hash) - || !filter.filter_topics(log, topics) + || !filter.filter_topics(log, &filter.topics) || !filter.filter_address(log, &filter.address) { return false; diff --git a/crates/evm/src/rpc_helpers/responses.rs b/crates/evm/src/rpc_helpers/responses.rs index 6720ffa48..076c9b75c 100644 --- a/crates/evm/src/rpc_helpers/responses.rs +++ b/crates/evm/src/rpc_helpers/responses.rs @@ -1,7 +1,7 @@ use std::hash::Hash; use alloy_primitives::Bytes; -use reth_primitives::{Address, U256}; +use reth_primitives::{Address, Log, U256}; use revm::primitives::B256; /// Ethereum Log emitted by a transaction @@ -28,3 +28,11 @@ pub struct LogResponse { #[serde(default)] pub removed: bool, } + +impl TryInto for LogResponse { + type Error = &'static str; + + fn try_into(self) -> Result { + Log::new(self.address, self.topics, self.data).ok_or("Invalid LogResponse") + } +} diff --git a/crates/evm/src/smart_contracts/logs_contract.rs b/crates/evm/src/smart_contracts/logs_contract.rs index e946a5072..e1be3ede4 100644 --- a/crates/evm/src/smart_contracts/logs_contract.rs +++ b/crates/evm/src/smart_contracts/logs_contract.rs @@ -1,14 +1,15 @@ -use alloy_sol_types::{sol, SolCall}; +use alloy_sol_types::{sol, SolCall, SolEvent}; use super::TestContract; -// Logs wrapper. sol! { #[sol(abi)] Logs, "./src/evm/test_data/Logs.abi" } +pub use Logs::{AnotherLog as AnotherLogEvent, Log as LogEvent}; + /// Logs wrapper. pub struct LogsContract { bytecode: Vec, @@ -39,4 +40,18 @@ impl LogsContract { } .abi_encode() } + + /// Decode Log event of the Logs smart contract. + pub fn decode_log_event( + log: &alloy_primitives::Log, + ) -> anyhow::Result> { + Ok(Logs::Log::decode_log(log, true)?) + } + + /// Decode AnotherLog event of the Logs smart contract. + pub fn decode_another_log_event( + log: &alloy_primitives::Log, + ) -> anyhow::Result> { + Ok(Logs::AnotherLog::decode_log(log, true)?) + } } diff --git a/crates/evm/src/smart_contracts/mod.rs b/crates/evm/src/smart_contracts/mod.rs index 122e693bd..145c70839 100644 --- a/crates/evm/src/smart_contracts/mod.rs +++ b/crates/evm/src/smart_contracts/mod.rs @@ -15,7 +15,7 @@ pub use caller_contract::CallerContract; pub use coinbase_contract::CoinbaseContract; pub use hive_contract::HiveContract; pub use infinite_loop_contract::InfiniteLoopContract; -pub use logs_contract::LogsContract; +pub use logs_contract::{AnotherLogEvent, LogEvent, LogsContract}; pub use payable_contract::SimplePayableContract; pub use self_destructor_contract::SelfDestructorContract; pub use simple_storage_contract::SimpleStorageContract; diff --git a/crates/fullnode/src/runner.rs b/crates/fullnode/src/runner.rs index 38c9d9d22..9eca791d3 100644 --- a/crates/fullnode/src/runner.rs +++ b/crates/fullnode/src/runner.rs @@ -29,7 +29,7 @@ use sov_rollup_interface::storage::HierarchicalStorageManager; use sov_rollup_interface::zk::{Proof, Zkvm, ZkvmHost}; use sov_stf_runner::{InitVariant, RollupPublicKeys, RpcConfig, RunnerConfig}; use tokio::select; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use tokio::time::{sleep, Duration}; use tracing::{debug, error, info, instrument, warn}; @@ -65,6 +65,7 @@ where accept_public_input_as_proven: bool, l1_block_cache: Arc>>, sync_blocks_count: u64, + soft_confirmation_tx: broadcast::Sender, } impl CitreaFullnode @@ -98,6 +99,7 @@ where init_variant: InitVariant, code_commitment: Vm::CodeCommitment, sync_blocks_count: u64, + soft_confirmation_tx: broadcast::Sender, ) -> Result { let (prev_state_root, prev_batch_hash) = match init_variant { InitVariant::Initialized((state_root, batch_hash)) => { @@ -148,6 +150,7 @@ where .unwrap_or(false), sync_blocks_count, l1_block_cache: Arc::new(Mutex::new(L1BlockCache::new())), + soft_confirmation_tx, }) } @@ -167,10 +170,12 @@ where let listen_address = SocketAddr::new(bind_host, self.rpc_config.bind_port); let max_connections = self.rpc_config.max_connections; + let max_subscriptions_per_connection = self.rpc_config.max_subscriptions_per_connection; let _handle = tokio::spawn(async move { let server = jsonrpsee::server::ServerBuilder::default() .max_connections(max_connections) + .max_subscriptions_per_connection(max_subscriptions_per_connection) .build([listen_address].as_ref()) .await; @@ -466,6 +471,9 @@ where BatchNumber(l2_height), )?; + // Only errors when there are no receivers + let _ = self.soft_confirmation_tx.send(l2_height); + self.state_root = next_state_root; self.batch_hash = soft_batch.hash; diff --git a/crates/fullnode/tests/runner_initialization_tests.rs b/crates/fullnode/tests/runner_initialization_tests.rs index 46181c348..be40a096e 100644 --- a/crates/fullnode/tests/runner_initialization_tests.rs +++ b/crates/fullnode/tests/runner_initialization_tests.rs @@ -11,6 +11,7 @@ use sov_stf_runner::{ mod hash_stf; use hash_stf::HashStf; +use tokio::sync::broadcast; type MockInitVariant = InitVariant, MockZkvm, MockDaSpec>; @@ -71,6 +72,8 @@ fn initialize_runner( max_request_body_size: 10 * 1024 * 1024, max_response_body_size: 10 * 1024 * 1024, batch_requests_limit: 50, + enable_subscriptions: true, + max_subscriptions_per_connection: 100, }, runner: Some(RunnerConfig { sequencer_client_url: "http://127.0.0.1:4444".to_string(), @@ -114,6 +117,7 @@ fn initialize_runner( init_variant, MockCodeCommitment([1u8; 32]), 10, + broadcast::channel(1).0, ) .unwrap() } diff --git a/crates/fullnode/tests/runner_reorg_tests.rs b/crates/fullnode/tests/runner_reorg_tests.rs index d89afb303..60cb5bdb5 100644 --- a/crates/fullnode/tests/runner_reorg_tests.rs +++ b/crates/fullnode/tests/runner_reorg_tests.rs @@ -19,6 +19,7 @@ use sov_rollup_interface::services::da::DaService; use sov_rollup_interface::storage::HierarchicalStorageManager; use sov_state::storage::NativeStorage; use sov_state::{ProverStorage, Storage}; +use tokio::sync::broadcast; type MockInitVariant = InitVariant, MockZkvm, MockDaSpec>; @@ -129,6 +130,8 @@ async fn runner_execution( max_request_body_size: 10 * 1024 * 1024, max_response_body_size: 10 * 1024 * 1024, batch_requests_limit: 50, + enable_subscriptions: true, + max_subscriptions_per_connection: 100, }, runner: Some(RunnerConfig { sequencer_client_url: "http://127.0.0.1:4444".to_string(), @@ -167,6 +170,7 @@ async fn runner_execution( init_variant, MockCodeCommitment([1u8; 32]), 10, + broadcast::channel(1).0, ) .unwrap(); diff --git a/crates/prover/src/runner.rs b/crates/prover/src/runner.rs index d9b516eae..da452975d 100644 --- a/crates/prover/src/runner.rs +++ b/crates/prover/src/runner.rs @@ -30,7 +30,7 @@ use sov_stf_runner::{ InitVariant, ProverConfig, ProverService, RollupPublicKeys, RpcConfig, RunnerConfig, }; use tokio::select; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use tokio::time::sleep; use tracing::{debug, error, info, instrument, warn}; @@ -72,6 +72,7 @@ where code_commitment: Vm::CodeCommitment, l1_block_cache: Arc>>, sync_blocks_count: u64, + soft_confirmation_tx: broadcast::Sender, } impl CitreaProver @@ -108,6 +109,7 @@ where prover_config: Option, code_commitment: Vm::CodeCommitment, sync_blocks_count: u64, + soft_confirmation_tx: broadcast::Sender, ) -> Result { let (prev_state_root, prev_batch_hash) = match init_variant { InitVariant::Initialized((state_root, batch_hash)) => { @@ -153,6 +155,7 @@ where code_commitment, l1_block_cache: Arc::new(Mutex::new(L1BlockCache::new())), sync_blocks_count, + soft_confirmation_tx, }) } @@ -172,10 +175,12 @@ where let listen_address = SocketAddr::new(bind_host, self.rpc_config.bind_port); let max_connections = self.rpc_config.max_connections; + let max_subscriptions_per_connection = self.rpc_config.max_subscriptions_per_connection; let _handle = tokio::spawn(async move { let server = jsonrpsee::server::ServerBuilder::default() .max_connections(max_connections) + .max_subscriptions_per_connection(max_subscriptions_per_connection) .build([listen_address].as_ref()) .await; @@ -373,6 +378,9 @@ where BatchNumber(l2_height), )?; + // Only errors when there are no receivers + let _ = self.soft_confirmation_tx.send(l2_height); + self.state_root = next_state_root; self.batch_hash = soft_batch.hash; diff --git a/crates/sequencer/src/sequencer.rs b/crates/sequencer/src/sequencer.rs index 79bdff3bf..1a89af74e 100644 --- a/crates/sequencer/src/sequencer.rs +++ b/crates/sequencer/src/sequencer.rs @@ -43,7 +43,7 @@ use sov_rollup_interface::storage::HierarchicalStorageManager; use sov_rollup_interface::zk::ZkvmHost; use sov_stf_runner::{InitVariant, RollupPublicKeys, RpcConfig}; use tokio::sync::oneshot::channel as oneshot_channel; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{broadcast, mpsc, Mutex}; use tokio::time::{sleep, Instant}; use tower_http::cors::{Any, CorsLayer}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -91,6 +91,7 @@ where rpc_config: RpcConfig, soft_confirmation_rule_enforcer: SoftConfirmationRuleEnforcer, last_state_diff: StateDiff, + soft_confirmation_tx: broadcast::Sender, } enum L2BlockMode { @@ -123,6 +124,7 @@ where public_keys: RollupPublicKeys, ledger_db: LedgerDB, rpc_config: RpcConfig, + soft_confirmation_tx: broadcast::Sender, ) -> anyhow::Result { let (l2_force_block_tx, l2_force_block_rx) = unbounded(); @@ -180,6 +182,7 @@ where rpc_config, soft_confirmation_rule_enforcer, last_state_diff, + soft_confirmation_tx, }) } @@ -199,6 +202,7 @@ where ); let max_connections = self.rpc_config.max_connections; + let max_subscriptions_per_connection = self.rpc_config.max_subscriptions_per_connection; let max_request_body_size = self.rpc_config.max_request_body_size; let max_response_body_size = self.rpc_config.max_response_body_size; let batch_requests_limit = self.rpc_config.batch_requests_limit; @@ -212,6 +216,7 @@ where let _handle = tokio::spawn(async move { let server = ServerBuilder::default() .max_connections(max_connections) + .max_subscriptions_per_connection(max_subscriptions_per_connection) .max_request_body_size(max_request_body_size) .max_response_body_size(max_response_body_size) .set_batch_request_config(BatchRequestConfig::Limit(batch_requests_limit)) @@ -527,6 +532,9 @@ where BatchNumber(l2_height), )?; + // Only errors when there are no receivers + let _ = self.soft_confirmation_tx.send(l2_height); + let l1_height = da_block.header().height(); info!( "New block #{}, DA #{}, Tx count: #{}", diff --git a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs index 85f45324d..40833dbef 100644 --- a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs +++ b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs @@ -8,6 +8,7 @@ use sov_rollup_interface::services::da::SlotData; use sov_rollup_interface::stf::{BatchReceipt, Event, SoftBatchReceipt, StateDiff}; use sov_rollup_interface::zk::Proof; use sov_schema_db::{Schema, SchemaBatch, SeekKeyEncoder, DB}; +use tokio::sync::broadcast; use tracing::instrument; use crate::rocks_db_config::gen_rocksdb_options; @@ -37,7 +38,7 @@ pub struct LedgerDB { /// requires transactions to be executed before being committed. db: Arc, next_item_numbers: Arc>, - slot_subscriptions: tokio::sync::broadcast::Sender, + slot_subscriptions: broadcast::Sender, } /// A SlotNumber, BatchNumber, TxNumber, and EventNumber which are grouped together, typically representing @@ -123,7 +124,7 @@ impl LedgerDB { Ok(Self { db: Arc::new(inner), next_item_numbers: Arc::new(Mutex::new(next_item_numbers)), - slot_subscriptions: tokio::sync::broadcast::channel(10).0, + slot_subscriptions: broadcast::channel(10).0, }) } diff --git a/crates/sovereign-sdk/full-node/sov-stf-runner/src/config.rs b/crates/sovereign-sdk/full-node/sov-stf-runner/src/config.rs index 3f0605879..87cda068d 100644 --- a/crates/sovereign-sdk/full-node/sov-stf-runner/src/config.rs +++ b/crates/sovereign-sdk/full-node/sov-stf-runner/src/config.rs @@ -39,6 +39,12 @@ pub struct RpcConfig { /// Maximum number of batch requests #[serde(default = "default_batch_requests_limit")] pub batch_requests_limit: u32, + /// Disable subscription RPCs + #[serde(default = "default_enable_subscriptions")] + pub enable_subscriptions: bool, + /// Maximum number of subscription connections + #[serde(default = "default_max_subscriptions_per_connection")] + pub max_subscriptions_per_connection: u32, } #[inline] @@ -66,6 +72,16 @@ const fn default_sync_blocks_count() -> u64 { 10 } +#[inline] +const fn default_enable_subscriptions() -> bool { + true +} + +#[inline] +const fn default_max_subscriptions_per_connection() -> u32 { + 100 +} + /// Simple storage configuration #[derive(Debug, Clone, PartialEq, Deserialize)] pub struct StorageConfig { @@ -170,6 +186,8 @@ mod tests { bind_host = "127.0.0.1" bind_port = 12345 max_connections = 500 + enable_subscriptions = true + max_subscriptions_per_connection = 200 [da] sender_address = "0000000000000000000000000000000000000000000000000000000000000000" @@ -208,6 +226,8 @@ mod tests { max_request_body_size: 10 * 1024 * 1024, max_response_body_size: 10 * 1024 * 1024, batch_requests_limit: 50, + enable_subscriptions: true, + max_subscriptions_per_connection: 200, }, public_keys: RollupPublicKeys { sequencer_public_key: vec![0; 32], diff --git a/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs b/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs index c65781edf..ffe86eff8 100644 --- a/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs +++ b/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs @@ -14,6 +14,7 @@ use sov_rollup_interface::storage::HierarchicalStorageManager; use sov_rollup_interface::zk::{Zkvm, ZkvmHost}; use sov_state::Storage; use sov_stf_runner::{FullNodeConfig, ProverConfig, ProverService}; +use tokio::sync::broadcast; pub use wallet::*; /// This trait defines how to crate all the necessary dependencies required by a rollup. @@ -67,6 +68,7 @@ pub trait RollupBlueprint: Sized + Send + Sync { ledger_db: &LedgerDB, da_service: &Self::DaService, sequencer_client_url: Option, + soft_confirmation_rx: Option>, ) -> Result, anyhow::Error>; /// Creates GenesisConfig from genesis files. diff --git a/resources/configs/bitcoin-regtest/prover_rollup_config.toml b/resources/configs/bitcoin-regtest/prover_rollup_config.toml index 98a326f7f..944a5d792 100644 --- a/resources/configs/bitcoin-regtest/prover_rollup_config.toml +++ b/resources/configs/bitcoin-regtest/prover_rollup_config.toml @@ -20,6 +20,7 @@ path = "resources/dbs/prover-db" # the host and port to bind the rpc server for bind_host = "127.0.0.1" bind_port = 12346 +enable_subscriptions = false [runner] sequencer_client_url = "http://0.0.0.0:12345" diff --git a/resources/configs/bitcoin-regtest/rollup_config.toml b/resources/configs/bitcoin-regtest/rollup_config.toml index 5ef41ca40..828413e07 100644 --- a/resources/configs/bitcoin-regtest/rollup_config.toml +++ b/resources/configs/bitcoin-regtest/rollup_config.toml @@ -20,6 +20,8 @@ path = "resources/dbs/full-node-db" # the host and port to bind the rpc server for bind_host = "127.0.0.1" bind_port = 12346 +enable_subscriptions = true +max_subscriptions_per_connection = 100 [runner] sequencer_client_url = "http://0.0.0.0:12345" diff --git a/resources/configs/bitcoin-regtest/sequencer_rollup_config.toml b/resources/configs/bitcoin-regtest/sequencer_rollup_config.toml index 065e6b1d9..f4e1b709c 100644 --- a/resources/configs/bitcoin-regtest/sequencer_rollup_config.toml +++ b/resources/configs/bitcoin-regtest/sequencer_rollup_config.toml @@ -20,3 +20,5 @@ path = "resources/dbs/sequencer-db" # the host and port to bind the rpc server for bind_host = "127.0.0.1" bind_port = 12345 +enable_subscriptions = true +max_subscriptions_per_connection = 100 diff --git a/resources/configs/devnet/rollup_config.toml b/resources/configs/devnet/rollup_config.toml index 430b69bd4..7176301fb 100644 --- a/resources/configs/devnet/rollup_config.toml +++ b/resources/configs/devnet/rollup_config.toml @@ -20,6 +20,8 @@ path = "resources/dbs/full-node-db" # the host and port to bind the rpc server for bind_host = "0.0.0.0" bind_port = 12346 +enable_subscriptions = true +max_subscriptions_per_connection = 100 [runner] sequencer_client_url = "https://rpc.devnet.citrea.xyz" diff --git a/resources/configs/mock-dockerized/rollup_config.toml b/resources/configs/mock-dockerized/rollup_config.toml index 93e55cdc0..25e3636a1 100644 --- a/resources/configs/mock-dockerized/rollup_config.toml +++ b/resources/configs/mock-dockerized/rollup_config.toml @@ -15,6 +15,8 @@ path = "resources/dbs/full-node-db" # the host and port to bind the rpc server for bind_host = "0.0.0.0" bind_port = 8545 +enable_subscriptions = true +max_subscriptions_per_connection = 100 [runner] sequencer_client_url = "http://0.0.0.0:8545" diff --git a/resources/configs/mock/rollup_config.toml b/resources/configs/mock/rollup_config.toml index d2b6f7dd6..9748b4547 100644 --- a/resources/configs/mock/rollup_config.toml +++ b/resources/configs/mock/rollup_config.toml @@ -15,6 +15,8 @@ path = "resources/dbs/full-node-db" # the host and port to bind the rpc server for bind_host = "127.0.0.1" bind_port = 12346 +enable_subscriptions = true +max_subscriptions_per_connection = 100 [runner] include_tx_body = false diff --git a/resources/configs/mock/sequencer_rollup_config.toml b/resources/configs/mock/sequencer_rollup_config.toml index 83700ba16..406b2373e 100644 --- a/resources/configs/mock/sequencer_rollup_config.toml +++ b/resources/configs/mock/sequencer_rollup_config.toml @@ -16,3 +16,5 @@ path = "resources/dbs/sequencer-db" bind_host = "127.0.0.1" bind_port = 12345 max_connections = 10000 +enable_subscriptions = true +max_subscriptions_per_connection = 100