Push L1 and process every 1 sec
rakanalh committed Jul 13, 2024
1 parent 58aed5f commit 3bd26d9
Showing 1 changed file with 70 additions and 60 deletions.
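This commit changes the fullnode runner so that incoming L1 blocks are no longer processed inline as they arrive from the DA channel; instead they are pushed onto a VecDeque and drained by a 1-second tokio::time::interval tick inside the select! loop, leaving a block queued for retry if its L2 range has not been synced yet. Below is a minimal, self-contained sketch of that queue-and-tick pattern, shown before the actual diff; the L1Block alias and process helper are placeholders for illustration only, not the real DaService/runner API.

// Minimal sketch (not the actual runner code): L1 blocks received on a channel
// are only queued; a 1-second interval tick drains the queue front-to-back and
// keeps a block queued if it cannot be fully processed yet.
use std::collections::VecDeque;
use std::time::Duration;

use tokio::select;
use tokio::sync::mpsc;

// Placeholder block type; the real code uses `Da::FilteredBlock`.
type L1Block = u64;

async fn run(mut l1_rx: mpsc::Receiver<L1Block>) {
    let mut pending_l1_blocks: VecDeque<L1Block> = VecDeque::new();

    let mut interval = tokio::time::interval(Duration::from_secs(1));
    // The first tick of a tokio interval completes immediately; consume it so
    // the first drain happens one full second after startup.
    interval.tick().await;

    loop {
        select! {
            Some(l1_block) = l1_rx.recv() => {
                // Receiving an L1 block only enqueues it.
                pending_l1_blocks.push_back(l1_block);
            },
            _ = interval.tick() => {
                // Drain the queue; stop at the first block that is not ready
                // (e.g. its L2 range has not been synced yet) and retry it on
                // the next tick.
                while let Some(block) = pending_l1_blocks.front() {
                    if !process(*block).await {
                        break;
                    }
                    pending_l1_blocks.pop_front();
                }
            },
        }
    }
}

// Placeholder for extract_relevant_l1_data + proof/commitment processing;
// returns false when the block should stay queued.
async fn process(_block: L1Block) -> bool {
    true
}

Compared to the previous approach, which retried a flat list of pending commitments and proofs on every new L1 block, queuing whole L1 blocks keeps ordering simple: a block is only popped once everything extracted from it has been processed.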
130 changes: 70 additions & 60 deletions crates/fullnode/src/runner.rs
@@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -325,7 +326,7 @@ where
let end_l2_height = sequencer_commitment.l2_end_block_number;

tracing::info!(
"Processing sequencer commitment. L2 Range = {:?} - {:?}.",
"Processing sequencer commitment. L2 Range = {}-{}.",
start_l2_height,
end_l2_height,
);
@@ -484,19 +485,22 @@ where
);
tokio::pin!(l2_sync_worker);

// Keep a list of commitments and proofs which have been attempted to process but failed,
// due to the sync lagging behind on L2 blocks.
// On every L1 block, we try to process the previously registered commitments and proofs
// having had the time to sync L2 blocks.
let mut pending_sequencer_commitments = Vec::<SequencerCommitment>::new();
let mut pending_zk_proofs = Vec::<Proof>::new();
let mut pending_l1_blocks: VecDeque<<Da as DaService>::FilteredBlock> =
VecDeque::<Da::FilteredBlock>::new();
let pending_l1 = &mut pending_l1_blocks;

let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.tick().await;

loop {
select! {
_ = &mut l1_sync_worker => {},
_ = &mut l2_sync_worker => {},
Some(l1_block) = l1_rx.recv() => {
self.process_l1_block(&mut pending_sequencer_commitments, &mut pending_zk_proofs, l1_block).await;
pending_l1.push_back(l1_block);
},
_ = interval.tick() => {
self.process_l1_block(pending_l1).await
},
Some(l2_blocks) = l2_rx.recv() => {
for (l2_height, l2_block) in l2_blocks {
@@ -512,15 +516,65 @@ where

pub async fn process_l1_block(
&self,
pending_sequencer_commitments: &mut Vec<SequencerCommitment>,
pending_zk_proofs: &mut Vec<Proof>,
l1_block: Da::FilteredBlock,
pending_l1_blocks: &mut VecDeque<<Da as DaService>::FilteredBlock>,
) {
// Set the l1 height of the l1 hash
self.ledger_db
.set_l1_height_of_l1_hash(l1_block.header().hash().into(), l1_block.header().height())
.unwrap();
while !pending_l1_blocks.is_empty() {
let l1_block = pending_l1_blocks
.front()
.expect("Pending l1 blocks cannot be empty");
// Set the l1 height of the l1 hash
self.ledger_db
.set_l1_height_of_l1_hash(
l1_block.header().hash().into(),
l1_block.header().height(),
)
.unwrap();

let (sequencer_commitments, zk_proofs) =
self.extract_relevant_l1_data(l1_block.clone());

for zk_proof in zk_proofs.clone().iter() {
if let Err(e) = self
.process_zk_proof(l1_block.clone(), zk_proof.clone())
.await
{
match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process ZK proofs. Missing L2 blocks {:?} - {:?}. msg = {}", start_l2_height, end_l2_height, msg);
return;
}
SyncError::Error(e) => {
error!("Could not process ZK proofs: {}...skipping", e);
}
}
}
}

for sequencer_commitment in sequencer_commitments.clone().iter() {
if let Err(e) = self
.process_sequencer_commitment(l1_block.clone(), sequencer_commitment.clone())
.await
{
match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process sequencer commitments. Missing L2 blocks {:?} - {:?}, msg = {}", start_l2_height, end_l2_height, msg);
return;
}
SyncError::Error(e) => {
error!("Could not process sequencer commitments: {}... skipping", e);
}
}
}
}

pending_l1_blocks.pop_front();
}
}

fn extract_relevant_l1_data(
&self,
l1_block: Da::FilteredBlock,
) -> (Vec<SequencerCommitment>, Vec<Proof>) {
let mut sequencer_commitments = Vec::<SequencerCommitment>::new();
let mut zk_proofs = Vec::<Proof>::new();

@@ -558,51 +612,7 @@ where
// TODO: This is where force transactions will land - try to parse DA data force transaction
}
});

pending_zk_proofs.extend(zk_proofs);
pending_sequencer_commitments.extend(sequencer_commitments);

for (index, zk_proof) in pending_zk_proofs.clone().iter().enumerate() {
match self
.process_zk_proof(l1_block.clone(), zk_proof.clone())
.await
{
Ok(()) => {
pending_zk_proofs.remove(index);
}
Err(e) => match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process ZK proofs. Missing L2 blocks {:?} - {:?}. msg = {}", start_l2_height, end_l2_height, msg);
}
SyncError::Error(e) => {
error!("Could not process ZK proofs: {}...skipping", e);
pending_zk_proofs.remove(index);
}
},
}
}

for (index, sequencer_commitment) in
pending_sequencer_commitments.clone().iter().enumerate()
{
match self
.process_sequencer_commitment(l1_block.clone(), sequencer_commitment.clone())
.await
{
Ok(()) => {
pending_sequencer_commitments.remove(index);
}
Err(e) => match e {
SyncError::MissingL2(msg, start_l2_height, end_l2_height) => {
warn!("Could not completely process sequencer commitments. Missing L2 blocks {:?} - {:?}, msg = {}", start_l2_height, end_l2_height, msg);
}
SyncError::Error(e) => {
error!("Could not process sequencer commitments: {}... skipping", e);
pending_sequencer_commitments.remove(index);
}
},
}
}
(sequencer_commitments, zk_proofs)
}
}
