Skip to content

Commit

Permalink
Realtime Subscription RPCs (#916)
Browse files Browse the repository at this point in the history
* Separate rpc Ethereum context into its file

* Separate debug trace

* Carry subscriptions at the end

* Create eth subscription handler signature

* Pass tx to ledger db

* Pass down the tx to rpc context

* Send height after commit

* Update new head handler fn signature

* Finalize handling

* Add test

* unwrap to expect

* Remove notifying from ledger db

* Send new l2 block notif in sequencer and fullnode

* Add to prover as well

* Make tx optional in rpc

* Use params sequence

* Rename

* Fix doc

* Replace tx with rx in rpc

* Rename file

* IMplement SubscriptionManager

* Use SubscriptionManager

* Minor changes

* Implemented logs subscription

* Add log subscription test

* Simplify logs subscription flow

* Disable debug_subscribe if subscriptions disabled

* Update configs

* Rename

* Add new conf params

* UPdate log_matches_filter

* recreate working set

* Replace disable with enable

* Rename config
  • Loading branch information
yaziciahmet authored Jul 25, 2024
1 parent 677c066 commit 13eed63
Show file tree
Hide file tree
Showing 33 changed files with 1,150 additions and 494 deletions.
3 changes: 3 additions & 0 deletions bin/citrea/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use sov_modules_api::default_context::DefaultContext;
use sov_prover_storage_manager::SnapshotManager;
use sov_rollup_interface::services::da::DaService;
use sov_state::ProverStorage;
use tokio::sync::broadcast;

// register ethereum methods.
pub(crate) fn register_ethereum<Da: DaService>(
da_service: Da,
storage: ProverStorage<sov_state::DefaultStorageSpec, SnapshotManager>,
methods: &mut jsonrpsee::RpcModule<()>,
sequencer_client_url: Option<String>,
soft_confirmation_rx: Option<broadcast::Receiver<u64>>,
) -> Result<(), anyhow::Error> {
let eth_rpc_config = {
let eth_signer = eth_dev_signer();
Expand All @@ -28,6 +30,7 @@ pub(crate) fn register_ethereum<Da: DaService>(
eth_rpc_config,
storage,
sequencer_client_url,
soft_confirmation_rx,
);
methods
.merge(ethereum_rpc)
Expand Down
3 changes: 3 additions & 0 deletions bin/citrea/src/rollup/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use sov_rollup_interface::da::DaVerifier;
use sov_rollup_interface::zk::{Zkvm, ZkvmHost};
use sov_state::{DefaultStorageSpec, Storage, ZkStorage};
use sov_stf_runner::{FullNodeConfig, ProverConfig};
use tokio::sync::broadcast;
use tracing::instrument;

use crate::CitreaRollupBlueprint;
Expand Down Expand Up @@ -61,6 +62,7 @@ impl RollupBlueprint for BitcoinRollup {
ledger_db: &LedgerDB,
da_service: &Self::DaService,
sequencer_client_url: Option<String>,
soft_confirmation_rx: Option<broadcast::Receiver<u64>>,
) -> Result<jsonrpsee::RpcModule<()>, anyhow::Error> {
// unused inside register RPC
let sov_sequencer = Address::new([0; 32]);
Expand All @@ -77,6 +79,7 @@ impl RollupBlueprint for BitcoinRollup {
storage.clone(),
&mut rpc_methods,
sequencer_client_url,
soft_confirmation_rx,
)?;

Ok(rpc_methods)
Expand Down
3 changes: 3 additions & 0 deletions bin/citrea/src/rollup/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use sov_prover_storage_manager::ProverStorageManager;
use sov_rollup_interface::zk::{Zkvm, ZkvmHost};
use sov_state::{DefaultStorageSpec, Storage, ZkStorage};
use sov_stf_runner::{FullNodeConfig, ProverConfig};
use tokio::sync::broadcast;

use crate::CitreaRollupBlueprint;

Expand Down Expand Up @@ -55,6 +56,7 @@ impl RollupBlueprint for MockDemoRollup {
ledger_db: &LedgerDB,
da_service: &Self::DaService,
sequencer_client_url: Option<String>,
soft_confirmation_rx: Option<broadcast::Receiver<u64>>,
) -> Result<jsonrpsee::RpcModule<()>, anyhow::Error> {
// TODO set the sequencer address
let sequencer = Address::new([0; 32]);
Expand All @@ -71,6 +73,7 @@ impl RollupBlueprint for MockDemoRollup {
storage.clone(),
&mut rpc_methods,
sequencer_client_url,
soft_confirmation_rx,
)?;

Ok(rpc_methods)
Expand Down
36 changes: 34 additions & 2 deletions bin/citrea/src/rollup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use sov_modules_rollup_blueprint::RollupBlueprint;
use sov_modules_stf_blueprint::{Runtime as RuntimeTrait, StfBlueprint};
use sov_state::storage::NativeStorage;
use sov_stf_runner::{FullNodeConfig, InitVariant, ProverConfig};
use tokio::sync::broadcast;
use tracing::instrument;
mod bitcoin;
mod mock;
Expand Down Expand Up @@ -43,9 +44,21 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
let mut storage_manager = self.create_storage_manager(&rollup_config)?;
let prover_storage = storage_manager.create_finalized_storage()?;

let (soft_confirmation_tx, soft_confirmation_rx) = broadcast::channel(10);
// If subscriptions disabled, pass None
let soft_confirmation_rx = if rollup_config.rpc.enable_subscriptions {
Some(soft_confirmation_rx)
} else {
None
};
// TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218)
let rpc_methods =
self.create_rpc_methods(&prover_storage, &ledger_db, &da_service, None)?;
let rpc_methods = self.create_rpc_methods(
&prover_storage,
&ledger_db,
&da_service,
None,
soft_confirmation_rx,
)?;

let native_stf = StfBlueprint::new();

Expand Down Expand Up @@ -75,6 +88,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
rollup_config.public_keys,
ledger_db,
rollup_config.rpc,
soft_confirmation_tx,
)
.unwrap();

Expand Down Expand Up @@ -110,12 +124,20 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
let prover_storage = storage_manager.create_finalized_storage()?;

let runner_config = rollup_config.runner.expect("Runner config is missing");
let (soft_confirmation_tx, soft_confirmation_rx) = broadcast::channel(10);
// If subscriptions disabled, pass None
let soft_confirmation_rx = if rollup_config.rpc.enable_subscriptions {
Some(soft_confirmation_rx)
} else {
None
};
// TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218)
let rpc_methods = self.create_rpc_methods(
&prover_storage,
&ledger_db,
&da_service,
Some(runner_config.sequencer_client_url.clone()),
soft_confirmation_rx,
)?;

let native_stf = StfBlueprint::new();
Expand Down Expand Up @@ -149,6 +171,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
init_variant,
code_commitment,
rollup_config.sync_blocks_count,
soft_confirmation_tx,
)?;

Ok(FullNode {
Expand Down Expand Up @@ -187,13 +210,21 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
let mut storage_manager = self.create_storage_manager(&rollup_config)?;
let prover_storage = storage_manager.create_finalized_storage()?;

let (soft_confirmation_tx, soft_confirmation_rx) = broadcast::channel(10);
// If subscriptions disabled, pass None
let soft_confirmation_rx = if rollup_config.rpc.enable_subscriptions {
Some(soft_confirmation_rx)
} else {
None
};
let runner_config = rollup_config.runner.expect("Runner config is missing");
// TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218)
let rpc_methods = self.create_rpc_methods(
&prover_storage,
&ledger_db,
&da_service,
Some(runner_config.sequencer_client_url.clone()),
soft_confirmation_rx,
)?;

let native_stf = StfBlueprint::new();
Expand Down Expand Up @@ -229,6 +260,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
Some(prover_config),
code_commitment,
rollup_config.sync_blocks_count,
soft_confirmation_tx,
)?;

Ok(Prover {
Expand Down
2 changes: 2 additions & 0 deletions bin/citrea/src/test_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ fn test_helper(
max_request_body_size: 10 * 1024 * 1024,
max_response_body_size: 10 * 1024 * 1024,
batch_requests_limit: 50,
enable_subscriptions: true,
max_subscriptions_per_connection: 100,
};

queries_test_runner(test_queries, rpc_config).await;
Expand Down
1 change: 1 addition & 0 deletions bin/citrea/tests/evm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{

mod archival_state;
mod gas_price;
mod subscription;
mod tracing;

#[tokio::test(flavor = "multi_thread")]
Expand Down
Loading

0 comments on commit 13eed63

Please sign in to comment.