Skip to content

Commit

Permalink
Separate submit_commitment into its own fn
Browse files Browse the repository at this point in the history
  • Loading branch information
yaziciahmet committed Jul 15, 2024
1 parent b953f99 commit cc91eb1
Showing 1 changed file with 116 additions and 196 deletions.
312 changes: 116 additions & 196 deletions crates/sequencer/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,112 +609,7 @@ where
state_diff_threshold_reached,
)?;
if let Some(commitment_info) = commitment_info {
let l2_range_to_submit = commitment_info.l2_height_range.clone();

// calculate exclusive range end
let range_end = BatchNumber(l2_range_to_submit.end().0 + 1); // cannnot add u64 to BatchNumber directly

let soft_confirmation_hashes = self
.ledger_db
.get_soft_batch_range(&(*l2_range_to_submit.start()..range_end))?
.iter()
.map(|sb| sb.hash)
.collect::<Vec<[u8; 32]>>();

let commitment = commitment_controller::get_commitment(
commitment_info.clone(),
soft_confirmation_hashes,
)?;

debug!(
"Sequencer: submitting commitment for L2 range {}-{}",
commitment.l2_start_block_number, commitment.l2_end_block_number
);

let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone()))
.map_err(|e| anyhow!(e))?;
let (notify, rx) = oneshot_channel();
let request = BlobWithNotifier { blob, notify };
self.da_service
.get_send_transaction_queue()
.send(request)
.map_err(|_| anyhow!("Bitcoin service already stopped!"))?;

info!("Sent commitment to DA queue. L2 range: #{}-{}", commitment.l2_start_block_number, commitment.l2_end_block_number);

// Add commitment to pending commitments
let mut pending_commitments = self.pending_commitments_l2_range.lock().await;
pending_commitments.push((*l2_range_to_submit.start(), *l2_range_to_submit.end()));
self.ledger_db
.set_pending_commitments_l2_range(&pending_commitments)?;

self.ledger_db.set_state_diff(vec![])?;
self.last_state_diff = vec![];

let ledger_db = self.ledger_db.clone();
let db_config = self.config.db_config.clone();
let pending_commitments_l2_range = self.pending_commitments_l2_range.clone();
// Handle DA response asynchronously
tokio::spawn(async move {
let result: anyhow::Result<()> = async move {
let tx_id = rx
.await
.map_err(|_| anyhow!("DA service is dead!"))?
.map_err(|_| anyhow!("Send transaction cannot fail"))?;

ledger_db
.set_last_sequencer_commitment_l2_height(BatchNumber(
commitment_info.l2_height_range.end().0,
))
.map_err(|_| {
anyhow!("Sequencer: Failed to set last sequencer commitment L2 height")
})?;

debug!("Commitment info: {:?}", commitment_info);

let l2_start = l2_range_to_submit.start().0 as u32;
let l2_end = l2_range_to_submit.end().0 as u32;
if let Some(db_config) = db_config {
match PostgresConnector::new(db_config).await {
Ok(pg_connector) => {
pg_connector
.insert_sequencer_commitment(
Into::<[u8; 32]>::into(tx_id).to_vec(),
l2_start,
l2_end,
commitment.merkle_root.to_vec(),
CommitmentStatus::Mempool,
)
.await
.map_err(|_| {
anyhow!("Sequencer: Failed to insert sequencer commitment")
})?;
}
Err(e) => {
warn!("Failed to connect to postgres: {:?}", e);
}
}
}

// Remove commitment from pending commitments
let mut pending_commitments = pending_commitments_l2_range.lock().await;
pending_commitments.retain(|&(start, end)| {
start != *l2_range_to_submit.start() || end != *l2_range_to_submit.end()
});
ledger_db.set_pending_commitments_l2_range(&pending_commitments)?;

info!("New commitment. L2 range: #{}-{}", l2_start, l2_end);
Ok(())
}
.await;

if let Err(err) = result {
error!(
"Error in spawned task for handling commitment result: {}",
err
);
}
});
self.submit_commitment(commitment_info, false).await?;
}
Ok(())
}
Expand All @@ -726,101 +621,126 @@ where
let commitment_info = commitment_controller::CommitmentInfo {
l2_height_range: l2_start..=l2_end,
};
let l2_range_to_submit = commitment_info.l2_height_range.clone();

// calculate exclusive range end
let range_end = BatchNumber(l2_range_to_submit.end().0 + 1); // cannnot add u64 to BatchNumber directly

let soft_confirmation_hashes = self
.ledger_db
.get_soft_batch_range(&(*l2_range_to_submit.start()..range_end))?
.iter()
.map(|sb| sb.hash)
.collect::<Vec<[u8; 32]>>();

let commitment = commitment_controller::get_commitment(
commitment_info.clone(),
soft_confirmation_hashes,
)?;

debug!("Sequencer: submitting commitment: {:?}", commitment);

let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone()))
.map_err(|e| anyhow!(e))?;
let (notify, rx) = oneshot_channel();
let request = BlobWithNotifier { blob, notify };
self.da_service
.get_send_transaction_queue()
.send(request)
.map_err(|_| anyhow!("Bitcoin service already stopped!"))?;

info!("Sent commitment to DA queue. L2 range: #{}-{}", commitment.l2_start_block_number, commitment.l2_end_block_number);

let ledger_db = self.ledger_db.clone();
let db_config = self.config.db_config.clone();
let pending_commitments_l2_range = self.pending_commitments_l2_range.clone();
// Handle DA response asynchronously
tokio::spawn(async move {
let result: anyhow::Result<()> = async move {
let tx_id = rx
.await
.map_err(|_| anyhow!("DA service is dead!"))?
.map_err(|_| anyhow!("Send transaction cannot fail"))?;

ledger_db
.set_last_sequencer_commitment_l2_height(BatchNumber(
commitment_info.l2_height_range.end().0,
))
.map_err(|_| {
anyhow!("Sequencer: Failed to set last sequencer commitment L2 height")
})?;

debug!("Commitment info: {:?}", commitment_info);

let l2_start = l2_range_to_submit.start().0 as u32;
let l2_end = l2_range_to_submit.end().0 as u32;
if let Some(db_config) = db_config {
match PostgresConnector::new(db_config).await {
Ok(pg_connector) => {
pg_connector
.insert_sequencer_commitment(
Into::<[u8; 32]>::into(tx_id).to_vec(),
l2_start,
l2_end,
commitment.merkle_root.to_vec(),
CommitmentStatus::Mempool,
)
.await
.map_err(|_| {
anyhow!("Sequencer: Failed to insert sequencer commitment")
})?;
}
Err(e) => {
warn!("Failed to connect to postgres: {:?}", e);
}
self.submit_commitment(commitment_info, true).await?;
}

Ok(())
}

async fn submit_commitment(
&mut self,
commitment_info: commitment_controller::CommitmentInfo,
resubmit: bool,
) -> anyhow::Result<()> {
let l2_range_to_submit = commitment_info.l2_height_range.clone();

// calculate exclusive range end
let range_end = BatchNumber(l2_range_to_submit.end().0 + 1); // cannnot add u64 to BatchNumber directly

let soft_confirmation_hashes = self
.ledger_db
.get_soft_batch_range(&(*l2_range_to_submit.start()..range_end))?
.iter()
.map(|sb| sb.hash)
.collect::<Vec<[u8; 32]>>();

let commitment = commitment_controller::get_commitment(
commitment_info.clone(),
soft_confirmation_hashes,
)?;

debug!("Sequencer: submitting commitment: {:?}", commitment);

let blob = borsh::to_vec(&DaData::SequencerCommitment(commitment.clone()))
.map_err(|e| anyhow!(e))?;
let (notify, rx) = oneshot_channel();
let request = BlobWithNotifier { blob, notify };
self.da_service
.get_send_transaction_queue()
.send(request)
.map_err(|_| anyhow!("Bitcoin service already stopped!"))?;

info!(
"Sent commitment to DA queue. L2 range: #{}-{}",
commitment.l2_start_block_number, commitment.l2_end_block_number
);

if !resubmit {
// Add commitment to pending commitments
let mut pending_commitments = self.pending_commitments_l2_range.lock().await;
pending_commitments.push((*l2_range_to_submit.start(), *l2_range_to_submit.end()));
self.ledger_db
.set_pending_commitments_l2_range(&pending_commitments)?;

// Clear state diff
self.ledger_db.set_state_diff(vec![])?;
self.last_state_diff = vec![];
}

let ledger_db = self.ledger_db.clone();
let db_config = self.config.db_config.clone();
let pending_commitments_l2_range = self.pending_commitments_l2_range.clone();
// Handle DA response asynchronously
tokio::spawn(async move {
let result: anyhow::Result<()> = async move {
let tx_id = rx
.await
.map_err(|_| anyhow!("DA service is dead!"))?
.map_err(|_| anyhow!("Send transaction cannot fail"))?;

ledger_db
.set_last_sequencer_commitment_l2_height(BatchNumber(
commitment_info.l2_height_range.end().0,
))
.map_err(|_| {
anyhow!("Sequencer: Failed to set last sequencer commitment L2 height")
})?;

debug!("Commitment info: {:?}", commitment_info);

let l2_start = l2_range_to_submit.start().0 as u32;
let l2_end = l2_range_to_submit.end().0 as u32;
if let Some(db_config) = db_config {
match PostgresConnector::new(db_config).await {
Ok(pg_connector) => {
pg_connector
.insert_sequencer_commitment(
Into::<[u8; 32]>::into(tx_id).to_vec(),
l2_start,
l2_end,
commitment.merkle_root.to_vec(),
CommitmentStatus::Mempool,
)
.await
.map_err(|_| {
anyhow!("Sequencer: Failed to insert sequencer commitment")
})?;
}
Err(e) => {
warn!("Failed to connect to postgres: {:?}", e);
}
}
}

// Remove commitment from pending commitments
let mut pending_commitments = pending_commitments_l2_range.lock().await;
pending_commitments.retain(|&(start, end)| {
start != *l2_range_to_submit.start() || end != *l2_range_to_submit.end()
});
ledger_db.set_pending_commitments_l2_range(&pending_commitments)?;
// Remove commitment from pending commitments
let mut pending_commitments = pending_commitments_l2_range.lock().await;
pending_commitments.retain(|&(start, end)| {
start != *l2_range_to_submit.start() || end != *l2_range_to_submit.end()
});
ledger_db.set_pending_commitments_l2_range(&pending_commitments)?;

info!("New commitment. L2 range: #{}-{}", l2_start, l2_end);
Ok(())
}
.await;
info!("New commitment. L2 range: #{}-{}", l2_start, l2_end);
Ok(())
}
.await;

if let Err(err) = result {
error!(
"Error in spawned task for handling commitment result: {}",
err
);
}
});
}
if let Err(err) = result {
error!(
"Error in spawned task for handling commitment result: {}",
err
);
}
});

Ok(())
}
Expand Down

0 comments on commit cc91eb1

Please sign in to comment.