Attempt to fix flaky tests #887

Merged · 8 commits · Jul 13, 2024
Changes from 6 commits
54 changes: 31 additions & 23 deletions bin/citrea/tests/e2e/mod.rs
@@ -1129,7 +1129,7 @@ async fn test_soft_confirmations_status_one_l1() -> Result<(), anyhow::Error> {

#[tokio::test(flavor = "multi_thread")]
async fn test_soft_confirmations_status_two_l1() -> Result<(), anyhow::Error> {
// citrea::initialize_logging(tracing::Level::INFO);
citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
@@ -1221,7 +1221,7 @@ async fn test_soft_confirmations_status_two_l1() -> Result<(), anyhow::Error> {

#[tokio::test(flavor = "multi_thread")]
async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> {
// citrea::initialize_logging(tracing::Level::INFO);
citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer", "prover"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
@@ -1292,12 +1292,10 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> {

// start l1 height = 1, end = 2
seq_test_client.send_publish_batch_request().await;

// sequencer commitment should be sent
da_service.publish_test_block().await.unwrap();
wait_for_l1_block(&da_service, 2, None).await;
wait_for_l1_block(&da_service, 3, None).await;

// Submit an L2 block to prevent sequencer from falling behind.
seq_test_client.send_publish_batch_request().await;

// wait here until we see from prover's rpc that it finished proving
@@ -1308,43 +1306,49 @@
)
.await;

// Submit an L2 block to prevent sequencer from falling behind.
seq_test_client.send_publish_batch_request().await;

// prover should have synced all 6 L2 blocks
// P.S. there are 6 blocks because:
// when a new proof is submitted in MockDa, a new empty DA block is published,
// and for every empty DA block the sequencer publishes a new empty soft confirmation in order not to skip a block
wait_for_l2_block(&prover_node_test_client, 6, None).await;
sleep(Duration::from_secs(1)).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 6);

seq_test_client.send_publish_batch_request().await;
da_service.publish_test_block().await.unwrap();
// Trigger another commitment
for _ in 7..=8 {
seq_test_client.send_publish_batch_request().await;
}
wait_for_l2_block(&seq_test_client, 8, None).await;
// Allow for the L2 block to be committed and stored
// Otherwise, the L2 block height might be registered but it hasn't
// been processed inside the EVM yet.
sleep(Duration::from_secs(1)).await;
assert_eq!(seq_test_client.eth_block_number().await, 8);
wait_for_l1_block(&da_service, 4, None).await;
// Should still have 4 blocks since there are no commitments yet
wait_for_prover_l1_height(
&prover_node_test_client,
4,
Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)),
)
.await;
wait_for_l1_block(&da_service, 5, None).await;
seq_test_client.send_publish_batch_request().await;

// wait here until we see from prover's rpc that it finished proving
wait_for_prover_l1_height(
&prover_node_test_client,
5,
4,
Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)),
)
.await;
wait_for_l2_block(&seq_test_client, 8, None).await;
assert_eq!(seq_test_client.eth_block_number().await, 8);

// Should now have 8 blocks = 2 commitments of blocks 1-4 and 5-8.
// There is an extra soft confirmation due to the prover publishing a proof. This causes
// a new MockDa block, which in turn causes the sequencer to publish an extra soft confirmation
// because it must not skip blocks.
wait_for_l2_block(&prover_node_test_client, 8, None).await;
// Allow for the L2 block to be committed and stored
// Otherwise, the L2 block height might be registered but it hasn't
// been processed inside the EVM yet.
sleep(Duration::from_secs(1)).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 8);
// on DA block 3, we should have a proof
let mut blobs = da_service.get_block_at(4).await.unwrap().blobs;
let mut blobs = da_service.get_block_at(3).await.unwrap().blobs;

assert_eq!(blobs.len(), 1);

@@ -1365,7 +1369,7 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> {

#[tokio::test(flavor = "multi_thread")]
async fn test_reopen_prover() -> Result<(), anyhow::Error> {
// citrea::initialize_logging(tracing::Level::INFO);
citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer", "prover"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
@@ -1510,6 +1514,10 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> {
wait_for_l2_block(&seq_test_client, 6, None).await;
// Should still have 4 blocks since there are no commitments yet
wait_for_l2_block(&prover_node_test_client, 6, None).await;
// Allow for the L2 block to be committed and stored
// Otherwise, the L2 block height might be registered but it hasn't
// been processed inside the EVM yet.
sleep(Duration::from_secs(1)).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 6);

thread_kill_sender.send("kill").unwrap();
@@ -2666,7 +2674,7 @@ async fn full_node_verify_proof_and_store() {

#[tokio::test(flavor = "multi_thread")]
async fn test_all_flow() {
// citrea::initialize_logging(tracing::Level::DEBUG);
citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer", "prover", "full-node"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
@@ -3285,7 +3293,7 @@ async fn test_full_node_sync_status() {

#[tokio::test(flavor = "multi_thread")]
async fn test_sequencer_commitment_threshold() {
// citrea::initialize_logging(tracing::Level::DEBUG);
citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
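As a reviewer-style note on the test changes above: the flakiness fixes in this file converge on wait, settle, then assert — poll an explicit condition (`wait_for_l1_block`, `wait_for_l2_block`, `wait_for_prover_l1_height`), allow a short `sleep` for the block to be committed and processed in the EVM, and only then assert on `eth_block_number`. Below is a minimal sketch of the polling idea, assuming `tokio` and `anyhow`; `wait_until` is a hypothetical helper for illustration, not the repository's actual test utility.

```rust
use std::time::{Duration, Instant};

/// Hypothetical polling helper illustrating what the repo's
/// wait_for_* test utilities presumably do: re-check a condition
/// until it holds or a timeout expires, instead of relying on a
/// single fixed sleep (the usual source of flakiness).
async fn wait_until<F, Fut>(mut condition: F, timeout: Duration) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    let start = Instant::now();
    while start.elapsed() < timeout {
        if condition().await {
            return Ok(());
        }
        // Poll at a coarse interval to avoid hammering the RPC.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    anyhow::bail!("condition not met within {timeout:?}")
}
```

The one-second sleeps that remain after `wait_for_l2_block` are still needed because the block height can be registered before the block is fully processed inside the EVM, so height alone is not a sufficient readiness condition.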
2 changes: 1 addition & 1 deletion bin/citrea/tests/evm/gas_price.rs
@@ -17,7 +17,7 @@ use crate::{

#[tokio::test(flavor = "multi_thread")]
async fn test_gas_price_increase() -> Result<(), anyhow::Error> {
// citrea::initialize_logging(tracing::Level::INFO);
citrea::initialize_logging(tracing::Level::DEBUG);

let storage_dir = tempdir_with_children(&["DA", "sequencer"]);
let da_db_dir = storage_dir.path().join("DA").to_path_buf();
130 changes: 70 additions & 60 deletions crates/fullnode/src/runner.rs
@@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -325,7 +326,7 @@ where
let end_l2_height = sequencer_commitment.l2_end_block_number;

tracing::info!(
"Processing sequencer commitment. L2 Range = {:?} - {:?}.",
"Processing sequencer commitment. L2 Range = {}-{}.",
start_l2_height,
end_l2_height,
);
@@ -484,19 +485,22 @@
);
tokio::pin!(l2_sync_worker);

// Keep a list of commitments and proofs whose processing was attempted but failed
// because sync was lagging behind on L2 blocks.
// On every L1 block, we retry the previously registered commitments and proofs,
// now that there has been time to sync the L2 blocks.
let mut pending_sequencer_commitments = Vec::<SequencerCommitment>::new();
let mut pending_zk_proofs = Vec::<Proof>::new();
let mut pending_l1_blocks: VecDeque<<Da as DaService>::FilteredBlock> =
VecDeque::<Da::FilteredBlock>::new();
let pending_l1 = &mut pending_l1_blocks;

let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.tick().await;

loop {
select! {
_ = &mut l1_sync_worker => {},
_ = &mut l2_sync_worker => {},
Some(l1_block) = l1_rx.recv() => {
self.process_l1_block(&mut pending_sequencer_commitments,&mut pending_zk_proofs, l1_block).await;
pending_l1.push_back(l1_block);
},
_ = interval.tick() => {
self.process_l1_block(pending_l1).await
},
Some(l2_blocks) = l2_rx.recv() => {
for (l2_height, l2_block) in l2_blocks {
@@ -512,15 +516,65 @@

pub async fn process_l1_block(
&self,
pending_sequencer_commitments: &mut Vec<SequencerCommitment>,
pending_zk_proofs: &mut Vec<Proof>,
l1_block: Da::FilteredBlock,
pending_l1_blocks: &mut VecDeque<<Da as DaService>::FilteredBlock>,
) {
// Set the l1 height of the l1 hash
self.ledger_db
.set_l1_height_of_l1_hash(l1_block.header().hash().into(), l1_block.header().height())
.unwrap();
while !pending_l1_blocks.is_empty() {
let l1_block = pending_l1_blocks
.front()
.expect("Pending l1 blocks cannot be empty");
// Set the l1 height of the l1 hash
self.ledger_db
.set_l1_height_of_l1_hash(
l1_block.header().hash().into(),
l1_block.header().height(),
)
.unwrap();

let (sequencer_commitments, zk_proofs) =
self.extract_relevant_l1_data(l1_block.clone());

for zk_proof in zk_proofs.clone().iter() {
if let Err(e) = self
.process_zk_proof(l1_block.clone(), zk_proof.clone())
.await
{
match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process ZK proofs. Missing L2 blocks {:?} - {:?}. msg = {}", start_l2_height, end_l2_height, msg);
return;
}
SyncError::Error(e) => {
error!("Could not process ZK proofs: {}...skipping", e);
}
}
}
}

for sequencer_commitment in sequencer_commitments.clone().iter() {
if let Err(e) = self
.process_sequencer_commitment(l1_block.clone(), sequencer_commitment.clone())
.await
{
match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process sequencer commitments. Missing L2 blocks {:?} - {:?}, msg = {}", start_l2_height, end_l2_height, msg);
return;
}
SyncError::Error(e) => {
error!("Could not process sequencer commitments: {}... skipping", e);
}
}
}
}

pending_l1_blocks.pop_front();
}
}

fn extract_relevant_l1_data(
&self,
l1_block: Da::FilteredBlock,
) -> (Vec<SequencerCommitment>, Vec<Proof>) {
let mut sequencer_commitments = Vec::<SequencerCommitment>::new();
let mut zk_proofs = Vec::<Proof>::new();

@@ -558,51 +612,7 @@ where
// TODO: This is where force transactions will land - try to parse DA data force transaction
}
});

pending_zk_proofs.extend(zk_proofs);
pending_sequencer_commitments.extend(sequencer_commitments);

for (index, zk_proof) in pending_zk_proofs.clone().iter().enumerate() {
match self
.process_zk_proof(l1_block.clone(), zk_proof.clone())
.await
{
Ok(()) => {
pending_zk_proofs.remove(index);
}
Err(e) => match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process ZK proofs. Missing L2 blocks {:?} - {:?}. msg = {}", start_l2_height, end_l2_height, msg);
}
SyncError::Error(e) => {
error!("Could not process ZK proofs: {}...skipping", e);
pending_zk_proofs.remove(index);
}
},
}
}

for (index, sequencer_commitment) in
pending_sequencer_commitments.clone().iter().enumerate()
{
match self
.process_sequencer_commitment(l1_block.clone(), sequencer_commitment.clone())
.await
{
Ok(()) => {
pending_sequencer_commitments.remove(index);
}
Err(e) => match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process sequencer commitments. Missing L2 blocks {:?} - {:?}, msg = {}", start_l2_height, end_l2_height, msg);
}
SyncError::Error(e) => {
error!("Could not process sequencer commitments: {}... skipping", e);
pending_sequencer_commitments.remove(index);
}
},
}
}
(sequencer_commitments, zk_proofs)
}
}

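The structural change in this file: instead of retrying individual commitments and proofs out of two pending `Vec`s — where calling `remove(index)` while enumerating a `clone()` can hit the wrong element once indices shift — the runner now queues whole L1 blocks in a `VecDeque` and drains them from the front on a one-second tick. A condensed sketch of that head-of-line strategy follows, with simplified types (the real `SyncError` variants carry a message and the missing L2 height range):

```rust
use std::collections::VecDeque;

// Simplified stand-in for the runner's SyncError; the real variants
// carry a message and the missing L2 height range.
enum SyncError {
    MissingL2,
    Other,
}

// Drain queued L1 blocks strictly in arrival order. Stop at the first
// block whose L2 data has not been synced yet (it stays at the front
// and is retried on the next interval tick); drop blocks that fail
// with a hard error so the queue cannot wedge.
fn drain_pending<B>(
    pending: &mut VecDeque<B>,
    mut process: impl FnMut(&B) -> Result<(), SyncError>,
) {
    while let Some(block) = pending.front() {
        match process(block) {
            Ok(()) => {
                pending.pop_front();
            }
            Err(SyncError::MissingL2) => return,
            Err(SyncError::Other) => {
                pending.pop_front();
            }
        }
    }
}
```

Processing strictly in arrival order guarantees that a commitment for a later L2 range is never applied before an earlier one, and a block that is missing L2 data simply stays queued until the next tick.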
2 changes: 0 additions & 2 deletions crates/sequencer/src/commitment_controller.rs
@@ -57,8 +57,6 @@ pub fn get_commitment_info(
debug!("State diff threshold reached. Committing...");
}

debug!("L2 range to submit: {}..{}", l2_start, l2_end);

Ok(Some(CommitmentInfo {
l2_height_range: BatchNumber(l2_start)..=BatchNumber(l2_end),
}))
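For context on `get_commitment_info`: based on the identifiers visible in this PR (`min_soft_confirmations_per_commitment` in sequencer.rs and the `state_diff_threshold_reached` flag), the trigger condition presumably condenses to something like the sketch below; the function name and exact semantics are assumptions, not the crate's API.

```rust
/// Hypothetical condensation of the commitment trigger: commit once
/// enough soft confirmations have accumulated since the last
/// commitment, or early when the accumulated state diff grows past
/// its size threshold.
fn should_commit(
    soft_confirmations_since_last_commitment: u64,
    min_soft_confirmations_per_commitment: u64,
    state_diff_threshold_reached: bool,
) -> bool {
    state_diff_threshold_reached
        || soft_confirmations_since_last_commitment >= min_soft_confirmations_per_commitment
}
```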
10 changes: 5 additions & 5 deletions crates/sequencer/src/sequencer.rs
@@ -514,7 +514,6 @@ where
self.storage_manager
.save_change_set_l2(l2_height, slot_result.change_set)?;

tracing::debug!("Finalizing l2 height: {:?}", l2_height);
self.storage_manager.finalize_l2(l2_height)?;

// connect L1 and L2 height
@@ -594,7 +593,7 @@
&mut self,
state_diff_threshold_reached: bool,
) -> anyhow::Result<()> {
debug!("Sequencer: new L1 block, checking if commitment should be submitted");
debug!("Sequencer: Checking if commitment should be submitted");
let inscription_queue = self.da_service.get_send_transaction_queue();
let min_soft_confirmations_per_commitment =
self.config.min_soft_confirmations_per_commitment;
@@ -623,7 +622,7 @@
soft_confirmation_hashes,
)?;

debug!("Sequencer: submitting commitment: {:?}", commitment);
debug!(
"Sequencer: submitting commitment for L2 range {}-{}",
commitment.l2_start_block_number, commitment.l2_end_block_number
);

let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone()))
.map_err(|e| anyhow!(e))?;
@@ -645,8 +647,6 @@
anyhow!("Sequencer: Failed to set last sequencer commitment L2 height")
})?;

debug!("Commitment info: {:?}", commitment_info);

let l2_start = l2_range_to_submit.start().0 as u32;
let l2_end = l2_range_to_submit.end().0 as u32;
if let Some(db_config) = self.config.db_config.clone() {
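On the submission path above, the commitment is Borsh-serialized inside a `DaData::SequencerCommitment` wrapper before being queued for DA. The following self-contained round-trip sketch uses illustrative stand-in types — the field names are assumptions inferred from identifiers in this diff (`l2_start_block_number`, `l2_end_block_number`), not the real citrea definitions:

```rust
use borsh::{BorshDeserialize, BorshSerialize};

// Illustrative stand-ins for the real citrea types (assumed fields).
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)]
struct SequencerCommitment {
    merkle_root: [u8; 32],
    l2_start_block_number: u64,
    l2_end_block_number: u64,
}

#[derive(BorshSerialize, BorshDeserialize, Debug)]
enum DaData {
    SequencerCommitment(SequencerCommitment),
}

fn main() -> anyhow::Result<()> {
    let commitment = SequencerCommitment {
        merkle_root: [0u8; 32],
        l2_start_block_number: 5,
        l2_end_block_number: 8,
    };

    // Serialize the way the sequencer does before posting the blob to DA.
    let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone()))?;

    // A DA-reading node (full node or prover) decodes the blob back.
    match DaData::try_from_slice(&blob)? {
        DaData::SequencerCommitment(decoded) => {
            assert_eq!(decoded.l2_end_block_number, commitment.l2_end_block_number);
        }
    }
    Ok(())
}
```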