diff --git a/bin/citrea/tests/e2e/mod.rs b/bin/citrea/tests/e2e/mod.rs index 1ddbcc67e..a8e46ea5f 100644 --- a/bin/citrea/tests/e2e/mod.rs +++ b/bin/citrea/tests/e2e/mod.rs @@ -1129,7 +1129,7 @@ async fn test_soft_confirmations_status_one_l1() -> Result<(), anyhow::Error> { #[tokio::test(flavor = "multi_thread")] async fn test_soft_confirmations_status_two_l1() -> Result<(), anyhow::Error> { - // citrea::initialize_logging(tracing::Level::INFO); + // citrea::initialize_logging(tracing::Level::DEBUG); let storage_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]); let da_db_dir = storage_dir.path().join("DA").to_path_buf(); @@ -1221,7 +1221,7 @@ async fn test_soft_confirmations_status_two_l1() -> Result<(), anyhow::Error> { #[tokio::test(flavor = "multi_thread")] async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> { - // citrea::initialize_logging(tracing::Level::INFO); + // citrea::initialize_logging(tracing::Level::DEBUG); let storage_dir = tempdir_with_children(&["DA", "sequencer", "prover"]); let da_db_dir = storage_dir.path().join("DA").to_path_buf(); @@ -1292,12 +1292,10 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> { // start l1 height = 1, end = 2 seq_test_client.send_publish_batch_request().await; - // sequencer commitment should be sent - da_service.publish_test_block().await.unwrap(); wait_for_l1_block(&da_service, 2, None).await; - wait_for_l1_block(&da_service, 3, None).await; + // Submit an L2 block to prevent sequencer from falling behind. seq_test_client.send_publish_batch_request().await; // wait here until we see from prover's rpc that it finished proving @@ -1308,43 +1306,49 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> { ) .await; + // Submit an L2 block to prevent sequencer from falling behind. + seq_test_client.send_publish_batch_request().await; + // prover should have synced all 6 l2 blocks // ps there are 6 blocks because: // when a new proof is submitted in mock da a new empty da block is published // and for every empty da block sequencer publishes a new empty soft confirmation in order to not skip a block wait_for_l2_block(&prover_node_test_client, 6, None).await; + sleep(Duration::from_secs(1)).await; assert_eq!(prover_node_test_client.eth_block_number().await, 6); - seq_test_client.send_publish_batch_request().await; - da_service.publish_test_block().await.unwrap(); + // Trigger another commitment + for _ in 7..=8 { + seq_test_client.send_publish_batch_request().await; + } + wait_for_l2_block(&seq_test_client, 8, None).await; + // Allow for the L2 block to be commited and stored + // Otherwise, the L2 block height might be registered but it hasn't + // been processed inside the EVM yet. + sleep(Duration::from_secs(1)).await; + assert_eq!(seq_test_client.eth_block_number().await, 8); wait_for_l1_block(&da_service, 4, None).await; - // Still should have 4 blocks there are no commitments yet - wait_for_prover_l1_height( - &prover_node_test_client, - 4, - Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)), - ) - .await; - wait_for_l1_block(&da_service, 5, None).await; - seq_test_client.send_publish_batch_request().await; // wait here until we see from prover's rpc that it finished proving wait_for_prover_l1_height( &prover_node_test_client, - 5, + 4, Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)), ) .await; - wait_for_l2_block(&seq_test_client, 8, None).await; - assert_eq!(seq_test_client.eth_block_number().await, 8); + // Should now have 8 blocks = 2 commitments of blocks 1-4 and 5-9 // there is an extra soft confirmation due to the prover publishing a proof. This causes // a new MockDa block, which in turn causes the sequencer to publish an extra soft confirmation // becase it must not skip blocks. wait_for_l2_block(&prover_node_test_client, 8, None).await; + // Allow for the L2 block to be commited and stored + // Otherwise, the L2 block height might be registered but it hasn't + // been processed inside the EVM yet. + sleep(Duration::from_secs(1)).await; assert_eq!(prover_node_test_client.eth_block_number().await, 8); // on the 8th DA block, we should have a proof - let mut blobs = da_service.get_block_at(4).await.unwrap().blobs; + let mut blobs = da_service.get_block_at(3).await.unwrap().blobs; assert_eq!(blobs.len(), 1); @@ -1365,7 +1369,7 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> { #[tokio::test(flavor = "multi_thread")] async fn test_reopen_prover() -> Result<(), anyhow::Error> { - // citrea::initialize_logging(tracing::Level::INFO); + // citrea::initialize_logging(tracing::Level::DEBUG); let storage_dir = tempdir_with_children(&["DA", "sequencer", "prover"]); let da_db_dir = storage_dir.path().join("DA").to_path_buf(); @@ -1510,6 +1514,10 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> { wait_for_l2_block(&seq_test_client, 6, None).await; // Still should have 4 blocks there are no commitments yet wait_for_l2_block(&prover_node_test_client, 6, None).await; + // Allow for the L2 block to be commited and stored + // Otherwise, the L2 block height might be registered but it hasn't + // been processed inside the EVM yet. + sleep(Duration::from_secs(1)).await; assert_eq!(prover_node_test_client.eth_block_number().await, 6); thread_kill_sender.send("kill").unwrap(); @@ -1915,7 +1923,7 @@ fn find_subarray(haystack: &[u8], needle: &[u8]) -> Option { #[tokio::test(flavor = "multi_thread")] async fn sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Error> { - // citrea::initialize_logging(tracing::Level::INFO); + citrea::initialize_logging(tracing::Level::DEBUG); let storage_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]); let da_db_dir = storage_dir.path().join("DA").to_path_buf(); @@ -1997,20 +2005,18 @@ async fn sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Error> { wait_for_l2_block(&seq_test_client, 4, None).await; // second da block - da_service.publish_test_block().await.unwrap(); wait_for_l1_block(&da_service, 2, None).await; - wait_for_l1_block(&da_service, 3, None).await; - // before this the commitment will be sent - // the commitment will be only in the first block so it is still not finalized - // so the full node won't see the commitment + // Push a new L2 block into the new L1 block(2) to prevent + // sequencer from falling behind and creating automatic empty block. + // This makes the process a bit more deterministic on the test's end. seq_test_client.send_publish_batch_request().await; - - // wait for sync - wait_for_l2_block(&full_node_test_client, 6, None).await; - - // should be synced - assert_eq!(full_node_test_client.eth_block_number().await, 6); + wait_for_l2_block(&full_node_test_client, 5, None).await; + // Allow for the L2 block to be commited and stored + // Otherwise, the L2 block height might be registered but it hasn't + // been processed inside the EVM yet. + sleep(Duration::from_secs(1)).await; + assert_eq!(full_node_test_client.eth_block_number().await, 5); // assume sequencer craashed seq_task.abort(); @@ -2053,21 +2059,14 @@ async fn sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Error> { let seq_test_client = make_test_client(seq_port).await; - wait_for_l2_block(&seq_test_client, 6, None).await; - - assert_eq!(seq_test_client.eth_block_number().await as u64, 6); + assert_eq!(seq_test_client.eth_block_number().await as u64, 5); seq_test_client.send_publish_batch_request().await; seq_test_client.send_publish_batch_request().await; seq_test_client.send_publish_batch_request().await; - wait_for_l2_block(&seq_test_client, 9, None).await; - - da_service.publish_test_block().await.unwrap(); - wait_for_l1_block(&da_service, 4, None).await; - wait_for_l1_block(&da_service, 5, None).await; + wait_for_l2_block(&seq_test_client, 8, None).await; - // new commitment will be sent here, it should send between 2 and 3 should not include 1 - seq_test_client.send_publish_batch_request().await; + wait_for_l1_block(&da_service, 3, None).await; wait_for_postgres_commitment( &db_test_client, diff --git a/bin/citrea/tests/evm/gas_price.rs b/bin/citrea/tests/evm/gas_price.rs index f5def33a5..3a9e87e5e 100644 --- a/bin/citrea/tests/evm/gas_price.rs +++ b/bin/citrea/tests/evm/gas_price.rs @@ -17,7 +17,7 @@ use crate::{ #[tokio::test(flavor = "multi_thread")] async fn test_gas_price_increase() -> Result<(), anyhow::Error> { - // citrea::initialize_logging(tracing::Level::INFO); + // citrea::initialize_logging(tracing::Level::DEBUG); let storage_dir = tempdir_with_children(&["DA", "sequencer"]); let da_db_dir = storage_dir.path().join("DA").to_path_buf(); diff --git a/crates/fullnode/src/runner.rs b/crates/fullnode/src/runner.rs index 16a81c8dc..3afea0fd8 100644 --- a/crates/fullnode/src/runner.rs +++ b/crates/fullnode/src/runner.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::marker::PhantomData; use std::net::SocketAddr; use std::sync::Arc; @@ -325,7 +326,7 @@ where let end_l2_height = sequencer_commitment.l2_end_block_number; tracing::info!( - "Processing sequencer commitment. L2 Range = {:?} - {:?}.", + "Processing sequencer commitment. L2 Range = {}-{}.", start_l2_height, end_l2_height, ); @@ -484,19 +485,22 @@ where ); tokio::pin!(l2_sync_worker); - // Keep a list of commitments and proofs which have been attempted to process but failed, - // due to the sync lagging behind on L2 blocks. - // On every L1 block, we try to process the previously registered commitments and proofs - // having had the time to sync L2 blocks. - let mut pending_sequencer_commitments = Vec::::new(); - let mut pending_zk_proofs = Vec::::new(); + let mut pending_l1_blocks: VecDeque<::FilteredBlock> = + VecDeque::::new(); + let pending_l1 = &mut pending_l1_blocks; + + let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.tick().await; loop { select! { _ = &mut l1_sync_worker => {}, _ = &mut l2_sync_worker => {}, Some(l1_block) = l1_rx.recv() => { - self.process_l1_block(&mut pending_sequencer_commitments,&mut pending_zk_proofs, l1_block).await; + pending_l1.push_back(l1_block); + }, + _ = interval.tick() => { + self.process_l1_block(pending_l1).await }, Some(l2_blocks) = l2_rx.recv() => { for (l2_height, l2_block) in l2_blocks { @@ -512,15 +516,65 @@ where pub async fn process_l1_block( &self, - pending_sequencer_commitments: &mut Vec, - pending_zk_proofs: &mut Vec, - l1_block: Da::FilteredBlock, + pending_l1_blocks: &mut VecDeque<::FilteredBlock>, ) { - // Set the l1 height of the l1 hash - self.ledger_db - .set_l1_height_of_l1_hash(l1_block.header().hash().into(), l1_block.header().height()) - .unwrap(); + while !pending_l1_blocks.is_empty() { + let l1_block = pending_l1_blocks + .front() + .expect("Pending l1 blocks cannot be empty"); + // Set the l1 height of the l1 hash + self.ledger_db + .set_l1_height_of_l1_hash( + l1_block.header().hash().into(), + l1_block.header().height(), + ) + .unwrap(); + + let (sequencer_commitments, zk_proofs) = + self.extract_relevant_l1_data(l1_block.clone()); + + for zk_proof in zk_proofs.clone().iter() { + if let Err(e) = self + .process_zk_proof(l1_block.clone(), zk_proof.clone()) + .await + { + match e { + SyncError::MissingL2(msg, start_l2_height, end_l2_height) => { + warn!("Could not completely process ZK proofs. Missing L2 blocks {:?} - {:?}. msg = {}", start_l2_height, end_l2_height, msg); + return; + } + SyncError::Error(e) => { + error!("Could not process ZK proofs: {}...skipping", e); + } + } + } + } + + for sequencer_commitment in sequencer_commitments.clone().iter() { + if let Err(e) = self + .process_sequencer_commitment(l1_block.clone(), sequencer_commitment.clone()) + .await + { + match e { + SyncError::MissingL2(msg, start_l2_height, end_l2_height) => { + warn!("Could not completely process sequencer commitments. Missing L2 blocks {:?} - {:?}, msg = {}", start_l2_height, end_l2_height, msg); + return; + } + SyncError::Error(e) => { + error!("Could not process sequencer commitments: {}... skipping", e); + } + } + } + } + + pending_l1_blocks.pop_front(); + } + } + fn extract_relevant_l1_data( + &self, + l1_block: Da::FilteredBlock, + ) -> (Vec, Vec) { let mut sequencer_commitments = Vec::::new(); let mut zk_proofs = Vec::::new(); @@ -558,51 +612,7 @@ where // TODO: This is where force transactions will land - try to parse DA data force transaction } }); - - pending_zk_proofs.extend(zk_proofs); - pending_sequencer_commitments.extend(sequencer_commitments); - - for (index, zk_proof) in pending_zk_proofs.clone().iter().enumerate() { - match self - .process_zk_proof(l1_block.clone(), zk_proof.clone()) - .await - { - Ok(()) => { - pending_zk_proofs.remove(index); - } - Err(e) => match e { - SyncError::MissingL2(msg, start_l2_height, end_l2_height) => { - warn!("Could not completely process ZK proofs. Missing L2 blocks {:?} - {:?}. msg = {}", start_l2_height, end_l2_height, msg); - } - SyncError::Error(e) => { - error!("Could not process ZK proofs: {}...skipping", e); - pending_zk_proofs.remove(index); - } - }, - } - } - - for (index, sequencer_commitment) in - pending_sequencer_commitments.clone().iter().enumerate() - { - match self - .process_sequencer_commitment(l1_block.clone(), sequencer_commitment.clone()) - .await - { - Ok(()) => { - pending_sequencer_commitments.remove(index); - } - Err(e) => match e { - SyncError::MissingL2(msg, start_l2_height, end_l2_height) => { - warn!("Could not completely process sequencer commitments. Missing L2 blocks {:?} - {:?}, msg = {}", start_l2_height, end_l2_height, msg); - } - SyncError::Error(e) => { - error!("Could not process sequencer commitments: {}... skipping", e); - pending_sequencer_commitments.remove(index); - } - }, - } - } + (sequencer_commitments, zk_proofs) } } diff --git a/crates/sequencer/src/commitment_controller.rs b/crates/sequencer/src/commitment_controller.rs index 18e79f39f..5281790c4 100644 --- a/crates/sequencer/src/commitment_controller.rs +++ b/crates/sequencer/src/commitment_controller.rs @@ -57,8 +57,6 @@ pub fn get_commitment_info( debug!("State diff threshold reached. Committing..."); } - debug!("L2 range to submit: {}..{}", l2_start, l2_end); - Ok(Some(CommitmentInfo { l2_height_range: BatchNumber(l2_start)..=BatchNumber(l2_end), })) diff --git a/crates/sequencer/src/sequencer.rs b/crates/sequencer/src/sequencer.rs index 5301ff60a..03638b121 100644 --- a/crates/sequencer/src/sequencer.rs +++ b/crates/sequencer/src/sequencer.rs @@ -514,7 +514,6 @@ where self.storage_manager .save_change_set_l2(l2_height, slot_result.change_set)?; - tracing::debug!("Finalizing l2 height: {:?}", l2_height); self.storage_manager.finalize_l2(l2_height)?; // connect L1 and L2 height @@ -594,7 +593,7 @@ where &mut self, state_diff_threshold_reached: bool, ) -> anyhow::Result<()> { - debug!("Sequencer: new L1 block, checking if commitment should be submitted"); + debug!("Sequencer: Checking if commitment should be submitted"); let inscription_queue = self.da_service.get_send_transaction_queue(); let min_soft_confirmations_per_commitment = self.config.min_soft_confirmations_per_commitment; @@ -623,7 +622,10 @@ where soft_confirmation_hashes, )?; - debug!("Sequencer: submitting commitment: {:?}", commitment); + debug!( + "Sequencer: submitting commitment for L2 range {}-{}", + commitment.l2_start_block_number, commitment.l2_end_block_number + ); let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone())) .map_err(|e| anyhow!(e))?; @@ -645,8 +647,6 @@ where anyhow!("Sequencer: Failed to set last sequencer commitment L2 height") })?; - debug!("Commitment info: {:?}", commitment_info); - let l2_start = l2_range_to_submit.start().0 as u32; let l2_end = l2_range_to_submit.end().0 as u32; if let Some(db_config) = self.config.db_config.clone() {