diff --git a/crates/sequencer/src/sequencer.rs b/crates/sequencer/src/sequencer.rs index 382306dd1..c15810171 100644 --- a/crates/sequencer/src/sequencer.rs +++ b/crates/sequencer/src/sequencer.rs @@ -7,6 +7,7 @@ use std::time::Duration; use std::vec; use anyhow::anyhow; +use borsh::BorshDeserialize; use citrea_evm::{CallMessage, Evm, RlpEvmTransaction, MIN_TRANSACTION_GAS}; use citrea_primitives::types::SoftConfirmationHash; use citrea_stf::runtime::Runtime; @@ -31,11 +32,11 @@ use sov_db::schema::types::{BatchNumber, SlotNumber}; use sov_modules_api::hooks::HookSoftConfirmationInfo; use sov_modules_api::transaction::Transaction; use sov_modules_api::{ - Context, EncodeCall, PrivateKey, SignedSoftConfirmationBatch, SlotData, StateDiff, - UnsignedSoftConfirmationBatch, WorkingSet, + BlobReaderTrait, Context, EncodeCall, PrivateKey, SignedSoftConfirmationBatch, SlotData, + StateDiff, UnsignedSoftConfirmationBatch, WorkingSet, }; use sov_modules_stf_blueprint::StfBlueprintTrait; -use sov_rollup_interface::da::{BlockHeaderTrait, DaData, DaSpec}; +use sov_rollup_interface::da::{BlockHeaderTrait, DaData, DaSpec, SequencerCommitment}; use sov_rollup_interface::services::da::{BlobWithNotifier, DaService}; use sov_rollup_interface::stf::{SoftBatchReceipt, StateTransitionFunction}; use sov_rollup_interface::storage::HierarchicalStorageManager; @@ -611,13 +612,25 @@ where #[instrument(level = "trace", skip(self), err, ret)] pub async fn resubmit_pending_commitments(&mut self) -> anyhow::Result<()> { - let pending_commitments = self.ledger_db.get_pending_commitments_l2_range()?; - debug!("Pending commitments: {:?}", pending_commitments); - for (l2_start, l2_end) in pending_commitments { - let commitment_info = commitment_controller::CommitmentInfo { - l2_height_range: l2_start..=l2_end, - }; - self.submit_commitment(commitment_info, true).await?; + let pending_db_commitments = self.ledger_db.get_pending_commitments_l2_range()?; + debug!("Pending db commitments: {:?}", pending_db_commitments); + let pending_da_commitments = self.get_pending_da_commitments().await; + debug!("Pending da commitments: {:?}", pending_db_commitments); + for (l2_start, l2_end) in pending_db_commitments { + if pending_da_commitments.iter().any(|commitment| { + commitment.l2_start_block_number == l2_start.0 + && commitment.l2_end_block_number == l2_end.0 + }) { + // Delete from pending db if it is already in DA mempool + self.ledger_db + .delete_pending_commitment_l2_range(&(l2_start, l2_end))?; + } else { + // Submit commitment + let commitment_info = commitment_controller::CommitmentInfo { + l2_height_range: l2_start..=l2_end, + }; + self.submit_commitment(commitment_info, true).await?; + } } Ok(()) @@ -730,6 +743,24 @@ where Ok(()) } + async fn get_pending_da_commitments(&self) -> Vec { + self.da_service + .get_relevant_blobs_of_pending_transactions() + .await + .into_iter() + .filter_map(|mut blob| match DaData::try_from_slice(blob.full_data()) { + Ok(da_data) => match da_data { + DaData::SequencerCommitment(commitment) => Some(commitment), + _ => None, + }, + Err(err) => { + warn!("Pending transaction blob failed to be parsed: {}", err); + None + } + }) + .collect() + } + #[instrument(level = "trace", skip(self), err, ret)] pub async fn run(&mut self) -> Result<(), anyhow::Error> { // Resubmit if there were pending commitments on restart