Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Realtime Subscription RPCs #916

Merged
merged 34 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
40d0625
Separate rpc Ethereum context into its file
yaziciahmet Jul 22, 2024
24c27bd
Separate debug trace
yaziciahmet Jul 22, 2024
c5517c8
Carry subscriptions at the end
yaziciahmet Jul 22, 2024
96bdab0
Create eth subscription handler signature
yaziciahmet Jul 22, 2024
f022b11
Pass tx to ledger db
yaziciahmet Jul 22, 2024
63ac5c3
Pass down the tx to rpc context
yaziciahmet Jul 22, 2024
01e94bc
Send height after commit
yaziciahmet Jul 22, 2024
5a23d73
Update new head handler fn signature
yaziciahmet Jul 22, 2024
1ae585a
Finalize handling
yaziciahmet Jul 22, 2024
c7adbcc
Add test
yaziciahmet Jul 22, 2024
d0174f0
unwrap to expect
yaziciahmet Jul 22, 2024
5cde45e
Remove notifying from ledger db
yaziciahmet Jul 23, 2024
5b5ac8e
Send new l2 block notif in sequencer and fullnode
yaziciahmet Jul 23, 2024
e9e26a9
Add to prover as well
yaziciahmet Jul 23, 2024
75550b2
Make tx optional in rpc
yaziciahmet Jul 23, 2024
a349c1a
Use params sequence
yaziciahmet Jul 23, 2024
1867614
Rename
yaziciahmet Jul 23, 2024
9b72af0
Fix doc
yaziciahmet Jul 23, 2024
0b4e547
Replace tx with rx in rpc
yaziciahmet Jul 23, 2024
60f3ecc
Rename file
yaziciahmet Jul 23, 2024
c1ad447
IMplement SubscriptionManager
yaziciahmet Jul 23, 2024
d577518
Use SubscriptionManager
yaziciahmet Jul 23, 2024
6ec1b46
Minor changes
yaziciahmet Jul 23, 2024
cfbcc80
Implemented logs subscription
yaziciahmet Jul 23, 2024
33f724b
Add log subscription test
yaziciahmet Jul 24, 2024
661b339
Simplify logs subscription flow
yaziciahmet Jul 24, 2024
af215de
Disable debug_subscribe if subscriptions disabled
yaziciahmet Jul 25, 2024
e7b7531
Update configs
yaziciahmet Jul 25, 2024
f64a593
Rename
yaziciahmet Jul 25, 2024
0903903
Add new conf params
yaziciahmet Jul 25, 2024
3f4aa0d
UPdate log_matches_filter
yaziciahmet Jul 25, 2024
c94154b
recreate working set
yaziciahmet Jul 25, 2024
e88ecbd
Replace disable with enable
yaziciahmet Jul 25, 2024
d141d6d
Rename config
yaziciahmet Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bin/citrea/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use sov_modules_api::default_context::DefaultContext;
use sov_prover_storage_manager::SnapshotManager;
use sov_rollup_interface::services::da::DaService;
use sov_state::ProverStorage;
use tokio::sync::broadcast;

// register ethereum methods.
pub(crate) fn register_ethereum<Da: DaService>(
da_service: Da,
storage: ProverStorage<sov_state::DefaultStorageSpec, SnapshotManager>,
methods: &mut jsonrpsee::RpcModule<()>,
sequencer_client_url: Option<String>,
soft_commitment_tx: broadcast::Sender<u64>,
) -> Result<(), anyhow::Error> {
let eth_rpc_config = {
let eth_signer = eth_dev_signer();
Expand All @@ -28,6 +30,7 @@ pub(crate) fn register_ethereum<Da: DaService>(
eth_rpc_config,
storage,
sequencer_client_url,
soft_commitment_tx,
);
methods
.merge(ethereum_rpc)
Expand Down
3 changes: 3 additions & 0 deletions bin/citrea/src/rollup/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use sov_rollup_interface::da::DaVerifier;
use sov_rollup_interface::zk::{Zkvm, ZkvmHost};
use sov_state::{DefaultStorageSpec, Storage, ZkStorage};
use sov_stf_runner::{FullNodeConfig, ProverConfig};
use tokio::sync::broadcast;
use tracing::instrument;

use crate::CitreaRollupBlueprint;
Expand Down Expand Up @@ -61,6 +62,7 @@ impl RollupBlueprint for BitcoinRollup {
ledger_db: &LedgerDB,
da_service: &Self::DaService,
sequencer_client_url: Option<String>,
soft_commitment_tx: broadcast::Sender<u64>,
) -> Result<jsonrpsee::RpcModule<()>, anyhow::Error> {
// unused inside register RPC
let sov_sequencer = Address::new([0; 32]);
Expand All @@ -77,6 +79,7 @@ impl RollupBlueprint for BitcoinRollup {
storage.clone(),
&mut rpc_methods,
sequencer_client_url,
soft_commitment_tx,
)?;

Ok(rpc_methods)
Expand Down
3 changes: 3 additions & 0 deletions bin/citrea/src/rollup/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use sov_prover_storage_manager::ProverStorageManager;
use sov_rollup_interface::zk::{Zkvm, ZkvmHost};
use sov_state::{DefaultStorageSpec, Storage, ZkStorage};
use sov_stf_runner::{FullNodeConfig, ProverConfig};
use tokio::sync::broadcast;

use crate::CitreaRollupBlueprint;

Expand Down Expand Up @@ -55,6 +56,7 @@ impl RollupBlueprint for MockDemoRollup {
ledger_db: &LedgerDB,
da_service: &Self::DaService,
sequencer_client_url: Option<String>,
soft_commitment_tx: broadcast::Sender<u64>,
) -> Result<jsonrpsee::RpcModule<()>, anyhow::Error> {
// TODO set the sequencer address
let sequencer = Address::new([0; 32]);
Expand All @@ -71,6 +73,7 @@ impl RollupBlueprint for MockDemoRollup {
storage.clone(),
&mut rpc_methods,
sequencer_client_url,
soft_commitment_tx,
)?;

Ok(rpc_methods)
Expand Down
21 changes: 16 additions & 5 deletions bin/citrea/src/rollup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use sov_modules_rollup_blueprint::RollupBlueprint;
use sov_modules_stf_blueprint::{Runtime as RuntimeTrait, StfBlueprint};
use sov_state::storage::NativeStorage;
use sov_stf_runner::{FullNodeConfig, InitVariant, ProverConfig};
use tokio::sync::broadcast;
use tracing::instrument;
mod bitcoin;
mod mock;
Expand Down Expand Up @@ -37,15 +38,21 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
// Maybe whole "prev_root" can be initialized inside runner
// Getting block here, so prover_service doesn't have to be `Send`

let ledger_db = self.create_ledger_db(&rollup_config);
let (soft_confirmation_tx, _) = broadcast::channel(10);
yaziciahmet marked this conversation as resolved.
Show resolved Hide resolved
let ledger_db = self.create_ledger_db(&rollup_config, soft_confirmation_tx.clone());
let genesis_config = self.create_genesis_config(runtime_genesis_paths, &rollup_config)?;

let mut storage_manager = self.create_storage_manager(&rollup_config)?;
let prover_storage = storage_manager.create_finalized_storage()?;

// TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218)
let rpc_methods =
self.create_rpc_methods(&prover_storage, &ledger_db, &da_service, None)?;
let rpc_methods = self.create_rpc_methods(
eyusufatik marked this conversation as resolved.
Show resolved Hide resolved
&prover_storage,
&ledger_db,
&da_service,
None,
soft_confirmation_tx,
)?;

let native_stf = StfBlueprint::new();

Expand Down Expand Up @@ -103,7 +110,8 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
// Maybe whole "prev_root" can be initialized inside runner
// Getting block here, so prover_service doesn't have to be `Send`

let ledger_db = self.create_ledger_db(&rollup_config);
let (soft_confirmation_tx, _) = broadcast::channel(10);
let ledger_db = self.create_ledger_db(&rollup_config, soft_confirmation_tx.clone());
let genesis_config = self.create_genesis_config(runtime_genesis_paths, &rollup_config)?;

let mut storage_manager = self.create_storage_manager(&rollup_config)?;
Expand All @@ -116,6 +124,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
&ledger_db,
&da_service,
Some(runner_config.sequencer_client_url.clone()),
soft_confirmation_tx,
)?;

let native_stf = StfBlueprint::new();
Expand Down Expand Up @@ -181,7 +190,8 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
// Maybe whole "prev_root" can be initialized inside runner
// Getting block here, so prover_service doesn't have to be `Send`

let ledger_db = self.create_ledger_db(&rollup_config);
let (soft_confirmation_tx, _) = broadcast::channel(10);
let ledger_db = self.create_ledger_db(&rollup_config, soft_confirmation_tx.clone());
let genesis_config = self.create_genesis_config(runtime_genesis_paths, &rollup_config)?;

let mut storage_manager = self.create_storage_manager(&rollup_config)?;
Expand All @@ -194,6 +204,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
&ledger_db,
&da_service,
Some(runner_config.sequencer_client_url.clone()),
soft_confirmation_tx,
)?;

let native_stf = StfBlueprint::new();
Expand Down
3 changes: 2 additions & 1 deletion bin/citrea/src/test_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use sov_rollup_interface::stf::fuzzing::BatchReceiptStrategyArgs;
use sov_rollup_interface::stf::{BatchReceipt, Event, SoftBatchReceipt, TransactionReceipt};
#[cfg(test)]
use sov_stf_runner::RpcConfig;
use tokio::sync::broadcast;

struct TestExpect {
payload: serde_json::Value,
Expand Down Expand Up @@ -81,7 +82,7 @@ fn test_helper(
rt.block_on(async {
// Initialize the ledger database, which stores blocks, transactions, events, etc.
let tmpdir = tempfile::tempdir().unwrap();
let mut ledger_db = LedgerDB::with_path(tmpdir.path()).unwrap();
let mut ledger_db = LedgerDB::with_path(tmpdir.path(), broadcast::channel(1).0).unwrap();
populate_ledger(&mut ledger_db, slots, soft_batch_receipts);
let server = jsonrpsee::server::ServerBuilder::default()
.build("127.0.0.1:0")
Expand Down
40 changes: 40 additions & 0 deletions crates/ethereum-rpc/src/eth_subscription.rs
eyusufatik marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::sync::Arc;

use citrea_evm::Evm;
use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage};
use reth_rpc_types::BlockNumberOrTag;
use sov_modules_api::WorkingSet;
use sov_rollup_interface::services::da::DaService;

use crate::ethereum::Ethereum;

pub async fn handle_new_heads_subscription<C: sov_modules_api::Context, Da: DaService>(
pending: PendingSubscriptionSink,
ethereum: Arc<Ethereum<C, Da>>,
) {
let mut rx = ethereum.soft_commitment_tx.subscribe();
let evm = Evm::<C>::default();
let subscription = pending.accept().await.unwrap();
tokio::spawn(async move {
loop {
let block_number = rx.recv().await.unwrap();
let mut working_set = WorkingSet::<C>::new(ethereum.storage.clone());
let block = evm
.get_block_by_number(
Some(BlockNumberOrTag::Number(block_number)),
None,
&mut working_set,
)
.unwrap()
.unwrap();

let msg = SubscriptionMessage::new(
subscription.method_name(),
subscription.subscription_id(),
&block,
)
.unwrap();
let _ = subscription.send(msg).await;
}
});
}
111 changes: 111 additions & 0 deletions crates/ethereum-rpc/src/ethereum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use std::sync::Mutex;

#[cfg(feature = "local")]
use citrea_evm::DevSigner;
use citrea_evm::Evm;
use reth_primitives::U256;
use reth_rpc_types::trace::geth::GethTrace;
use rustc_version_runtime::version;
use schnellru::{ByLength, LruMap};
use sequencer_client::SequencerClient;
use sov_modules_api::WorkingSet;
use sov_rollup_interface::services::da::DaService;
use sov_rollup_interface::CITREA_VERSION;
use tokio::sync::broadcast;
use tracing::instrument;

use crate::gas_price::fee_history::FeeHistoryCacheConfig;
use crate::gas_price::gas_oracle::{GasPriceOracle, GasPriceOracleConfig};

const MAX_TRACE_BLOCK: u32 = 1000;

#[derive(Clone)]
pub struct EthRpcConfig {
pub gas_price_oracle_config: GasPriceOracleConfig,
pub fee_history_cache_config: FeeHistoryCacheConfig,
#[cfg(feature = "local")]
pub eth_signer: DevSigner,
}

pub struct Ethereum<C: sov_modules_api::Context, Da: DaService> {
#[allow(dead_code)]
pub(crate) da_service: Da,
pub(crate) gas_price_oracle: GasPriceOracle<C>,
#[cfg(feature = "local")]
pub(crate) eth_signer: DevSigner,
pub(crate) storage: C::Storage,
pub(crate) sequencer_client: Option<SequencerClient>,
pub(crate) web3_client_version: String,
pub(crate) trace_cache: Mutex<LruMap<u64, Vec<GethTrace>, ByLength>>,
pub(crate) soft_commitment_tx: broadcast::Sender<u64>,
}

impl<C: sov_modules_api::Context, Da: DaService> Ethereum<C, Da> {
pub(crate) fn new(
da_service: Da,
gas_price_oracle_config: GasPriceOracleConfig,
fee_history_cache_config: FeeHistoryCacheConfig,
#[cfg(feature = "local")] eth_signer: DevSigner,
storage: C::Storage,
sequencer_client: Option<SequencerClient>,
soft_commitment_tx: broadcast::Sender<u64>,
) -> Self {
let evm = Evm::<C>::default();
let gas_price_oracle =
GasPriceOracle::new(evm, gas_price_oracle_config, fee_history_cache_config);

let rollup = "citrea";
let arch = std::env::consts::ARCH;
let rustc_v = version();

let current_version = format!("{}/{}/{}/rust-{}", rollup, CITREA_VERSION, arch, rustc_v);

let trace_cache = Mutex::new(LruMap::new(ByLength::new(MAX_TRACE_BLOCK)));

Self {
da_service,
gas_price_oracle,
#[cfg(feature = "local")]
eth_signer,
storage,
sequencer_client,
web3_client_version: current_version,
trace_cache,
soft_commitment_tx,
}
}

#[instrument(level = "trace", skip_all)]
pub(crate) async fn max_fee_per_gas(&self, working_set: &mut WorkingSet<C>) -> (U256, U256) {
let suggested_tip = self
.gas_price_oracle
.suggest_tip_cap(working_set)
.await
.unwrap();

let evm = Evm::<C>::default();
let base_fee = evm
.get_block_by_number(None, None, working_set)
.unwrap()
.unwrap()
.header
.base_fee_per_gas
.unwrap_or_default();

(U256::from(base_fee), U256::from(suggested_tip))
}

// fn make_raw_tx(
// &self,
// raw_tx: RlpEvmTransaction,
// ) -> Result<(B256, Vec<u8>), jsonrpsee::core::RegisterMethodError> {
// let signed_transaction: RethTransactionSignedNoHash = raw_tx.clone().try_into()?;

// let tx_hash = signed_transaction.hash();

// let tx = CallMessage { txs: vec![raw_tx] };
// let message = <Runtime<C, Da::Spec> as EncodeCall<citrea_evm::Evm<C>>>::encode_call(tx);

// Ok((B256::from(tx_hash), message))
// }
}
Loading
Loading