Skip to content

Commit

Permalink
Attempt to fix flaky tests (#887)
Browse files Browse the repository at this point in the history
* Turn on debug logs for failing tests

* Cleanup redundant logs

* Push L1 and process every 1 sec

* Turn on debugging for test_prover_sync_with_commitments

* Wait until EVM processes block

* Fix test sync prover with commitment

* Turn off logging for tests

* Make sequencer crash and replace more deterministic
  • Loading branch information
rakanalh authored Jul 13, 2024
1 parent f64d5bd commit f634146
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 111 deletions.
85 changes: 42 additions & 43 deletions bin/citrea/tests/e2e/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ async fn test_soft_confirmations_status_one_l1() -> Result<(), anyhow::Error> {

#[tokio::test(flavor = "multi_thread")]
async fn test_soft_confirmations_status_two_l1() -> Result<(), anyhow::Error> {
// citrea::initialize_logging(tracing::Level::INFO);
// citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
Expand Down Expand Up @@ -1221,7 +1221,7 @@ async fn test_soft_confirmations_status_two_l1() -> Result<(), anyhow::Error> {

#[tokio::test(flavor = "multi_thread")]
async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> {
// citrea::initialize_logging(tracing::Level::INFO);
// citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer", "prover"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
Expand Down Expand Up @@ -1292,12 +1292,10 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> {

// start l1 height = 1, end = 2
seq_test_client.send_publish_batch_request().await;

// sequencer commitment should be sent
da_service.publish_test_block().await.unwrap();
wait_for_l1_block(&da_service, 2, None).await;
wait_for_l1_block(&da_service, 3, None).await;

// Submit an L2 block to prevent sequencer from falling behind.
seq_test_client.send_publish_batch_request().await;

// wait here until we see from prover's rpc that it finished proving
Expand All @@ -1308,43 +1306,49 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> {
)
.await;

// Submit an L2 block to prevent sequencer from falling behind.
seq_test_client.send_publish_batch_request().await;

// prover should have synced all 6 l2 blocks
// ps there are 6 blocks because:
// when a new proof is submitted in mock da a new empty da block is published
// and for every empty da block sequencer publishes a new empty soft confirmation in order to not skip a block
wait_for_l2_block(&prover_node_test_client, 6, None).await;
sleep(Duration::from_secs(1)).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 6);

seq_test_client.send_publish_batch_request().await;
da_service.publish_test_block().await.unwrap();
// Trigger another commitment
for _ in 7..=8 {
seq_test_client.send_publish_batch_request().await;
}
wait_for_l2_block(&seq_test_client, 8, None).await;
// Allow for the L2 block to be committed and stored
// Otherwise, the L2 block height might be registered but it hasn't
// been processed inside the EVM yet.
sleep(Duration::from_secs(1)).await;
assert_eq!(seq_test_client.eth_block_number().await, 8);
wait_for_l1_block(&da_service, 4, None).await;
// Still should have 4 blocks there are no commitments yet
wait_for_prover_l1_height(
&prover_node_test_client,
4,
Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)),
)
.await;
wait_for_l1_block(&da_service, 5, None).await;
seq_test_client.send_publish_batch_request().await;

// wait here until we see from prover's rpc that it finished proving
wait_for_prover_l1_height(
&prover_node_test_client,
5,
4,
Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)),
)
.await;
wait_for_l2_block(&seq_test_client, 8, None).await;
assert_eq!(seq_test_client.eth_block_number().await, 8);

// Should now have 8 blocks = 2 commitments of blocks 1-4 and 5-9
// there is an extra soft confirmation due to the prover publishing a proof. This causes
// a new MockDa block, which in turn causes the sequencer to publish an extra soft confirmation
// because it must not skip blocks.
wait_for_l2_block(&prover_node_test_client, 8, None).await;
// Allow for the L2 block to be committed and stored
// Otherwise, the L2 block height might be registered but it hasn't
// been processed inside the EVM yet.
sleep(Duration::from_secs(1)).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 8);
// on the 8th DA block, we should have a proof
let mut blobs = da_service.get_block_at(4).await.unwrap().blobs;
let mut blobs = da_service.get_block_at(3).await.unwrap().blobs;

assert_eq!(blobs.len(), 1);

Expand All @@ -1365,7 +1369,7 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> {

#[tokio::test(flavor = "multi_thread")]
async fn test_reopen_prover() -> Result<(), anyhow::Error> {
// citrea::initialize_logging(tracing::Level::INFO);
// citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer", "prover"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
Expand Down Expand Up @@ -1510,6 +1514,10 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> {
wait_for_l2_block(&seq_test_client, 6, None).await;
// Still should have 4 blocks there are no commitments yet
wait_for_l2_block(&prover_node_test_client, 6, None).await;
// Allow for the L2 block to be committed and stored
// Otherwise, the L2 block height might be registered but it hasn't
// been processed inside the EVM yet.
sleep(Duration::from_secs(1)).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 6);

thread_kill_sender.send("kill").unwrap();
Expand Down Expand Up @@ -1915,7 +1923,7 @@ fn find_subarray(haystack: &[u8], needle: &[u8]) -> Option<usize> {

#[tokio::test(flavor = "multi_thread")]
async fn sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Error> {
// citrea::initialize_logging(tracing::Level::INFO);
citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
Expand Down Expand Up @@ -1997,20 +2005,18 @@ async fn sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Error> {
wait_for_l2_block(&seq_test_client, 4, None).await;

// second da block
da_service.publish_test_block().await.unwrap();
wait_for_l1_block(&da_service, 2, None).await;
wait_for_l1_block(&da_service, 3, None).await;

// before this the commitment will be sent
// the commitment will be only in the first block so it is still not finalized
// so the full node won't see the commitment
// Push a new L2 block into the new L1 block(2) to prevent
// sequencer from falling behind and creating automatic empty block.
// This makes the process a bit more deterministic on the test's end.
seq_test_client.send_publish_batch_request().await;

// wait for sync
wait_for_l2_block(&full_node_test_client, 6, None).await;

// should be synced
assert_eq!(full_node_test_client.eth_block_number().await, 6);
wait_for_l2_block(&full_node_test_client, 5, None).await;
// Allow for the L2 block to be committed and stored
// Otherwise, the L2 block height might be registered but it hasn't
// been processed inside the EVM yet.
sleep(Duration::from_secs(1)).await;
assert_eq!(full_node_test_client.eth_block_number().await, 5);

// assume sequencer crashed
seq_task.abort();
Expand Down Expand Up @@ -2053,21 +2059,14 @@ async fn sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Error> {

let seq_test_client = make_test_client(seq_port).await;

wait_for_l2_block(&seq_test_client, 6, None).await;

assert_eq!(seq_test_client.eth_block_number().await as u64, 6);
assert_eq!(seq_test_client.eth_block_number().await as u64, 5);

seq_test_client.send_publish_batch_request().await;
seq_test_client.send_publish_batch_request().await;
seq_test_client.send_publish_batch_request().await;
wait_for_l2_block(&seq_test_client, 9, None).await;

da_service.publish_test_block().await.unwrap();
wait_for_l1_block(&da_service, 4, None).await;
wait_for_l1_block(&da_service, 5, None).await;
wait_for_l2_block(&seq_test_client, 8, None).await;

// new commitment will be sent here, it should send between 2 and 3 should not include 1
seq_test_client.send_publish_batch_request().await;
wait_for_l1_block(&da_service, 3, None).await;

wait_for_postgres_commitment(
&db_test_client,
Expand Down
2 changes: 1 addition & 1 deletion bin/citrea/tests/evm/gas_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{

#[tokio::test(flavor = "multi_thread")]
async fn test_gas_price_increase() -> Result<(), anyhow::Error> {
// citrea::initialize_logging(tracing::Level::INFO);
// citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
Expand Down
130 changes: 70 additions & 60 deletions crates/fullnode/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
Expand Down Expand Up @@ -325,7 +326,7 @@ where
let end_l2_height = sequencer_commitment.l2_end_block_number;

tracing::info!(
"Processing sequencer commitment. L2 Range = {:?} - {:?}.",
"Processing sequencer commitment. L2 Range = {}-{}.",
start_l2_height,
end_l2_height,
);
Expand Down Expand Up @@ -484,19 +485,22 @@ where
);
tokio::pin!(l2_sync_worker);

// Keep a list of commitments and proofs which have been attempted to process but failed,
// due to the sync lagging behind on L2 blocks.
// On every L1 block, we try to process the previously registered commitments and proofs
// having had the time to sync L2 blocks.
let mut pending_sequencer_commitments = Vec::<SequencerCommitment>::new();
let mut pending_zk_proofs = Vec::<Proof>::new();
let mut pending_l1_blocks: VecDeque<<Da as DaService>::FilteredBlock> =
VecDeque::<Da::FilteredBlock>::new();
let pending_l1 = &mut pending_l1_blocks;

let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.tick().await;

loop {
select! {
_ = &mut l1_sync_worker => {},
_ = &mut l2_sync_worker => {},
Some(l1_block) = l1_rx.recv() => {
self.process_l1_block(&mut pending_sequencer_commitments,&mut pending_zk_proofs, l1_block).await;
pending_l1.push_back(l1_block);
},
_ = interval.tick() => {
self.process_l1_block(pending_l1).await
},
Some(l2_blocks) = l2_rx.recv() => {
for (l2_height, l2_block) in l2_blocks {
Expand All @@ -512,15 +516,65 @@ where

pub async fn process_l1_block(
&self,
pending_sequencer_commitments: &mut Vec<SequencerCommitment>,
pending_zk_proofs: &mut Vec<Proof>,
l1_block: Da::FilteredBlock,
pending_l1_blocks: &mut VecDeque<<Da as DaService>::FilteredBlock>,
) {
// Set the l1 height of the l1 hash
self.ledger_db
.set_l1_height_of_l1_hash(l1_block.header().hash().into(), l1_block.header().height())
.unwrap();
while !pending_l1_blocks.is_empty() {
let l1_block = pending_l1_blocks
.front()
.expect("Pending l1 blocks cannot be empty");
// Set the l1 height of the l1 hash
self.ledger_db
.set_l1_height_of_l1_hash(
l1_block.header().hash().into(),
l1_block.header().height(),
)
.unwrap();

let (sequencer_commitments, zk_proofs) =
self.extract_relevant_l1_data(l1_block.clone());

for zk_proof in zk_proofs.clone().iter() {
if let Err(e) = self
.process_zk_proof(l1_block.clone(), zk_proof.clone())
.await
{
match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process ZK proofs. Missing L2 blocks {:?} - {:?}. msg = {}", start_l2_height, end_l2_height, msg);
return;
}
SyncError::Error(e) => {
error!("Could not process ZK proofs: {}...skipping", e);
}
}
}
}

for sequencer_commitment in sequencer_commitments.clone().iter() {
if let Err(e) = self
.process_sequencer_commitment(l1_block.clone(), sequencer_commitment.clone())
.await
{
match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process sequencer commitments. Missing L2 blocks {:?} - {:?}, msg = {}", start_l2_height, end_l2_height, msg);
return;
}
SyncError::Error(e) => {
error!("Could not process sequencer commitments: {}... skipping", e);
}
}
}
}

pending_l1_blocks.pop_front();
}
}

fn extract_relevant_l1_data(
&self,
l1_block: Da::FilteredBlock,
) -> (Vec<SequencerCommitment>, Vec<Proof>) {
let mut sequencer_commitments = Vec::<SequencerCommitment>::new();
let mut zk_proofs = Vec::<Proof>::new();

Expand Down Expand Up @@ -558,51 +612,7 @@ where
// TODO: This is where force transactions will land - try to parse DA data force transaction
}
});

pending_zk_proofs.extend(zk_proofs);
pending_sequencer_commitments.extend(sequencer_commitments);

for (index, zk_proof) in pending_zk_proofs.clone().iter().enumerate() {
match self
.process_zk_proof(l1_block.clone(), zk_proof.clone())
.await
{
Ok(()) => {
pending_zk_proofs.remove(index);
}
Err(e) => match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process ZK proofs. Missing L2 blocks {:?} - {:?}. msg = {}", start_l2_height, end_l2_height, msg);
}
SyncError::Error(e) => {
error!("Could not process ZK proofs: {}...skipping", e);
pending_zk_proofs.remove(index);
}
},
}
}

for (index, sequencer_commitment) in
pending_sequencer_commitments.clone().iter().enumerate()
{
match self
.process_sequencer_commitment(l1_block.clone(), sequencer_commitment.clone())
.await
{
Ok(()) => {
pending_sequencer_commitments.remove(index);
}
Err(e) => match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process sequencer commitments. Missing L2 blocks {:?} - {:?}, msg = {}", start_l2_height, end_l2_height, msg);
}
SyncError::Error(e) => {
error!("Could not process sequencer commitments: {}... skipping", e);
pending_sequencer_commitments.remove(index);
}
},
}
}
(sequencer_commitments, zk_proofs)
}
}

Expand Down
2 changes: 0 additions & 2 deletions crates/sequencer/src/commitment_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ pub fn get_commitment_info(
debug!("State diff threshold reached. Committing...");
}

debug!("L2 range to submit: {}..{}", l2_start, l2_end);

Ok(Some(CommitmentInfo {
l2_height_range: BatchNumber(l2_start)..=BatchNumber(l2_end),
}))
Expand Down
Loading

0 comments on commit f634146

Please sign in to comment.