Skip to content

Commit

Permalink
Invoke commitment after each L2 block
Browse files Browse the repository at this point in the history
This would let commitment controller decide whether to go through with
the commitment or not.
  • Loading branch information
rakanalh committed Jul 8, 2024
1 parent d283c42 commit 478023c
Showing 1 changed file with 21 additions and 31 deletions.
52 changes: 21 additions & 31 deletions crates/sequencer/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,7 @@ where
l2_block_mode: L2BlockMode,
pg_pool: &Option<PostgresConnector>,
last_used_l1_height: u64,
da_commitment_tx: UnboundedSender<bool>,
) -> anyhow::Result<u64> {
) -> anyhow::Result<(u64, bool)> {
let da_height = da_block.header().height();
let (l2_height, l1_height) = match self
.ledger_db
Expand Down Expand Up @@ -478,7 +477,7 @@ where

tracing::debug!("Finalizing l2 height: {:?}", l2_height);
self.storage_manager.finalize_l2(l2_height)?;
return Ok(last_used_l1_height);
return Ok((last_used_l1_height, false));
}

trace!(
Expand Down Expand Up @@ -553,12 +552,9 @@ where
);
// Serialize the state diff to check size later.
let serialized_state_diff = bincode::serialize(&merged_state_diff)?;
if serialized_state_diff.len() as u64 > MAX_STATEDIFF_SIZE_COMMITMENT_THRESHOLD {
// If we exceed the threshold, we should notify the commitment
// worker to initiate a commitment.
if da_commitment_tx.unbounded_send(true).is_err() {
error!("Commitment thread is dead!");
}
let state_diff_threshold_reached =
serialized_state_diff.len() as u64 > MAX_STATEDIFF_SIZE_COMMITMENT_THRESHOLD;
if state_diff_threshold_reached {
self.last_state_diff.clone_from(&slot_result.state_diff);
self.ledger_db
.set_state_diff(self.last_state_diff.clone())?;
Expand All @@ -582,7 +578,7 @@ where
});
}

Ok(da_block.header().height())
Ok((da_block.header().height(), state_diff_threshold_reached))
}
(Err(err), batch_workspace) => {
warn!(
Expand Down Expand Up @@ -800,10 +796,6 @@ where
missed_da_blocks_count = skipped_blocks;
}
}

if let Err(e) = self.maybe_submit_commitment(da_commitment_tx.clone()).await {
error!("Sequencer error: {}", e);
}
}
},
force = da_commitment_rx.select_next_some() => {
Expand All @@ -826,7 +818,7 @@ where
.map_err(|e| anyhow!(e))?;

debug!("Created an empty L2 for L1={}", needed_da_block_height);
if let Err(e) = self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, &pg_pool, last_used_l1_height, da_commitment_tx.clone()).await {
if let Err(e) = self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, &pg_pool, last_used_l1_height).await {
error!("Sequencer error: {}", e);
}
}
Expand All @@ -842,9 +834,13 @@ where
}
};
let l1_fee_rate = l1_fee_rate.clamp(*l1_fee_rate_range.start(), *l1_fee_rate_range.end());
match self.produce_l2_block(last_finalized_block.clone(), l1_fee_rate, L2BlockMode::NotEmpty, &pg_pool, last_used_l1_height, da_commitment_tx.clone()).await {
Ok(l1_block_number) => {
match self.produce_l2_block(last_finalized_block.clone(), l1_fee_rate, L2BlockMode::NotEmpty, &pg_pool, last_used_l1_height).await {
Ok((l1_block_number, state_diff_threshold_reached)) => {
last_used_l1_height = l1_block_number;

if da_commitment_tx.unbounded_send(state_diff_threshold_reached).is_err() {
error!("Commitment thread is dead!");
}
},
Err(e) => {
error!("Sequencer error: {}", e);
Expand All @@ -870,7 +866,7 @@ where
.map_err(|e| anyhow!(e))?;

debug!("Created an empty L2 for L1={}", needed_da_block_height);
if let Err(e) = self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, &pg_pool, last_used_l1_height, da_commitment_tx.clone()).await {
if let Err(e) = self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, &pg_pool, last_used_l1_height).await {
error!("Sequencer error: {}", e);
}
}
Expand All @@ -888,20 +884,24 @@ where
let l1_fee_rate = l1_fee_rate.clamp(*l1_fee_rate_range.start(), *l1_fee_rate_range.end());

let instant = Instant::now();
match self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::NotEmpty, &pg_pool, last_used_l1_height, da_commitment_tx.clone()).await {
Ok(l1_block_number) => {
match self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::NotEmpty, &pg_pool, last_used_l1_height).await {
Ok((l1_block_number, state_diff_threshold_reached)) => {
// Set the next iteration's wait time to produce a block based on the
// previous block's execution time.
// This is mainly to make sure we account for the execution time to
// achieve consistent 2-second block production.
parent_block_exec_time = instant.elapsed();

last_used_l1_height = l1_block_number;

if da_commitment_tx.unbounded_send(state_diff_threshold_reached).is_err() {
error!("Commitment thread is dead!");
}
},
Err(e) => {
error!("Sequencer error: {}", e);
}
}
};
}
}
}
Expand Down Expand Up @@ -1063,16 +1063,6 @@ where
Ok(())
}

async fn maybe_submit_commitment(
&self,
da_commitment_tx: UnboundedSender<bool>,
) -> anyhow::Result<()> {
if da_commitment_tx.unbounded_send(false).is_err() {
error!("Commitment thread is dead!");
}
Ok(())
}

fn get_account_updates(&self) -> Result<Vec<ChangedAccount>, anyhow::Error> {
let head = self
.db_provider
Expand Down

0 comments on commit 478023c

Please sign in to comment.