From 50e322da3395792e5537e779dae8c251f1897eba Mon Sep 17 00:00:00 2001 From: Rakan Al-Huneiti Date: Fri, 7 Jun 2024 17:07:49 +0300 Subject: [PATCH] Refactor into consistent block production cycles (#498) * Refactor into consistent block production cycles * Cleanup build_block * Warn when falling behind * Remove L2 block mode * Add empty line separator * Fix redundant error messages * Pass lints * wip * Account for parent block execution time * Submit empty blocks for missed DA blocks * Add additional comment * Resolve udeps * Add guards to block production * Remove redundant check * Fix typos * Fix wrong merge * Decrease the number of DA service requests * Add additional information to panic * Set value for skipped_blocks * Satisfy clippy * WIP * some changes * more sequencer configs + change handling of last_l1_hieght * merge fixes * Use the spawned commitment thread * Incorporate submitting commitments into the loop * Fix clippy warning * Remove unwraps * Clean up * Fix sleep duration * Filter out postgres logging * Remove wait_until_eth_block_number * Disable logging for tests * Fix tests * Adding missing interval configs * WIP * Add missing params * Resolve sequencer commitments tests * Fix more tests * Fix reopen prover * Cleanup * Update reopen prover * Make reopen prover multithreaded * Remove flakiness from reopen prover test * Fix full_node_verify_proof_and_store * Fix unwrap call * Improve timeout error messages * Fix test_all_flow and test_db_get_proof * If test fails, show backtrace * Add nextest config for e2e tests * Introduce wait_for_proof * Wait just until block 8 in prover * Update nextest configs * Fix sequencer crash test * Fix test_all_flow * Remove nextest configs * Fix PR feedback * Add TODO * Get rid of the flakiness of reopen prover * Update last_used_l1_height as a result --------- Co-authored-by: eyusufatik --- .github/workflows/checks.yml | 1 + Cargo.toml | 1 + bin/citrea/Cargo.toml | 2 +- bin/citrea/src/lib.rs | 10 +- bin/citrea/tests/e2e/mod.rs | 236 ++++--- bin/citrea/tests/sequencer_commitments/mod.rs | 53 +- .../soft_confirmation_rule_enforcer/mod.rs | 20 +- bin/citrea/tests/test_client/mod.rs | 4 +- bin/citrea/tests/test_helpers/mod.rs | 61 +- configs/bitcoin-regtest/sequencer_config.toml | 2 + configs/mock-dockerized/sequencer_config.toml | 2 + configs/mock/sequencer_config.toml | 2 + crates/sequencer/src/commitment_controller.rs | 2 +- crates/sequencer/src/config.rs | 8 + crates/sequencer/src/sequencer.rs | 582 +++++++++++------- .../full-node/sov-stf-runner/src/runner.rs | 2 +- .../sov-prover-incentives/Cargo.toml | 4 +- 17 files changed, 653 insertions(+), 339 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 39d60c49f..64bbce13b 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -400,6 +400,7 @@ jobs: - name: Run nextest run: make test env: + RUST_BACKTRACE: 1 BONSAI_API_URL: ${{ secrets.BONSAI_API_URL }} # TODO: remove this once we don't use the client on tests BONSAI_API_KEY: ${{ secrets.BONSAI_API_KEY }} # TODO: remove this once we don't use the client on tests diff --git a/Cargo.toml b/Cargo.toml index 9d62a4331..cad6efde9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,7 @@ serde_json = { version = "1.0", default-features = false, features = ["alloc"] } sha2 = { version = "0.10.8", default-features = false } thiserror = "1.0.50" tracing = { version = "0.1.40", default-features = false } +tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json", 
"fmt"] } bech32 = { version = "0.9.1", default-features = false } derive_more = { version = "0.99.11", default-features = false } clap = { version = "4.4.10", features = ["derive"] } diff --git a/bin/citrea/Cargo.toml b/bin/citrea/Cargo.toml index 579f8d6ab..347b09e29 100644 --- a/bin/citrea/Cargo.toml +++ b/bin/citrea/Cargo.toml @@ -46,12 +46,12 @@ log-panics = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tracing = { workspace = true } +tracing-subscriber = { workspace = true } hex = { workspace = true, optional = true } tokio = { workspace = true } reth-primitives = { workspace = true } reth-transaction-pool = { workspace = true } reth-rpc-types = { workspace = true } -tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json"] } soft-confirmation-rule-enforcer = { path = "../../crates/soft-confirmation-rule-enforcer" } sov-db = { path = "../../crates/sovereign-sdk/full-node/db/sov-db" } diff --git a/bin/citrea/src/lib.rs b/bin/citrea/src/lib.rs index 68fadeae3..91f674308 100644 --- a/bin/citrea/src/lib.rs +++ b/bin/citrea/src/lib.rs @@ -17,11 +17,11 @@ pub use bitcoin_rollup::*; /// Default initialization of logging pub fn initialize_logging() { - let env_filter = - EnvFilter::from_str(&env::var("RUST_LOG").unwrap_or_else(|_| { - "debug,hyper=info,risc0_zkvm=info,guest_execution=debug".to_string() - })) - .unwrap(); + let env_filter = EnvFilter::from_str(&env::var("RUST_LOG").unwrap_or_else(|_| { + "debug,jmt=info,hyper=info,risc0_zkvm=info,guest_execution=debug,tokio_postgres=info" + .to_string() + })) + .unwrap(); if std::env::var("JSON_LOGS").is_ok() { tracing_subscriber::registry() .with(fmt::layer().json()) diff --git a/bin/citrea/tests/e2e/mod.rs b/bin/citrea/tests/e2e/mod.rs index 13cc6504b..d53613ad3 100644 --- a/bin/citrea/tests/e2e/mod.rs +++ b/bin/citrea/tests/e2e/mod.rs @@ -27,7 +27,8 @@ use crate::evm::{init_test_rollup, make_test_client}; use crate::test_client::TestClient; use crate::test_helpers::{ create_default_sequencer_config, start_rollup, tempdir_with_children, wait_for_l1_block, - wait_for_l2_block, wait_for_postgres_commitment, wait_for_prover_l1_height, NodeMode, + wait_for_l2_block, wait_for_postgres_commitment, wait_for_postgres_proofs, wait_for_proof, + wait_for_prover_l1_height, NodeMode, }; use crate::{ DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, DEFAULT_MIN_SOFT_CONFIRMATIONS_PER_COMMITMENT, @@ -784,6 +785,7 @@ async fn test_soft_confirmations_on_different_blocks() -> Result<(), anyhow::Err // publish new da block da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 2, None).await; for _ in 1..=6 { seq_test_client.spam_publish_batch_request().await.unwrap(); @@ -1065,14 +1067,40 @@ async fn test_soft_confirmations_status_one_l1() -> Result<(), anyhow::Error> { seq_test_client.send_publish_batch_request().await; } - // TODO check status=trusted - wait_for_l2_block(&full_node_test_client, 6, None).await; // publish new da block + // + // This will trigger the sequencer's DA monitor to see a newly published + // block and will therefore initiate a commitment submission to the MockDA. + // Therefore, creating yet another DA block. 
da_service.publish_test_block().await.unwrap(); - seq_test_client.send_publish_batch_request().await; // TODO https://github.com/chainwayxyz/citrea/issues/214 - seq_test_client.send_publish_batch_request().await; // TODO https://github.com/chainwayxyz/citrea/issues/214 + + // The above L1 block has been created, + // we wait until the block is actually received by the DA monitor. + wait_for_l1_block(&da_service, 2, None).await; + + // To make sure that we register one L2 block per L1 block, + // We have to submit an empty block for DA block #2. + // seq_test_client.send_publish_batch_request().await; + // wait_for_l2_block(&full_node_test_client, 7, None).await; + + // Wait for DA block #3 containing the commitment + // submitted by sequencer. + wait_for_l1_block(&da_service, 3, None).await; + + // now retrieve confirmation status from the sequencer and full node and check if they are the same + for i in 1..=6 { + let status_node = full_node_test_client + .ledger_get_soft_confirmation_status(i) + .await + .unwrap(); + + assert_eq!(SoftConfirmationStatus::Trusted, status_node.unwrap()); + } + + seq_test_client.send_publish_batch_request().await; + seq_test_client.send_publish_batch_request().await; wait_for_l2_block(&full_node_test_client, 8, None).await; @@ -1122,6 +1150,7 @@ async fn test_soft_confirmations_status_two_l1() -> Result<(), anyhow::Error> { // publish new da block da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 2, None).await; for _ in 2..=6 { seq_test_client.send_publish_batch_request().await; @@ -1141,6 +1170,8 @@ async fn test_soft_confirmations_status_two_l1() -> Result<(), anyhow::Error> { // publish new da block da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 3, None).await; + seq_test_client.send_publish_batch_request().await; seq_test_client.send_publish_batch_request().await; @@ -1255,13 +1286,14 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> { // prover should not have any blocks saved assert_eq!(prover_node_test_client.eth_block_number().await, 0); - da_service.publish_test_block().await.unwrap(); - + // start l1 height = 1, end = 2 seq_test_client.send_publish_batch_request().await; // sequencer commitment should be sent da_service.publish_test_block().await.unwrap(); - // start l1 height = 1, end = 2 + wait_for_l1_block(&da_service, 2, None).await; + wait_for_l1_block(&da_service, 3, None).await; + seq_test_client.send_publish_batch_request().await; // wait here until we see from prover's rpc that it finished proving @@ -1273,11 +1305,12 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> { .await; // prover should have synced all 4 l2 blocks + wait_for_l2_block(&prover_node_test_client, 4, None).await; assert_eq!(prover_node_test_client.eth_block_number().await, 4); seq_test_client.send_publish_batch_request().await; - // Still should have 4 blokcs there are no commitments yet + // Still should have 4 blocks there are no commitments yet wait_for_prover_l1_height( &prover_node_test_client, 4, @@ -1286,20 +1319,19 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> { .await; assert_eq!(prover_node_test_client.eth_block_number().await, 4); - seq_test_client.send_publish_batch_request().await; - seq_test_client.send_publish_batch_request().await; - - // Still should have 4 blokcs there are no commitments yet + // Still should have 4 blocks there are no commitments yet 
assert_eq!(prover_node_test_client.eth_block_number().await, 4); + da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 4, None).await; + wait_for_l1_block(&da_service, 5, None).await; - // Commitment is sent right before the 9th block is published seq_test_client.send_publish_batch_request().await; // wait here until we see from prover's rpc that it finished proving wait_for_prover_l1_height( &prover_node_test_client, - 8, + 5, Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)), ) .await; @@ -1308,10 +1340,10 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> { // there is an extra soft confirmation due to the prover publishing a proof. This causes // a new MockDa block, which in turn causes the sequencer to publish an extra soft confirmation // becase it must not skip blocks. - assert_eq!(prover_node_test_client.eth_block_number().await, 8); + assert_eq!(prover_node_test_client.eth_block_number().await, 4); // on the 8th DA block, we should have a proof - let mut blobs = da_service.get_block_at(8).await.unwrap().blobs; + let mut blobs = da_service.get_block_at(4).await.unwrap().blobs; assert_eq!(blobs.len(), 1); @@ -1394,18 +1426,22 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> { for _ in 0..3 { seq_test_client.send_publish_batch_request().await; } + wait_for_l2_block(&seq_test_client, 3, None).await; // prover should not have any blocks saved assert_eq!(prover_node_test_client.eth_block_number().await, 0); da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 2, None).await; seq_test_client.send_publish_batch_request().await; + wait_for_l2_block(&seq_test_client, 4, None).await; // sequencer commitment should be sent da_service.publish_test_block().await.unwrap(); - // start l1 height = 1, end = 2 - seq_test_client.send_publish_batch_request().await; + wait_for_l1_block(&da_service, 3, None).await; + // Block that contains the commitment + wait_for_l1_block(&da_service, 4, None).await; // wait here until we see from prover's rpc that it finished proving wait_for_prover_l1_height( @@ -1414,6 +1450,8 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> { Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)), ) .await; + // Contains the proof + wait_for_l1_block(&da_service, 5, None).await; // prover should have synced all 4 l2 blocks assert_eq!(prover_node_test_client.eth_block_number().await, 4); @@ -1449,6 +1487,7 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> { let prover_node_test_client = make_test_client(prover_node_port).await; seq_test_client.send_publish_batch_request().await; + wait_for_l2_block(&seq_test_client, 6, None).await; // Still should have 4 blocks there are no commitments yet assert_eq!(prover_node_test_client.eth_block_number().await, 4); @@ -1457,6 +1496,7 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> { seq_test_client.send_publish_batch_request().await; seq_test_client.send_publish_batch_request().await; + wait_for_l2_block(&seq_test_client, 8, None).await; let _ = copy_dir_recursive(&prover_db_dir, &storage_dir.path().join("prover_copy2")); @@ -1486,12 +1526,18 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> { let prover_node_port = prover_node_port_rx.await.unwrap(); let prover_node_test_client = make_test_client(prover_node_port).await; - // Still should have 4 blokcs there are no commitments yet - assert_eq!(prover_node_test_client.eth_block_number().await, 4); - 
da_service.publish_test_block().await.unwrap(); + // We have 8 blocks in total, make sure the prover syncs + // and starts proving the second commitment. + wait_for_l2_block(&prover_node_test_client, 8, None).await; + assert_eq!(prover_node_test_client.eth_block_number().await, 8); - // Commitment is sent right before the 9th block is published seq_test_client.send_publish_batch_request().await; + wait_for_l2_block(&seq_test_client, 9, None).await; + + da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 6, None).await; + // Commitment is sent + wait_for_l1_block(&da_service, 7, None).await; // wait here until we see from prover's rpc that it finished proving wait_for_prover_l1_height( @@ -1501,11 +1547,12 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> { ) .await; - // Should now have 8 blocks = 2 commitments of blocks 1-4 and 5-9 + // Should now have 8 blocks = 2 commitments of blocks 1-4 and 5-8 // there is an extra soft confirmation due to the prover publishing a proof. This causes // a new MockDa block, which in turn causes the sequencer to publish an extra soft confirmation - assert_eq!(prover_node_test_client.eth_block_number().await, 9); - + // TODO: Debug why this is not including block 9 in the commitment + // https://github.com/chainwayxyz/citrea/issues/684 + assert!(prover_node_test_client.eth_block_number().await >= 8); // TODO: Also test with multiple commitments in single Mock DA Block seq_task.abort(); prover_node_task.abort(); @@ -1513,7 +1560,7 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> { } #[tokio::test] -async fn test_system_transactons() -> Result<(), anyhow::Error> { +async fn test_system_transactions() -> Result<(), anyhow::Error> { // citrea::initialize_logging(); let system_contract_address = @@ -1532,6 +1579,7 @@ async fn test_system_transactons() -> Result<(), anyhow::Error> { for _ in 0..3 { da_service.publish_test_block().await.unwrap(); } + wait_for_l1_block(&da_service, 3, None).await; let (seq_test_client, full_node_test_client, seq_task, full_node_task, _) = initialize_test(TestConfig { @@ -1550,6 +1598,8 @@ async fn test_system_transactons() -> Result<(), anyhow::Error> { wait_for_l2_block(&seq_test_client, 5 * (i + 1), None).await; da_service.publish_test_block().await.unwrap(); + + wait_for_l1_block(&da_service, 4 + i, None).await; } seq_test_client.send_publish_batch_request().await; @@ -1693,6 +1743,8 @@ async fn test_system_tx_effect_on_block_gas_limit() -> Result<(), anyhow::Error> ..Default::default() }, db_config: Default::default(), + da_update_interval_ms: 1000, + block_production_interval_ms: 500, }), Some(true), DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, @@ -1898,6 +1950,8 @@ async fn sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Error> { // second da block da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 2, None).await; + wait_for_l1_block(&da_service, 3, None).await; // before this the commitment will be sent // the commitment will be only in the first block so it is still not finalized @@ -1905,14 +1959,15 @@ async fn sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Error> { seq_test_client.send_publish_batch_request().await; // wait for sync - wait_for_l2_block(&full_node_test_client, 5, None).await; + wait_for_l2_block(&full_node_test_client, 6, None).await; // should be synced - assert_eq!(full_node_test_client.eth_block_number().await, 5); + assert_eq!(full_node_test_client.eth_block_number().await, 6); // assume sequencer 
craashed seq_task.abort(); + wait_for_postgres_commitment(&db_test_client, 1, Some(Duration::from_secs(60))).await; let commitments = db_test_client.get_all_commitments().await.unwrap(); assert_eq!(commitments.len(), 1); @@ -1950,15 +2005,18 @@ async fn sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Error> { let seq_test_client = make_test_client(seq_port).await; - wait_for_l2_block(&seq_test_client, 5, None).await; + wait_for_l2_block(&seq_test_client, 6, None).await; - assert_eq!(seq_test_client.eth_block_number().await as u64, 5); + assert_eq!(seq_test_client.eth_block_number().await as u64, 6); seq_test_client.send_publish_batch_request().await; seq_test_client.send_publish_batch_request().await; seq_test_client.send_publish_batch_request().await; da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 4, None).await; + wait_for_l1_block(&da_service, 5, None).await; + // new commitment will be sent here, it should send between 2 and 3 should not include 1 seq_test_client.send_publish_batch_request().await; @@ -2237,7 +2295,7 @@ async fn sequencer_crash_restore_mempool() -> Result<(), anyhow::Error> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_db_get_proof() { // citrea::initialize_logging(); @@ -2305,26 +2363,29 @@ async fn test_db_get_proof() { let prover_node_test_client = make_test_client(prover_node_port).await; da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 2, None).await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; + da_service.publish_test_block().await.unwrap(); - // submits with new da block - test_client.send_publish_batch_request().await; - // prover node gets the commitment - test_client.send_publish_batch_request().await; - // da_service.publish_test_block().await.unwrap(); + // Commitment + wait_for_l1_block(&da_service, 3, None).await; + // Proof + wait_for_l1_block(&da_service, 4, None).await; // wait here until we see from prover's rpc that it finished proving wait_for_prover_l1_height( &prover_node_test_client, - 5, + 4, Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)), ) .await; + wait_for_postgres_proofs(&db_test_client, 1, Some(Duration::from_secs(60))).await; + let ledger_proof = prover_node_test_client .ledger_get_proof_by_slot_height(4) .await; @@ -2360,7 +2421,7 @@ async fn test_db_get_proof() { prover_node_task.abort(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn full_node_verify_proof_and_store() { // citrea::initialize_logging(); @@ -2450,18 +2511,24 @@ async fn full_node_verify_proof_and_store() { let full_node_test_client = make_test_client(full_node_port).await; da_service.publish_test_block().await.unwrap(); - wait_for_l1_block(&da_service, 1, None).await; + wait_for_l1_block(&da_service, 2, None).await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; + // submits with new da block, triggers commitment submission. da_service.publish_test_block().await.unwrap(); - // submits with new da block + // This is the above block created. 
+ wait_for_l1_block(&da_service, 3, None).await; + // Commitment submitted + wait_for_l1_block(&da_service, 4, None).await; + // Full node sync commitment block test_client.send_publish_batch_request().await; - // prover node gets the commitment + wait_for_l2_block(&full_node_test_client, 5, None).await; + // Full node sync commitment block test_client.send_publish_batch_request().await; - // da_service.publish_test_block().await.unwrap(); + wait_for_l2_block(&full_node_test_client, 6, None).await; // wait here until we see from prover's rpc that it finished proving wait_for_prover_l1_height( @@ -2506,16 +2573,21 @@ async fn full_node_verify_proof_and_store() { // The proof will be in l1 block #5 because prover publishes it after the commitment and // in mock da submitting proof and commitments creates a new block. // For full node to see the proof, we publish another l2 block and now it will check #5 l1 block - test_client.send_publish_batch_request().await; - - wait_for_l2_block(&full_node_test_client, 7, None).await; wait_for_l1_block(&da_service, 5, None).await; + // Up until this moment, Full node has only seen 2 DA blocks. + // We need to force it to sync up to 5th DA block. + for i in 7..=8 { + test_client.send_publish_batch_request().await; + wait_for_l2_block(&full_node_test_client, i, None).await; + } + // So the full node should see the proof in block 5 + wait_for_proof(&full_node_test_client, 5, None).await; let full_node_proof = full_node_test_client .ledger_get_verified_proofs_by_slot_height(5) - .await; - + .await + .unwrap(); assert_eq!(prover_proof.proof, full_node_proof[0].proof); assert_eq!( @@ -2640,7 +2712,7 @@ async fn test_all_flow() { let full_node_test_client = make_test_client(full_node_port).await; da_service.publish_test_block().await.unwrap(); - wait_for_l1_block(&da_service, 1, None).await; + wait_for_l1_block(&da_service, 2, None).await; test_client.send_publish_batch_request().await; @@ -2663,17 +2735,23 @@ async fn test_all_flow() { .unwrap(); test_client.send_publish_batch_request().await; + // Submit commitment da_service.publish_test_block().await.unwrap(); - // submits with new da block + // Commitment + wait_for_l1_block(&da_service, 3, None).await; + // Proof + wait_for_l1_block(&da_service, 4, None).await; + // Full node sync - commitment DA test_client.send_publish_batch_request().await; - // prover node gets the commitment + wait_for_l2_block(&full_node_test_client, 5, None).await; + // Full node sync - Proof DA test_client.send_publish_batch_request().await; - // da_service.publish_test_block().await.unwrap(); + wait_for_l2_block(&full_node_test_client, 6, None).await; // wait here until we see from prover's rpc that it finished proving wait_for_prover_l1_height( &prover_node_test_client, - 5, + 4, Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)), ) .await; @@ -2726,14 +2804,16 @@ async fn test_all_flow() { // the proof will be in l1 block #5 because prover publishes it after the commitment and in mock da submitting proof and commitments creates a new block // For full node to see the proof, we publish another l2 block and now it will check #5 l1 block // 7th soft batch + wait_for_l1_block(&da_service, 5, None).await; test_client.send_publish_batch_request().await; - - sleep(Duration::from_secs(2)).await; + wait_for_l2_block(&full_node_test_client, 7, None).await; // So the full node should see the proof in block 5 + wait_for_proof(&full_node_test_client, 5, None).await; let full_node_proof = full_node_test_client 
.ledger_get_verified_proofs_by_slot_height(5) - .await; + .await + .unwrap(); assert_eq!(prover_proof.proof, full_node_proof[0].proof); @@ -2742,8 +2822,6 @@ async fn test_all_flow() { full_node_proof[0].state_transition ); - wait_for_l2_block(&full_node_test_client, 5, None).await; - full_node_test_client .ledger_get_soft_confirmation_status(5) .await @@ -2784,17 +2862,19 @@ async fn test_all_flow() { .unwrap(); // 8th soft batch test_client.send_publish_batch_request().await; - da_service.publish_test_block().await.unwrap(); + wait_for_l2_block(&full_node_test_client, 8, None).await; - // submits with new da block - test_client.send_publish_batch_request().await; - // prover node gets the commitment - test_client.send_publish_batch_request().await; + // Submit a commitment + da_service.publish_test_block().await.unwrap(); + // Commitment + wait_for_l1_block(&da_service, 6, None).await; + // Proof + wait_for_l1_block(&da_service, 7, None).await; // wait here until we see from prover's rpc that it finished proving wait_for_prover_l1_height( &prover_node_test_client, - 8, + 7, Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)), ) .await; @@ -2823,15 +2903,16 @@ async fn test_all_flow() { ); // let full node see the proof - test_client.send_publish_batch_request().await; - - wait_for_l2_block(&full_node_test_client, 8, None).await; - - sleep(Duration::from_secs(2)).await; + for i in 9..13 { + test_client.send_publish_batch_request().await; + wait_for_l2_block(&full_node_test_client, i, None).await; + } + wait_for_proof(&full_node_test_client, 8, None).await; let full_node_proof_data = full_node_test_client .ledger_get_verified_proofs_by_slot_height(8) - .await; + .await + .unwrap(); assert_eq!(prover_proof_data.proof, full_node_proof_data[0].proof); assert_eq!( @@ -2853,7 +2934,6 @@ async fn test_all_flow() { for i in 1..=8 { // print statuses - let status = full_node_test_client .ledger_get_soft_confirmation_status(i) .await @@ -2863,13 +2943,16 @@ async fn test_all_flow() { assert_eq!(status, SoftConfirmationStatus::Proven); } - assert_eq!(test_client.eth_block_number().await, 11); + wait_for_l2_block(&test_client, 14, None).await; + assert_eq!(test_client.eth_block_number().await, 14); // Synced up to the latest block - assert_eq!(full_node_test_client.eth_block_number().await, 11); + wait_for_l2_block(&full_node_test_client, 14, Some(Duration::from_secs(60))).await; + assert!(full_node_test_client.eth_block_number().await >= 14); // Synced up to the latest commitment - assert_eq!(prover_node_test_client.eth_block_number().await, 8); + wait_for_l2_block(&prover_node_test_client, 9, Some(Duration::from_secs(60))).await; + assert!(prover_node_test_client.eth_block_number().await >= 9); seq_task.abort(); prover_node_task.abort(); @@ -2915,13 +2998,12 @@ async fn test_gas_limit_too_high() { test_mode: true, deposit_mempool_fetch_limit: 100, mempool_conf: SequencerMempoolConfig { - // Set the max number of txs per user account - // to be higher than the number of transactions - // we want to send. 
max_account_slots: tx_count * 2, ..Default::default() }, db_config: Default::default(), + da_update_interval_ms: 1000, + block_production_interval_ms: 1000, }), Some(true), 100, diff --git a/bin/citrea/tests/sequencer_commitments/mod.rs b/bin/citrea/tests/sequencer_commitments/mod.rs index 46b021e43..ea01ba832 100644 --- a/bin/citrea/tests/sequencer_commitments/mod.rs +++ b/bin/citrea/tests/sequencer_commitments/mod.rs @@ -21,9 +21,9 @@ use crate::{DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, DEFAULT_PROOF_WAIT_DURATION}; #[tokio::test] async fn sequencer_sends_commitments_to_da_layer() { - citrea::initialize_logging(); + // citrea::initialize_logging(); - let db_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]); + let db_dir = tempdir_with_children(&["DA", "sequencer"]); let da_db_dir = db_dir.path().join("DA").to_path_buf(); let sequencer_db_dir = db_dir.path().join("sequencer").to_path_buf(); @@ -59,6 +59,7 @@ async fn sequencer_sends_commitments_to_da_layer() { } da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 2, None).await; let mut height = 1; let last_finalized = da_service @@ -82,9 +83,16 @@ async fn sequencer_sends_commitments_to_da_layer() { height += 1; } - da_service.publish_test_block().await.unwrap(); + // Publish one more L2 block test_client.send_publish_batch_request().await; + // Trigger a commitment + da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 3, None).await; + // The previous L1 block triggers a commitment + // which will create yet another L1 block. + wait_for_l1_block(&da_service, 4, None).await; + let start_l2_block: u64 = 1; let end_l2_block: u64 = 4; // can only be the block before the one comitment landed in let start_l1_block = 1; @@ -103,10 +111,8 @@ async fn sequencer_sends_commitments_to_da_layer() { test_client.send_publish_batch_request().await; } da_service.publish_test_block().await.unwrap(); - - test_client.send_publish_batch_request().await; - - wait_for_l2_block(&test_client, 5, None).await; + wait_for_l1_block(&da_service, 4, None).await; + wait_for_l1_block(&da_service, 5, None).await; let start_l2_block: u64 = end_l2_block + 1; let end_l2_block: u64 = end_l2_block + 5; // can only be the block before the one comitment landed in @@ -158,10 +164,8 @@ async fn check_sequencer_commitment( panic!("Expected SequencerCommitment, got {:?}", commitment); }; - let height = test_client.eth_block_number().await; - let commitments_last_soft_confirmation: SignedSoftConfirmationBatch = test_client - .ledger_get_soft_batch_by_number::(height - 1) // after commitment is sent another block is published + .ledger_get_soft_batch_by_number::(end_l2_block) // after commitment is sent another block is published .await .unwrap() .into(); @@ -240,6 +244,7 @@ async fn check_commitment_in_offchain_db() { let da_service = MockDaService::new(MockAddress::from([0; 32]), &da_db_dir); da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 2, None).await; // publish 3 soft confirmations, no commitment should be sent for _ in 0..3 { @@ -247,14 +252,15 @@ async fn check_commitment_in_offchain_db() { } da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 3, None).await; // publish 4th block test_client.send_publish_batch_request().await; - // new da block - da_service.publish_test_block().await.unwrap(); + wait_for_l2_block(&test_client, 4, None).await; // commitment should be published with this call - test_client.send_publish_batch_request().await; + 
da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 4, None).await; wait_for_postgres_commitment( &db_test_client, @@ -329,17 +335,19 @@ async fn test_ledger_get_commitments_on_slot() { let full_node_test_client = make_test_client(full_node_port).await; da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 2, None).await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; da_service.publish_test_block().await.unwrap(); - // submits with new da block - test_client.send_publish_batch_request().await; + wait_for_l1_block(&da_service, 3, None).await; + // Commit + wait_for_l1_block(&da_service, 4, None).await; + // full node gets the commitment test_client.send_publish_batch_request().await; - // da_service.publish_test_block().await.unwrap(); wait_for_l2_block(&full_node_test_client, 6, None).await; @@ -377,7 +385,7 @@ async fn test_ledger_get_commitments_on_slot() { #[tokio::test] async fn test_ledger_get_commitments_on_slot_prover() { - citrea::initialize_logging(); + // citrea::initialize_logging(); let db_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]); let da_db_dir = db_dir.path().join("DA").to_path_buf(); @@ -437,18 +445,17 @@ async fn test_ledger_get_commitments_on_slot_prover() { let prover_node_test_client = make_test_client(prover_node_port).await; da_service.publish_test_block().await.unwrap(); - wait_for_l1_block(&da_service, 1, None).await; + wait_for_l1_block(&da_service, 2, None).await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; test_client.send_publish_batch_request().await; + da_service.publish_test_block().await.unwrap(); - // submits with new da block - test_client.send_publish_batch_request().await; - // prover node gets the commitment - test_client.send_publish_batch_request().await; - // da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 3, None).await; + // Commitment + wait_for_l1_block(&da_service, 4, None).await; // wait here until we see from prover's rpc that it finished proving wait_for_prover_l1_height( diff --git a/bin/citrea/tests/soft_confirmation_rule_enforcer/mod.rs b/bin/citrea/tests/soft_confirmation_rule_enforcer/mod.rs index dd2b30eff..d31a88a3b 100644 --- a/bin/citrea/tests/soft_confirmation_rule_enforcer/mod.rs +++ b/bin/citrea/tests/soft_confirmation_rule_enforcer/mod.rs @@ -1,9 +1,14 @@ +use std::time::Duration; + use citrea_stf::genesis_config::GenesisPaths; use sov_mock_da::{MockAddress, MockDaService}; +use tokio::time::sleep; use crate::evm::make_test_client; // use citrea::initialize_logging; -use crate::test_helpers::{start_rollup, tempdir_with_children, NodeMode}; +use crate::test_helpers::{ + start_rollup, tempdir_with_children, wait_for_l1_block, wait_for_l2_block, NodeMode, +}; use crate::{DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, DEFAULT_MIN_SOFT_CONFIRMATIONS_PER_COMMITMENT}; /// Transaction with equal nonce to last tx should not be accepted by mempool. 
@@ -54,22 +59,35 @@ async fn too_many_l2_block_per_l1_block() { if idx >= max_l2_blocks_per_l1 { // There should not be any more blocks published from this point // because the max L2 blocks per L1 is reached + wait_for_l2_block(&test_client, 10, None).await; assert_eq!(test_client.eth_block_number().await, 10); } } let mut last_block_number = test_client.eth_block_number().await; da_service.publish_test_block().await.unwrap(); + wait_for_l1_block(&da_service, 2, None).await; + + // Wait for the sequencer DA update interval to pass for it to recognize + // the new DA block. + sleep(Duration::from_secs(1)).await; for idx in 0..2 * max_l2_blocks_per_l1 + 1 { test_client.spam_publish_batch_request().await.unwrap(); if idx < max_l2_blocks_per_l1 { + wait_for_l2_block( + &test_client, + last_block_number + 1, + Some(Duration::from_secs(60)), + ) + .await; assert_eq!(test_client.eth_block_number().await, last_block_number + 1); } last_block_number += 1; if idx >= max_l2_blocks_per_l1 { // There should not be any more blocks published from this point // because the max L2 blocks per L1 is reached again + wait_for_l2_block(&test_client, 20, None).await; assert_eq!(test_client.eth_block_number().await, 20); } } diff --git a/bin/citrea/tests/test_client/mod.rs b/bin/citrea/tests/test_client/mod.rs index 71fb45971..42fa558a0 100644 --- a/bin/citrea/tests/test_client/mod.rs +++ b/bin/citrea/tests/test_client/mod.rs @@ -621,11 +621,11 @@ impl TestClient { pub(crate) async fn ledger_get_verified_proofs_by_slot_height( &self, height: u64, - ) -> Vec { + ) -> Option> { self.http_client .request("ledger_getVerifiedProofsBySlotHeight", rpc_params![height]) .await - .unwrap() + .ok() } pub(crate) async fn ledger_get_sequencer_commitments_on_slot_by_hash( diff --git a/bin/citrea/tests/test_helpers/mod.rs b/bin/citrea/tests/test_helpers/mod.rs index 6fce66500..c32f83ee8 100644 --- a/bin/citrea/tests/test_helpers/mod.rs +++ b/bin/citrea/tests/test_helpers/mod.rs @@ -158,6 +158,8 @@ pub fn create_default_sequencer_config( mempool_conf: Default::default(), // Offchain db will be active only in some tests db_config: None, + da_update_interval_ms: 500, + block_production_interval_ms: 500, // since running in test mode, we can set this to a lower value } } @@ -187,7 +189,7 @@ pub async fn wait_for_l2_block(sequencer_client: &TestClient, num: u64, timeout: let now = SystemTime::now(); if start + timeout <= now { - panic!("Timeout"); + panic!("Timeout. Latest L2 block is {:?}", latest_block.number); } sleep(Duration::from_secs(1)).await; @@ -210,7 +212,7 @@ pub async fn wait_for_prover_l1_height( let now = SystemTime::now(); if start + timeout <= now { - panic!("Timeout"); + panic!("Timeout. Latest prover L1 height is {}", latest_block); } sleep(Duration::from_secs(1)).await; @@ -229,11 +231,39 @@ pub async fn wait_for_l1_block(da_service: &MockDaService, num: u64, timeout: Op let now = SystemTime::now(); if start + timeout <= now { - panic!("Timeout"); + panic!("Timeout. 
Latest L1 block is {}", da_block); } sleep(Duration::from_secs(1)).await; } + // Let knowledgage of the new DA block propagate + sleep(Duration::from_secs(2)).await; +} + +pub async fn wait_for_proof(test_client: &TestClient, slot_height: u64, timeout: Option) { + let start = SystemTime::now(); + let timeout = timeout.unwrap_or(Duration::from_secs(60)); // Default 60 seconds timeout + loop { + debug!( + "Waiting for L1 block height containing zkproof {}", + slot_height + ); + let proof = test_client + .ledger_get_verified_proofs_by_slot_height(slot_height) + .await; + if proof.is_some() { + break; + } + + let now = SystemTime::now(); + if start + timeout <= now { + panic!("Timeout while waiting for proof at height {}", slot_height); + } + + sleep(Duration::from_secs(1)).await; + } + // Let knowledgage of the new DA block propagate + sleep(Duration::from_secs(2)).await; } pub async fn wait_for_postgres_commitment( @@ -252,7 +282,30 @@ pub async fn wait_for_postgres_commitment( let now = SystemTime::now(); if start + timeout <= now { - panic!("Timeout"); + panic!("Timeout. {} commitments exist at this point", commitments); + } + + sleep(Duration::from_secs(1)).await; + } +} + +pub async fn wait_for_postgres_proofs( + db_test_client: &PostgresConnector, + num: usize, + timeout: Option, +) { + let start = SystemTime::now(); + let timeout = timeout.unwrap_or(Duration::from_secs(30)); // Default 30 seconds timeout + loop { + debug!("Waiting for {} L1 proofs to be published", num); + let commitments = db_test_client.get_all_proof_data().await.unwrap().len(); + if commitments >= num { + break; + } + + let now = SystemTime::now(); + if start + timeout <= now { + panic!("Timeout. {} proofs exist at this point", commitments); } sleep(Duration::from_secs(1)).await; diff --git a/configs/bitcoin-regtest/sequencer_config.toml b/configs/bitcoin-regtest/sequencer_config.toml index 80e1d519c..34756184f 100644 --- a/configs/bitcoin-regtest/sequencer_config.toml +++ b/configs/bitcoin-regtest/sequencer_config.toml @@ -2,6 +2,8 @@ private_key = "1212121212121212121212121212121212121212121212121212121212121212" min_soft_confirmations_per_commitment = 1000 test_mode = false deposit_mempool_fetch_limit = 10 +block_production_interval_ms = 1000 +da_update_interval_ms = 2000 [mempool_conf] # Mempool Configuration - https://github.com/ledgerwatch/erigon/wiki/Transaction-Pool-Design pending_tx_limit = 100000 diff --git a/configs/mock-dockerized/sequencer_config.toml b/configs/mock-dockerized/sequencer_config.toml index 2a981bb0b..6e0e00f2f 100644 --- a/configs/mock-dockerized/sequencer_config.toml +++ b/configs/mock-dockerized/sequencer_config.toml @@ -2,6 +2,8 @@ private_key = "1212121212121212121212121212121212121212121212121212121212121212" min_soft_confirmations_per_commitment = 10 test_mode = false deposit_mempool_fetch_limit = 10 +block_production_interval_ms = 1000 +da_update_interval_ms = 2000 [mempool_conf] # Mempool Configuration - https://github.com/ledgerwatch/erigon/wiki/Transaction-Pool-Design pending_tx_limit = 100000 diff --git a/configs/mock/sequencer_config.toml b/configs/mock/sequencer_config.toml index 2a981bb0b..6e0e00f2f 100644 --- a/configs/mock/sequencer_config.toml +++ b/configs/mock/sequencer_config.toml @@ -2,6 +2,8 @@ private_key = "1212121212121212121212121212121212121212121212121212121212121212" min_soft_confirmations_per_commitment = 10 test_mode = false deposit_mempool_fetch_limit = 10 +block_production_interval_ms = 1000 +da_update_interval_ms = 2000 [mempool_conf] # Mempool 
Configuration - https://github.com/ledgerwatch/erigon/wiki/Transaction-Pool-Design pending_tx_limit = 100000 diff --git a/crates/sequencer/src/commitment_controller.rs b/crates/sequencer/src/commitment_controller.rs index 938fb756f..5bb1f1376 100644 --- a/crates/sequencer/src/commitment_controller.rs +++ b/crates/sequencer/src/commitment_controller.rs @@ -52,7 +52,7 @@ pub fn get_commitment_info( let Some((l2_start, mut l2_end)) = ledger_db.get_l2_range_by_l1_height(SlotNumber(l1_start))? else { - bail!("Sequencer: Failed to get L1 L2 connection"); + return Ok(None); }; // Take while sum of l2 ranges <= min_soft_confirmations_per_commitment diff --git a/crates/sequencer/src/config.rs b/crates/sequencer/src/config.rs index da039288b..666f84547 100644 --- a/crates/sequencer/src/config.rs +++ b/crates/sequencer/src/config.rs @@ -16,6 +16,10 @@ pub struct SequencerConfig { pub mempool_conf: SequencerMempoolConfig, /// Offchain db config pub db_config: Option, + /// DA layer update loop interval in ms + pub da_update_interval_ms: u64, + /// Block production interval in ms + pub block_production_interval_ms: u64, } /// Mempool Config for the sequencer @@ -74,6 +78,8 @@ mod tests { min_soft_confirmations_per_commitment = 123 test_mode = false deposit_mempool_fetch_limit = 10 + da_update_interval_ms = 1000 + block_production_interval_ms = 1000 [mempool_conf] pending_tx_limit = 100000 pending_tx_size = 200 @@ -110,6 +116,8 @@ mod tests { max_account_slots: 16, }, db_config: Some(SharedBackupDbConfig::default()), + da_update_interval_ms: 1000, + block_production_interval_ms: 1000, }; assert_eq!(config, expected); } diff --git a/crates/sequencer/src/sequencer.rs b/crates/sequencer/src/sequencer.rs index 89bcabe90..d97cf226a 100644 --- a/crates/sequencer/src/sequencer.rs +++ b/crates/sequencer/src/sequencer.rs @@ -42,8 +42,8 @@ use sov_rollup_interface::storage::HierarchicalStorageManager; use sov_rollup_interface::zk::ZkvmHost; use sov_stf_runner::{InitVariant, RollupPublicKeys, RpcConfig}; use tokio::sync::oneshot::channel as oneshot_channel; -use tokio::sync::Mutex; -use tokio::time::sleep; +use tokio::sync::{mpsc, Mutex}; +use tokio::time::{sleep, Instant}; use tracing::{debug, error, info, instrument, warn}; use crate::commitment_controller; @@ -55,6 +55,10 @@ use crate::rpc::{create_rpc_module, RpcContext}; use crate::utils::recover_raw_transaction; type StateRoot = >::StateRoot; +/// Represents information about the current DA state. +/// +/// Contains previous height, latest finalized block and fee rate. 
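+///
+/// Produced by `get_da_block_data` and pushed to the block production loop by
+/// the `da_block_monitor` task on every `da_update_interval_ms` tick.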
+type L1Data = (::FilteredBlock, u128); pub struct CitreaSequencer where @@ -91,7 +95,7 @@ enum L2BlockMode { impl CitreaSequencer where C: Context, - Da: DaService, + Da: DaService + Clone, Sm: HierarchicalStorageManager, Vm: ZkvmHost, Stf: StateTransitionFunction< @@ -142,8 +146,7 @@ where let deposit_mempool = Arc::new(Mutex::new(DepositDataMempool::new())); - let sov_tx_signer_priv_key = - C::PrivateKey::try_from(&hex::decode(&config.private_key).unwrap()).unwrap(); + let sov_tx_signer_priv_key = C::PrivateKey::try_from(&hex::decode(&config.private_key)?)?; let soft_confirmation_rule_enforcer = SoftConfirmationRuleEnforcer::::Spec>::default(); @@ -328,7 +331,8 @@ where l1_fee_rate: u128, l2_block_mode: L2BlockMode, pg_pool: &Option, - ) -> anyhow::Result<()> { + last_used_l1_height: u64, + ) -> anyhow::Result { let da_height = da_block.header().height(); let (l2_height, l1_height) = match self .ledger_db @@ -458,7 +462,7 @@ where tracing::debug!("Finalizing l2 height: {:?}", l2_height); self.storage_manager.finalize_l2(l2_height)?; - return Ok(()); + return Ok(last_used_l1_height); } info!( @@ -527,11 +531,14 @@ where } }); } + // connect L1 and L2 height self.ledger_db.extend_l2_range_of_l1_slot( SlotNumber(da_block.header().height()), BatchNumber(l2_height), )?; + + Ok(da_block.header().height()) } (Err(err), batch_workspace) => { warn!( @@ -539,209 +546,99 @@ where err ); batch_workspace.revert(); - return Err(anyhow!( + Err(anyhow!( "Failed to apply begin soft confirmation hook: {:?}", err - )); - } - } - Ok(()) - } - - #[instrument(level = "trace", skip_all, err, ret)] - pub async fn build_block( - &mut self, - pg_pool: &Option, - da_height_tx: UnboundedSender, - ) -> anyhow::Result<()> { - // best txs with base fee - // get base fee from last blocks => header => next base fee() function - - let mut prev_l1_height = self - .ledger_db - .get_head_soft_batch()? - .map(|(_, sb)| sb.da_slot_height); - - if prev_l1_height.is_none() { - prev_l1_height = Some( - self.da_service - .get_last_finalized_block_header() - .await - .map_err(|e| anyhow!(e))? - .height(), - ); - } - - let prev_l1_height = prev_l1_height.expect("Should be set at this point"); - - debug!("Sequencer: prev L1 height: {:?}", prev_l1_height); - - let last_finalized_height = self - .da_service - .get_last_finalized_block_header() - .await - .map_err(|e| anyhow!(e))? - .height(); - - debug!( - "Sequencer: last finalized height: {:?}", - last_finalized_height - ); - - let fee_rate_range = self.get_l1_fee_rate_range()?; - - let l1_fee_rate = self - .da_service - .get_fee_rate() - .await - .map_err(|e| anyhow!(e))?; - - let l1_fee_rate = l1_fee_rate.clamp(*fee_rate_range.start(), *fee_rate_range.end()); - - let last_commitable_l1_height = match last_finalized_height.cmp(&prev_l1_height) { - Ordering::Less => { - panic!("DA L1 height is less than Ledger finalized height"); - } - Ordering::Equal => None, - Ordering::Greater => { - // Compare if there is no skip - if last_finalized_height - prev_l1_height > 1 { - // This shouldn't happen. 
If it does, then we should produce at least 1 block for the blocks in between - for skipped_height in (prev_l1_height + 1)..last_finalized_height { - debug!( - "Sequencer: publishing empty L2 for skipped L1 block: {:?}", - skipped_height - ); - let da_block = self - .da_service - .get_block_at(skipped_height) - .await - .map_err(|e| anyhow!(e))?; - // pool does not need to be passed here as no tx is included - self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, &None) - .await?; - } - } - let last_commitable_l1_height = last_finalized_height - 1; - Some(last_commitable_l1_height) + )) } - }; - - if let Some(last_commitable_l1_height) = last_commitable_l1_height { - da_height_tx - .unbounded_send(last_commitable_l1_height) - .expect("Commitment thread is dead"); - // TODO: this is where we would include forced transactions from the new L1 block } - - let last_finalized_block = self - .da_service - .get_block_at(last_finalized_height) - .await - .map_err(|e| anyhow!(e))?; - - self.produce_l2_block( - last_finalized_block, - l1_fee_rate, - L2BlockMode::NotEmpty, - pg_pool, - ) - .await?; - Ok(()) } - fn spawn_commitment_thread(&self) -> UnboundedSender { - let (da_height_tx, mut da_height_rx) = unbounded::(); - let ledger_db = self.ledger_db.clone(); + async fn submit_commitment(&self, prev_l1_height: u64) -> anyhow::Result<()> { + debug!("Sequencer: new L1 block, checking if commitment should be submitted"); let inscription_queue = self.da_service.get_send_transaction_queue(); let min_soft_confirmations_per_commitment = self.config.min_soft_confirmations_per_commitment; - let db_config = self.config.db_config.clone(); - tokio::spawn(async move { - while let Some(prev_l1_height) = da_height_rx.next().await { - debug!("Sequencer: new L1 block, checking if commitment should be submitted"); - - let commitment_info = commitment_controller::get_commitment_info( - &ledger_db, - min_soft_confirmations_per_commitment, - prev_l1_height, - ) - .unwrap(); // TODO unwrap() - - if let Some(commitment_info) = commitment_info { - debug!("Sequencer: enough soft confirmations to submit commitment"); - let l2_range_to_submit = commitment_info.l2_height_range.clone(); - - // calculate exclusive range end - let range_end = BatchNumber(l2_range_to_submit.end().0 + 1); // cannnot add u64 to BatchNumber directly - - let soft_confirmation_hashes = ledger_db - .get_soft_batch_range(&(*l2_range_to_submit.start()..range_end)) - .unwrap() // TODO unwrap - .iter() - .map(|sb| sb.hash) - .collect::>(); - - let commitment = commitment_controller::get_commitment( - commitment_info.clone(), - soft_confirmation_hashes, - ) - .unwrap(); // TODO unwrap - - info!("Sequencer: submitting commitment: {:?}", commitment); - - let blob = DaData::SequencerCommitment(commitment.clone()) - .try_to_vec() - .map_err(|e| anyhow!(e)) - .unwrap(); // TODO unwrap - let (notify, rx) = oneshot_channel(); - let request = BlobWithNotifier { blob, notify }; - inscription_queue - .send(request) - .expect("Bitcoin service already stopped"); - let tx_id = rx - .await - .expect("DA service is dead") - .expect("send_transaction cannot fail"); - - ledger_db - .set_last_sequencer_commitment_l1_height(SlotNumber( - commitment_info.l1_height_range.end().0, - )) - .expect("Sequencer: Failed to set last sequencer commitment L1 height"); - - warn!("Commitment info: {:?}", commitment_info); - let l1_start_height = commitment_info.l1_height_range.start().0; - let l1_end_height = commitment_info.l1_height_range.end().0; - let l2_start = 
l2_range_to_submit.start().0 as u32; - let l2_end = l2_range_to_submit.end().0 as u32; - if let Some(db_config) = db_config.clone() { - match PostgresConnector::new(db_config).await { - Ok(pg_connector) => { - pg_connector - .insert_sequencer_commitment( - l1_start_height as u32, - l1_end_height as u32, - Into::<[u8; 32]>::into(tx_id).to_vec(), - commitment.l1_start_block_hash.to_vec(), - commitment.l1_end_block_hash.to_vec(), - l2_start, - l2_end, - commitment.merkle_root.to_vec(), - CommitmentStatus::Mempool, - ) - .await - .expect("Sequencer: Failed to insert sequencer commitment"); - } - Err(e) => { - warn!("Failed to connect to postgres: {:?}", e); - } - } + let commitment_info = commitment_controller::get_commitment_info( + &self.ledger_db, + min_soft_confirmations_per_commitment, + prev_l1_height, + )?; + + if let Some(commitment_info) = commitment_info { + debug!("Sequencer: enough soft confirmations to submit commitment"); + let l2_range_to_submit = commitment_info.l2_height_range.clone(); + + // calculate exclusive range end + let range_end = BatchNumber(l2_range_to_submit.end().0 + 1); // cannnot add u64 to BatchNumber directly + + let soft_confirmation_hashes = self + .ledger_db + .get_soft_batch_range(&(*l2_range_to_submit.start()..range_end))? + .iter() + .map(|sb| sb.hash) + .collect::>(); + + let commitment = commitment_controller::get_commitment( + commitment_info.clone(), + soft_confirmation_hashes, + )?; + + info!("Sequencer: submitting commitment: {:?}", commitment); + + let blob = DaData::SequencerCommitment(commitment.clone()) + .try_to_vec() + .map_err(|e| anyhow!(e))?; + let (notify, rx) = oneshot_channel(); + let request = BlobWithNotifier { blob, notify }; + inscription_queue + .send(request) + .map_err(|_| anyhow!("Bitcoin service already stopped!"))?; + let tx_id = rx + .await + .map_err(|_| anyhow!("DA service is dead!"))? 
+ .map_err(|_| anyhow!("Send transaction cannot fail"))?; + + self.ledger_db + .set_last_sequencer_commitment_l1_height(SlotNumber( + commitment_info.l1_height_range.end().0, + )) + .map_err(|_| { + anyhow!("Sequencer: Failed to set last sequencer commitment L1 height") + })?; + + warn!("Commitment info: {:?}", commitment_info); + let l1_start_height = commitment_info.l1_height_range.start().0; + let l1_end_height = commitment_info.l1_height_range.end().0; + let l2_start = l2_range_to_submit.start().0 as u32; + let l2_end = l2_range_to_submit.end().0 as u32; + if let Some(db_config) = self.config.db_config.clone() { + match PostgresConnector::new(db_config).await { + Ok(pg_connector) => { + pg_connector + .insert_sequencer_commitment( + l1_start_height as u32, + l1_end_height as u32, + Into::<[u8; 32]>::into(tx_id).to_vec(), + commitment.l1_start_block_hash.to_vec(), + commitment.l1_end_block_hash.to_vec(), + l2_start, + l2_end, + commitment.merkle_root.to_vec(), + CommitmentStatus::Mempool, + ) + .await + .map_err(|_| { + anyhow!("Sequencer: Failed to insert sequencer commitment") + })?; + } + Err(e) => { + warn!("Failed to connect to postgres: {:?}", e); } } } - }); - da_height_tx + } + Ok(()) } #[instrument(level = "trace", skip(self), err, ret)] @@ -752,8 +649,6 @@ where .await .map_err(|e| anyhow!(e))?; - let da_height_tx = self.spawn_commitment_thread(); - // If connected to offchain db first check if the commitments are in sync let mut pg_pool = None; if let Some(db_config) = self.config.db_config.clone() { @@ -780,22 +675,184 @@ where }; } - // If sequencer is in test mode, it will build a block every time it receives a message - if self.config.test_mode { - loop { - if (self.l2_force_block_rx.next().await).is_some() { - if let Err(e) = self.build_block(&pg_pool, da_height_tx.clone()).await { - error!("Sequencer error: {}", e); - } + // Initialize our knowledge of the state of the DA-layer + let fee_rate_range = get_l1_fee_rate_range::( + self.storage.clone(), + self.soft_confirmation_rule_enforcer.clone(), + )?; + let (mut last_finalized_block, l1_fee_rate) = + match get_da_block_data(self.da_service.clone()).await { + Ok(l1_data) => l1_data, + Err(e) => { + error!("{}", e); + return Err(e); } + }; + let mut l1_fee_rate = l1_fee_rate.clamp(*fee_rate_range.start(), *fee_rate_range.end()); + let mut last_finalized_height = last_finalized_block.header().height(); + + let mut last_used_l1_height = match self.ledger_db.get_head_soft_batch() { + Ok(Some((_, sb))) => sb.da_slot_height, + Ok(None) => last_finalized_height, // starting for the first time + Err(e) => { + return Err(anyhow!("previous L1 height: {}", e)); } - } - // If sequencer is in production mode, it will build a block every 2 seconds - else { - loop { - sleep(Duration::from_secs(2)).await; - if let Err(e) = self.build_block(&pg_pool, da_height_tx.clone()).await { - error!("Sequencer error: {}", e); + }; + + debug!("Sequencer: Last used L1 height: {:?}", last_used_l1_height); + + // Setup required workers to update our knowledge of the DA layer every X seconds (configurable). 
+ let (da_height_update_tx, mut da_height_update_rx) = mpsc::channel(1); + let (da_commitment_tx, mut da_commitment_rx) = unbounded::(); + let da_monitor = da_block_monitor( + self.da_service.clone(), + da_height_update_tx, + self.config.da_update_interval_ms, + ); + tokio::pin!(da_monitor); + + let target_block_time = Duration::from_millis(self.config.block_production_interval_ms); + let mut parent_block_exec_time = Duration::from_secs(0); + + // In case the sequencer falls behind on DA blocks, we need to produce at least 1 + // empty block per DA block. Which means that we have to keep count of missed blocks + // and only resume normal operations once the sequencer has caught up. + let mut missed_da_blocks_count = 0; + + loop { + let mut interval = tokio::time::interval(target_block_time - parent_block_exec_time); + // The first ticket completes immediately. + // See: https://docs.rs/tokio/latest/tokio/time/struct.Interval.html#method.tick + interval.tick().await; + + tokio::select! { + // Run the DA monitor worker + _ = &mut da_monitor => {}, + // Receive updates from DA layer worker. + l1_data = da_height_update_rx.recv() => { + // Stop receiving updates from DA layer until we have caught up. + if missed_da_blocks_count > 0 { + continue; + } + if let Some(l1_data) = l1_data { + (last_finalized_block, l1_fee_rate) = l1_data; + last_finalized_height = last_finalized_block.header().height(); + + if last_finalized_block.header().height() > last_used_l1_height { + let skipped_blocks = last_finalized_height - last_used_l1_height - 1; + if skipped_blocks > 0 { + // This shouldn't happen. If it does, then we should produce at least 1 block for the blocks in between + warn!( + "Sequencer is falling behind on L1 blocks by {:?} blocks", + skipped_blocks + ); + + // Missed DA blocks means that we produce n - 1 empty blocks, 1 per missed DA block. + missed_da_blocks_count = skipped_blocks; + } + } + + if let Err(e) = self.maybe_submit_commitment(da_commitment_tx.clone(), last_finalized_height, last_used_l1_height).await { + error!("Sequencer error: {}", e); + } + } + }, + prev_l1_height = da_commitment_rx.select_next_some() => { + if let Err(e) = self.submit_commitment(prev_l1_height).await { + error!("Failed to submit commitment: {}", e); + } + }, + // If sequencer is in test mode, it will build a block every time it receives a message + // The RPC from which the sender can be called is only registered for test mode. This means + // that evey though we check the receiver here, it'll never be "ready" to be consumed unless in test mode. 
+ _ = self.l2_force_block_rx.next(), if self.config.test_mode => { + if missed_da_blocks_count > 0 { + debug!("We have {} missed DA blocks", missed_da_blocks_count); + for _ in 1..=missed_da_blocks_count { + let needed_da_block_height = last_used_l1_height + 1; + let da_block = self + .da_service + .get_block_at(needed_da_block_height) + .await + .map_err(|e| anyhow!(e))?; + + debug!("Created an empty L2 for L1={}", needed_da_block_height); + if let Err(e) = self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, &pg_pool, last_used_l1_height).await { + error!("Sequencer error: {}", e); + } + } + missed_da_blocks_count = 0; + } + + let l1_fee_rate_range = + match get_l1_fee_rate_range::(self.storage.clone(), self.soft_confirmation_rule_enforcer.clone()) { + Ok(fee_rate_range) => fee_rate_range, + Err(e) => { + error!("Could not fetch L1 fee rate range: {}", e); + continue; + } + }; + let l1_fee_rate = l1_fee_rate.clamp(*l1_fee_rate_range.start(), *l1_fee_rate_range.end()); + match self.produce_l2_block(last_finalized_block.clone(), l1_fee_rate, L2BlockMode::NotEmpty, &pg_pool, last_used_l1_height).await { + Ok(l1_block_number) => { + last_used_l1_height = l1_block_number; + }, + Err(e) => { + error!("Sequencer error: {}", e); + } + } + }, + // If sequencer is in production mode, it will build a block every 2 seconds + _ = interval.tick(), if !self.config.test_mode => { + // By default, we produce a non-empty block IFF we were caught up all the way to + // last_finalized_block. If there are missed DA blocks, we start producing + // empty blocks at ~2 second rate, 1 L2 block per respective missed DA block + // until we know we caught up with L1. + let da_block = last_finalized_block.clone(); + + if missed_da_blocks_count > 0 { + debug!("We have {} missed DA blocks", missed_da_blocks_count); + for _ in 1..=missed_da_blocks_count { + let needed_da_block_height = last_used_l1_height + 1; + let da_block = self + .da_service + .get_block_at(needed_da_block_height) + .await + .map_err(|e| anyhow!(e))?; + + debug!("Created an empty L2 for L1={}", needed_da_block_height); + if let Err(e) = self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, &pg_pool, last_used_l1_height).await { + error!("Sequencer error: {}", e); + } + } + missed_da_blocks_count = 0; + } + + let l1_fee_rate_range = + match get_l1_fee_rate_range::(self.storage.clone(), self.soft_confirmation_rule_enforcer.clone()) { + Ok(fee_rate_range) => fee_rate_range, + Err(e) => { + error!("Could not fetch L1 fee rate range: {}", e); + continue; + } + }; + let l1_fee_rate = l1_fee_rate.clamp(*l1_fee_rate_range.start(), *l1_fee_rate_range.end()); + + let instant = Instant::now(); + match self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::NotEmpty, &pg_pool, last_used_l1_height).await { + Ok(l1_block_number) => { + // Set the next iteration's wait time to produce a block based on the + // previous block's execution time. + // This is mainly to make sure we account for the execution time to + // achieve consistent 2-second block production. 
+ parent_block_exec_time = instant.elapsed(); + + last_used_l1_height = l1_block_number; + }, + Err(e) => { + error!("Sequencer error: {}", e); + } + } } } } @@ -926,8 +983,7 @@ where let mempool_txs = pg_connector.get_all_txs().await?; for tx in mempool_txs { let recovered = - recover_raw_transaction(reth_primitives::Bytes::from(tx.tx.as_slice().to_vec())) - .unwrap(); + recover_raw_transaction(reth_primitives::Bytes::from(tx.tx.as_slice().to_vec()))?; let pooled_tx = EthPooledTransaction::from_recovered_pooled_transaction(recovered); let _ = self.mempool.add_external_transaction(pooled_tx).await?; @@ -961,12 +1017,29 @@ where } } - fn get_l1_fee_rate_range(&self) -> Result, anyhow::Error> { - let mut working_set = WorkingSet::::new(self.storage.clone()); + async fn maybe_submit_commitment( + &self, + da_commitment_tx: UnboundedSender, + last_finalized_height: u64, + last_used_l1_height: u64, + ) -> anyhow::Result<()> { + let commit_up_to = match last_finalized_height.cmp(&last_used_l1_height) { + Ordering::Less => { + panic!("DA L1 height is less than Ledger finalized height. DA L1 height: {}, Finalized height: {}", last_finalized_height, last_used_l1_height); + } + Ordering::Equal => None, + Ordering::Greater => { + let commit_up_to = last_finalized_height - 1; + Some(commit_up_to) + } + }; - self.soft_confirmation_rule_enforcer - .get_next_min_max_l1_fee_rate(&mut working_set) - .map_err(|e| anyhow::anyhow!("Error reading min max l1 fee rate: {}", e)) + if let Some(commit_up_to) = commit_up_to { + if da_commitment_tx.unbounded_send(commit_up_to).is_err() { + error!("Commitment thread is dead!"); + } + } + Ok(()) } fn get_account_updates(&self) -> Result, anyhow::Error> { @@ -999,3 +1072,70 @@ where Ok(updates) } } + +fn get_l1_fee_rate_range( + storage: C::Storage, + rule_enforcer: SoftConfirmationRuleEnforcer, +) -> Result, anyhow::Error> +where + C: Context, + Da: DaService, +{ + let mut working_set = WorkingSet::::new(storage); + + rule_enforcer + .get_next_min_max_l1_fee_rate(&mut working_set) + .map_err(|e| anyhow::anyhow!("Error reading min max l1 fee rate: {}", e)) +} + +async fn da_block_monitor(da_service: Da, sender: mpsc::Sender>, loop_interval: u64) +where + Da: DaService + Clone, +{ + loop { + let l1_data = match get_da_block_data(da_service.clone()).await { + Ok(l1_data) => l1_data, + Err(e) => { + error!("Could not fetch L1 data, {}", e); + continue; + } + }; + + let _ = sender.send(l1_data).await; + + sleep(Duration::from_millis(loop_interval)).await; + } +} + +async fn get_da_block_data(da_service: Da) -> anyhow::Result> +where + Da: DaService, +{ + let last_finalized_height = match da_service.get_last_finalized_block_header().await { + Ok(header) => header.height(), + Err(e) => { + return Err(anyhow!("Finalized height: {}", e)); + } + }; + + let last_finalized_block = match da_service.get_block_at(last_finalized_height).await { + Ok(block) => block, + Err(e) => { + return Err(anyhow!("Finalized block: {}", e)); + } + }; + + debug!( + "Sequencer: last finalized height: {:?}", + last_finalized_height + ); + + let l1_fee_rate = match da_service.get_fee_rate().await { + Ok(fee_rate) => fee_rate, + Err(e) => { + return Err(anyhow!("L1 fee rate: {}", e)); + } + }; + + Ok((last_finalized_block, l1_fee_rate)) +} diff --git a/crates/sovereign-sdk/full-node/sov-stf-runner/src/runner.rs b/crates/sovereign-sdk/full-node/sov-stf-runner/src/runner.rs index 5768755db..de107b978 100644 --- a/crates/sovereign-sdk/full-node/sov-stf-runner/src/runner.rs +++ 
b/crates/sovereign-sdk/full-node/sov-stf-runner/src/runner.rs @@ -413,7 +413,7 @@ where // start fetching blocks from sequencer, when you see a soft batch with l1 height more than end_l1_height, stop // while getting the blocks to all the same ops as full node // after stopping call continue and look for a new seq_commitment - // change the itemnumbers only after the sync is done so not for every da block + // change the item numbers only after the sync is done so not for every da block loop { let inner_client = &self.sequencer_client; diff --git a/crates/sovereign-sdk/module-system/module-implementations/sov-prover-incentives/Cargo.toml b/crates/sovereign-sdk/module-system/module-implementations/sov-prover-incentives/Cargo.toml index 52aee9380..2ae139d46 100644 --- a/crates/sovereign-sdk/module-system/module-implementations/sov-prover-incentives/Cargo.toml +++ b/crates/sovereign-sdk/module-system/module-implementations/sov-prover-incentives/Cargo.toml @@ -16,9 +16,7 @@ tempfile = { workspace = true } sov-mock-da = { path = "../../../adapters/mock-da", features = ["native"] } sov-mock-zkvm = { path = "../../../adapters/mock-zkvm" } sov-modules-api = { path = "../../sov-modules-api", features = ["native"] } -sov-prover-storage-manager = { path = "../../../full-node/sov-prover-storage-manager", features = [ - "test-utils", -] } +sov-prover-storage-manager = { path = "../../../full-node/sov-prover-storage-manager", features = ["test-utils"] } [dependencies]
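
The essential technique in the sequencer refactor above is the production cadence: each loop iteration waits for the configured block interval minus however long the previous block took to execute, so L2 blocks keep landing at a steady rate. Below is a minimal, self-contained sketch of that timing pattern, not the Citrea implementation; `produce_block`, the 2-second target, and the error handling are all illustrative.

use std::time::{Duration, Instant};

// Hypothetical stand-in for building and committing one L2 block.
async fn produce_block() -> anyhow::Result<()> {
    // execute transactions, persist the soft batch, ...
    Ok(())
}

async fn block_production_cycle() -> anyhow::Result<()> {
    // Assumed value of block_production_interval_ms.
    let target_block_time = Duration::from_millis(2_000);
    let mut parent_block_exec_time = Duration::ZERO;

    loop {
        // Wait for whatever is left of the target interval after subtracting the
        // time the previous block took to build; keep the period non-zero because
        // tokio intervals reject a zero duration.
        let wait = target_block_time
            .saturating_sub(parent_block_exec_time)
            .max(Duration::from_millis(1));
        let mut interval = tokio::time::interval(wait);
        // The first tick completes immediately; consume it so the second tick
        // actually waits for `wait`.
        interval.tick().await;
        interval.tick().await;

        let started = Instant::now();
        produce_block().await?;
        parent_block_exec_time = started.elapsed();
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    block_production_cycle().await
}

Recreating the interval on every iteration, and consuming its first immediate tick, is what lets the wait shrink when the previous block ran long.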
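
The DA-monitor worker introduced in this patch follows an ordinary producer/consumer shape: a background task polls the DA layer every `da_update_interval_ms` and pushes its findings over a bounded channel, and the consumer clamps the reported fee rate into the range permitted by the soft confirmation rule enforcer. A rough sketch under those assumptions; `fetch_l1_data`, the poll interval, the fee bounds, and the simplified `L1Data` tuple are stand-ins rather than the real types.

use std::time::Duration;
use tokio::sync::mpsc;

// Simplified stand-in for what the monitor reports: (finalized L1 height, L1 fee rate).
type L1Data = (u64, u128);

// Hypothetical DA query; the real monitor asks the DA service for the last
// finalized block and the current fee rate.
async fn fetch_l1_data() -> anyhow::Result<L1Data> {
    Ok((100, 2_000))
}

async fn da_block_monitor(sender: mpsc::Sender<L1Data>, poll_interval_ms: u64) {
    loop {
        match fetch_l1_data().await {
            // If the receiver has gone away, the send fails; the monitor just keeps polling.
            Ok(l1_data) => {
                let _ = sender.send(l1_data).await;
            }
            Err(e) => eprintln!("could not fetch L1 data: {e}"),
        }
        tokio::time::sleep(Duration::from_millis(poll_interval_ms)).await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    tokio::spawn(da_block_monitor(tx, 500));

    // Bounds that would normally come from the rule enforcer; fixed values here.
    let (min_fee, max_fee) = (1_000u128, 10_000u128);

    if let Some((finalized_height, fee_rate)) = rx.recv().await {
        let fee_rate = fee_rate.clamp(min_fee, max_fee);
        println!("finalized L1 height {finalized_height}, clamped fee rate {fee_rate}");
    }
}

A channel of capacity 1 keeps only the freshest L1 data in flight: while the consumer is busy producing a block, the monitor simply waits to hand over its next update instead of letting stale ones pile up.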