From cc91eb17249ef6dd02147686c2bcf36cde1f1f7a Mon Sep 17 00:00:00 2001 From: yaziciahmet Date: Mon, 15 Jul 2024 14:24:10 +0200 Subject: [PATCH] Separate submit_commitment into its own fn --- crates/sequencer/src/sequencer.rs | 312 +++++++++++------------------- 1 file changed, 116 insertions(+), 196 deletions(-) diff --git a/crates/sequencer/src/sequencer.rs b/crates/sequencer/src/sequencer.rs index 1f69d763b..47b96a112 100644 --- a/crates/sequencer/src/sequencer.rs +++ b/crates/sequencer/src/sequencer.rs @@ -609,112 +609,7 @@ where state_diff_threshold_reached, )?; if let Some(commitment_info) = commitment_info { - let l2_range_to_submit = commitment_info.l2_height_range.clone(); - - // calculate exclusive range end - let range_end = BatchNumber(l2_range_to_submit.end().0 + 1); // cannnot add u64 to BatchNumber directly - - let soft_confirmation_hashes = self - .ledger_db - .get_soft_batch_range(&(*l2_range_to_submit.start()..range_end))? - .iter() - .map(|sb| sb.hash) - .collect::>(); - - let commitment = commitment_controller::get_commitment( - commitment_info.clone(), - soft_confirmation_hashes, - )?; - - debug!( - "Sequencer: submitting commitment for L2 range {}-{}", - commitment.l2_start_block_number, commitment.l2_end_block_number - ); - - let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone())) - .map_err(|e| anyhow!(e))?; - let (notify, rx) = oneshot_channel(); - let request = BlobWithNotifier { blob, notify }; - self.da_service - .get_send_transaction_queue() - .send(request) - .map_err(|_| anyhow!("Bitcoin service already stopped!"))?; - - info!("Sent commitment to DA queue. L2 range: #{}-{}", commitment.l2_start_block_number, commitment.l2_end_block_number); - - // Add commitment to pending commitments - let mut pending_commitments = self.pending_commitments_l2_range.lock().await; - pending_commitments.push((*l2_range_to_submit.start(), *l2_range_to_submit.end())); - self.ledger_db - .set_pending_commitments_l2_range(&pending_commitments)?; - - self.ledger_db.set_state_diff(vec![])?; - self.last_state_diff = vec![]; - - let ledger_db = self.ledger_db.clone(); - let db_config = self.config.db_config.clone(); - let pending_commitments_l2_range = self.pending_commitments_l2_range.clone(); - // Handle DA response asynchronously - tokio::spawn(async move { - let result: anyhow::Result<()> = async move { - let tx_id = rx - .await - .map_err(|_| anyhow!("DA service is dead!"))? - .map_err(|_| anyhow!("Send transaction cannot fail"))?; - - ledger_db - .set_last_sequencer_commitment_l2_height(BatchNumber( - commitment_info.l2_height_range.end().0, - )) - .map_err(|_| { - anyhow!("Sequencer: Failed to set last sequencer commitment L2 height") - })?; - - debug!("Commitment info: {:?}", commitment_info); - - let l2_start = l2_range_to_submit.start().0 as u32; - let l2_end = l2_range_to_submit.end().0 as u32; - if let Some(db_config) = db_config { - match PostgresConnector::new(db_config).await { - Ok(pg_connector) => { - pg_connector - .insert_sequencer_commitment( - Into::<[u8; 32]>::into(tx_id).to_vec(), - l2_start, - l2_end, - commitment.merkle_root.to_vec(), - CommitmentStatus::Mempool, - ) - .await - .map_err(|_| { - anyhow!("Sequencer: Failed to insert sequencer commitment") - })?; - } - Err(e) => { - warn!("Failed to connect to postgres: {:?}", e); - } - } - } - - // Remove commitment from pending commitments - let mut pending_commitments = pending_commitments_l2_range.lock().await; - pending_commitments.retain(|&(start, end)| { - start != *l2_range_to_submit.start() || end != *l2_range_to_submit.end() - }); - ledger_db.set_pending_commitments_l2_range(&pending_commitments)?; - - info!("New commitment. L2 range: #{}-{}", l2_start, l2_end); - Ok(()) - } - .await; - - if let Err(err) = result { - error!( - "Error in spawned task for handling commitment result: {}", - err - ); - } - }); + self.submit_commitment(commitment_info, false).await?; } Ok(()) } @@ -726,101 +621,126 @@ where let commitment_info = commitment_controller::CommitmentInfo { l2_height_range: l2_start..=l2_end, }; - let l2_range_to_submit = commitment_info.l2_height_range.clone(); - - // calculate exclusive range end - let range_end = BatchNumber(l2_range_to_submit.end().0 + 1); // cannnot add u64 to BatchNumber directly - - let soft_confirmation_hashes = self - .ledger_db - .get_soft_batch_range(&(*l2_range_to_submit.start()..range_end))? - .iter() - .map(|sb| sb.hash) - .collect::>(); - - let commitment = commitment_controller::get_commitment( - commitment_info.clone(), - soft_confirmation_hashes, - )?; - - debug!("Sequencer: submitting commitment: {:?}", commitment); - - let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone())) - .map_err(|e| anyhow!(e))?; - let (notify, rx) = oneshot_channel(); - let request = BlobWithNotifier { blob, notify }; - self.da_service - .get_send_transaction_queue() - .send(request) - .map_err(|_| anyhow!("Bitcoin service already stopped!"))?; - - info!("Sent commitment to DA queue. L2 range: #{}-{}", commitment.l2_start_block_number, commitment.l2_end_block_number); - - let ledger_db = self.ledger_db.clone(); - let db_config = self.config.db_config.clone(); - let pending_commitments_l2_range = self.pending_commitments_l2_range.clone(); - // Handle DA response asynchronously - tokio::spawn(async move { - let result: anyhow::Result<()> = async move { - let tx_id = rx - .await - .map_err(|_| anyhow!("DA service is dead!"))? - .map_err(|_| anyhow!("Send transaction cannot fail"))?; - - ledger_db - .set_last_sequencer_commitment_l2_height(BatchNumber( - commitment_info.l2_height_range.end().0, - )) - .map_err(|_| { - anyhow!("Sequencer: Failed to set last sequencer commitment L2 height") - })?; - - debug!("Commitment info: {:?}", commitment_info); - - let l2_start = l2_range_to_submit.start().0 as u32; - let l2_end = l2_range_to_submit.end().0 as u32; - if let Some(db_config) = db_config { - match PostgresConnector::new(db_config).await { - Ok(pg_connector) => { - pg_connector - .insert_sequencer_commitment( - Into::<[u8; 32]>::into(tx_id).to_vec(), - l2_start, - l2_end, - commitment.merkle_root.to_vec(), - CommitmentStatus::Mempool, - ) - .await - .map_err(|_| { - anyhow!("Sequencer: Failed to insert sequencer commitment") - })?; - } - Err(e) => { - warn!("Failed to connect to postgres: {:?}", e); - } + self.submit_commitment(commitment_info, true).await?; + } + + Ok(()) + } + + async fn submit_commitment( + &mut self, + commitment_info: commitment_controller::CommitmentInfo, + resubmit: bool, + ) -> anyhow::Result<()> { + let l2_range_to_submit = commitment_info.l2_height_range.clone(); + + // calculate exclusive range end + let range_end = BatchNumber(l2_range_to_submit.end().0 + 1); // cannnot add u64 to BatchNumber directly + + let soft_confirmation_hashes = self + .ledger_db + .get_soft_batch_range(&(*l2_range_to_submit.start()..range_end))? + .iter() + .map(|sb| sb.hash) + .collect::>(); + + let commitment = commitment_controller::get_commitment( + commitment_info.clone(), + soft_confirmation_hashes, + )?; + + debug!("Sequencer: submitting commitment: {:?}", commitment); + + let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone())) + .map_err(|e| anyhow!(e))?; + let (notify, rx) = oneshot_channel(); + let request = BlobWithNotifier { blob, notify }; + self.da_service + .get_send_transaction_queue() + .send(request) + .map_err(|_| anyhow!("Bitcoin service already stopped!"))?; + + info!( + "Sent commitment to DA queue. L2 range: #{}-{}", + commitment.l2_start_block_number, commitment.l2_end_block_number + ); + + if !resubmit { + // Add commitment to pending commitments + let mut pending_commitments = self.pending_commitments_l2_range.lock().await; + pending_commitments.push((*l2_range_to_submit.start(), *l2_range_to_submit.end())); + self.ledger_db + .set_pending_commitments_l2_range(&pending_commitments)?; + + // Clear state diff + self.ledger_db.set_state_diff(vec![])?; + self.last_state_diff = vec![]; + } + + let ledger_db = self.ledger_db.clone(); + let db_config = self.config.db_config.clone(); + let pending_commitments_l2_range = self.pending_commitments_l2_range.clone(); + // Handle DA response asynchronously + tokio::spawn(async move { + let result: anyhow::Result<()> = async move { + let tx_id = rx + .await + .map_err(|_| anyhow!("DA service is dead!"))? + .map_err(|_| anyhow!("Send transaction cannot fail"))?; + + ledger_db + .set_last_sequencer_commitment_l2_height(BatchNumber( + commitment_info.l2_height_range.end().0, + )) + .map_err(|_| { + anyhow!("Sequencer: Failed to set last sequencer commitment L2 height") + })?; + + debug!("Commitment info: {:?}", commitment_info); + + let l2_start = l2_range_to_submit.start().0 as u32; + let l2_end = l2_range_to_submit.end().0 as u32; + if let Some(db_config) = db_config { + match PostgresConnector::new(db_config).await { + Ok(pg_connector) => { + pg_connector + .insert_sequencer_commitment( + Into::<[u8; 32]>::into(tx_id).to_vec(), + l2_start, + l2_end, + commitment.merkle_root.to_vec(), + CommitmentStatus::Mempool, + ) + .await + .map_err(|_| { + anyhow!("Sequencer: Failed to insert sequencer commitment") + })?; + } + Err(e) => { + warn!("Failed to connect to postgres: {:?}", e); } } + } - // Remove commitment from pending commitments - let mut pending_commitments = pending_commitments_l2_range.lock().await; - pending_commitments.retain(|&(start, end)| { - start != *l2_range_to_submit.start() || end != *l2_range_to_submit.end() - }); - ledger_db.set_pending_commitments_l2_range(&pending_commitments)?; + // Remove commitment from pending commitments + let mut pending_commitments = pending_commitments_l2_range.lock().await; + pending_commitments.retain(|&(start, end)| { + start != *l2_range_to_submit.start() || end != *l2_range_to_submit.end() + }); + ledger_db.set_pending_commitments_l2_range(&pending_commitments)?; - info!("New commitment. L2 range: #{}-{}", l2_start, l2_end); - Ok(()) - } - .await; + info!("New commitment. L2 range: #{}-{}", l2_start, l2_end); + Ok(()) + } + .await; - if let Err(err) = result { - error!( - "Error in spawned task for handling commitment result: {}", - err - ); - } - }); - } + if let Err(err) = result { + error!( + "Error in spawned task for handling commitment result: {}", + err + ); + } + }); Ok(()) }