diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index 3e6664b..e2e97e8 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -69,87 +69,94 @@ async fn broadcast_relayer_txs( ); for tx in txs { - tracing::info!(tx_id = tx.id, nonce = tx.nonce, "Sending transaction"); - - let middleware = app - .signer_middleware(tx.chain_id, tx.key_id.clone()) - .await?; - - let fees = app - .db - .get_latest_block_fees_by_chain_id(tx.chain_id) - .await? - .context("Missing block fees")?; - - let max_base_fee_per_gas = fees.fee_estimates.base_fee_per_gas; - - let (max_fee_per_gas, max_priority_fee_per_gas) = - calculate_gas_fees_from_estimates( - &fees.fee_estimates, - tx.priority.to_percentile_index(), - max_base_fee_per_gas, - ); - - let mut typed_transaction = - TypedTransaction::Eip1559(Eip1559TransactionRequest { - from: None, - to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), - gas: Some(tx.gas_limit.0), - value: Some(tx.value.0), - data: Some(tx.data.into()), - nonce: Some(tx.nonce.into()), - access_list: AccessList::default(), - max_priority_fee_per_gas: Some(max_priority_fee_per_gas), - max_fee_per_gas: Some(max_fee_per_gas), - chain_id: Some(tx.chain_id.into()), - }); - - // Fill and simulate the transaction - middleware - .fill_transaction(&mut typed_transaction, None) - .await?; - - // Get the raw signed tx and derive the tx hash - let raw_signed_tx = middleware - .signer() - .raw_signed_tx(&typed_transaction) - .await?; - let tx_hash = H256::from(ethers::utils::keccak256(&raw_signed_tx)); - - tracing::debug!(tx_id = tx.id, "Saving transaction"); - app.db - .insert_tx_broadcast( - &tx.id, - tx_hash, - max_fee_per_gas, - max_priority_fee_per_gas, - ) - .await?; - - tracing::debug!(tx_id = tx.id, "Sending transaction"); - - let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await; - - let pending_tx = match pending_tx { - Ok(pending_tx) => pending_tx, - Err(err) => { - tracing::error!(tx_id = tx.id, error = ?err, "Failed to send transaction"); - continue; - } - }; - - tracing::info!( - tx_id = tx.id, - tx_nonce = tx.nonce, - tx_hash = ?tx_hash, - ?pending_tx, - "Transaction broadcast" - ); + broadcast_relayer_tx(app, tx).await?; } Ok(()) } +#[tracing::instrument(skip(app, tx), fields(relayer_id = tx.relayer_id, tx_id = tx.id))] +async fn broadcast_relayer_tx(app: &App, tx: UnsentTx) -> eyre::Result<()> { + tracing::info!(tx_id = tx.id, nonce = tx.nonce, "Sending transaction"); + + let middleware = app + .signer_middleware(tx.chain_id, tx.key_id.clone()) + .await?; + + let fees = app + .db + .get_latest_block_fees_by_chain_id(tx.chain_id) + .await? + .context("Missing block fees")?; + + let max_base_fee_per_gas = fees.fee_estimates.base_fee_per_gas; + + let (max_fee_per_gas, max_priority_fee_per_gas) = + calculate_gas_fees_from_estimates( + &fees.fee_estimates, + tx.priority.to_percentile_index(), + max_base_fee_per_gas, + ); + + let mut typed_transaction = + TypedTransaction::Eip1559(Eip1559TransactionRequest { + from: None, + to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), + gas: Some(tx.gas_limit.0), + value: Some(tx.value.0), + data: Some(tx.data.into()), + nonce: Some(tx.nonce.into()), + access_list: AccessList::default(), + max_priority_fee_per_gas: Some(max_priority_fee_per_gas), + max_fee_per_gas: Some(max_fee_per_gas), + chain_id: Some(tx.chain_id.into()), + }); + + // Fill and simulate the transaction + middleware + .fill_transaction(&mut typed_transaction, None) + .await?; + + // Get the raw signed tx and derive the tx hash + let raw_signed_tx = middleware + .signer() + .raw_signed_tx(&typed_transaction) + .await?; + let tx_hash = H256::from(ethers::utils::keccak256(&raw_signed_tx)); + + tracing::debug!(tx_id = tx.id, "Saving transaction"); + app.db + .insert_tx_broadcast( + &tx.id, + tx_hash, + max_fee_per_gas, + max_priority_fee_per_gas, + ) + .await?; + + tracing::debug!(tx_id = tx.id, "Sending transaction"); + + let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await; + + let pending_tx = match pending_tx { + Ok(pending_tx) => pending_tx, + Err(err) => { + tracing::error!(tx_id = tx.id, error = ?err, "Failed to send transaction"); + return Ok(()); + } + }; + + tracing::info!( + tx_id = tx.id, + tx_nonce = tx.nonce, + tx_hash = ?tx_hash, + ?pending_tx, + "Transaction broadcast" + ); + + Ok(()) +} + fn sort_txs_by_relayer( mut txs: Vec, ) -> HashMap> { diff --git a/src/tasks/escalate.rs b/src/tasks/escalate.rs index 76cf4da..cae22f0 100644 --- a/src/tasks/escalate.rs +++ b/src/tasks/escalate.rs @@ -12,6 +12,7 @@ use futures::StreamExt; use crate::app::App; use crate::broadcast_utils::should_send_relayer_transactions; use crate::db::TxForEscalation; +use crate::types::RelayerInfo; pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { loop { @@ -38,6 +39,7 @@ pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { } } +#[tracing::instrument(skip(app, txs))] async fn escalate_relayer_txs( app: &App, relayer_id: String, @@ -45,100 +47,106 @@ async fn escalate_relayer_txs( ) -> eyre::Result<()> { let relayer = app.db.get_relayer(&relayer_id).await?; - for tx in txs { - if !should_send_relayer_transactions(app, &relayer).await? { - tracing::warn!( - relayer_id = relayer.id, - "Skipping relayer escalations" - ); - - return Ok(()); - } - - tracing::info!( - tx_id = tx.id, - escalation_count = tx.escalation_count, - "Escalating transaction" - ); + if txs.is_empty() { + tracing::info!("No transactions to escalate"); + } - let escalation = tx.escalation_count + 1; + for tx in txs { + escalate_relayer_tx(app, &relayer, tx).await?; + } - let middleware = app - .signer_middleware(tx.chain_id, tx.key_id.clone()) - .await?; + Ok(()) +} - let fees = app - .db - .get_latest_block_fees_by_chain_id(tx.chain_id) - .await? - .context("Missing block")?; - - // Min increase of 20% on the priority fee required for a replacement tx - let factor = U256::from(100); - let increased_gas_price_percentage = - factor + U256::from(20 * (1 + escalation)); - - let initial_max_fee_per_gas = tx.initial_max_fee_per_gas.0; - - let max_fee_per_gas_increase = - initial_max_fee_per_gas * increased_gas_price_percentage / factor; - - let max_fee_per_gas = - initial_max_fee_per_gas + max_fee_per_gas_increase; - - let max_priority_fee_per_gas = - max_fee_per_gas - fees.fee_estimates.base_fee_per_gas; - - let eip1559_tx = Eip1559TransactionRequest { - from: None, - to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), - gas: Some(tx.gas_limit.0), - value: Some(tx.value.0), - data: Some(tx.data.into()), - nonce: Some(tx.nonce.into()), - access_list: AccessList::default(), - max_priority_fee_per_gas: Some(max_priority_fee_per_gas), - max_fee_per_gas: Some(max_fee_per_gas), - chain_id: Some(tx.chain_id.into()), - }; - - let pending_tx = middleware - .send_transaction(TypedTransaction::Eip1559(eip1559_tx), None) - .await; - - let pending_tx = match pending_tx { - Ok(pending_tx) => pending_tx, - Err(err) => { - tracing::error!(tx_id = tx.id, error = ?err, "Failed to escalate transaction"); - continue; - } - }; - - let tx_hash = pending_tx.tx_hash(); - - tracing::info!( - tx_id = tx.id, - ?tx_hash, - ?initial_max_fee_per_gas, - ?max_fee_per_gas_increase, - ?max_fee_per_gas, - ?max_priority_fee_per_gas, - ?pending_tx, - "Escalated transaction" - ); - - app.db - .escalate_tx( - &tx.id, - tx_hash, - max_fee_per_gas, - max_priority_fee_per_gas, - ) - .await?; +#[tracing::instrument(skip(app, relayer, tx), fields(tx_id = tx.id))] +async fn escalate_relayer_tx( + app: &App, + relayer: &RelayerInfo, + tx: TxForEscalation, +) -> eyre::Result<()> { + if !should_send_relayer_transactions(app, relayer).await? { + tracing::warn!(relayer_id = relayer.id, "Skipping relayer escalations"); - tracing::info!(tx_id = tx.id, "Escalated transaction saved"); + return Ok(()); } + tracing::info!( + tx_id = tx.id, + escalation_count = tx.escalation_count, + "Escalating transaction" + ); + + let escalation = tx.escalation_count + 1; + + let middleware = app + .signer_middleware(tx.chain_id, tx.key_id.clone()) + .await?; + + let fees = app + .db + .get_latest_block_fees_by_chain_id(tx.chain_id) + .await? + .context("Missing block")?; + + // Min increase of 20% on the priority fee required for a replacement tx + let factor = U256::from(100); + let increased_gas_price_percentage = + factor + U256::from(20 * (1 + escalation)); + + let initial_max_fee_per_gas = tx.initial_max_fee_per_gas.0; + + let max_fee_per_gas_increase = + initial_max_fee_per_gas * increased_gas_price_percentage / factor; + + let max_fee_per_gas = initial_max_fee_per_gas + max_fee_per_gas_increase; + + let max_priority_fee_per_gas = + max_fee_per_gas - fees.fee_estimates.base_fee_per_gas; + + let eip1559_tx = Eip1559TransactionRequest { + from: None, + to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), + gas: Some(tx.gas_limit.0), + value: Some(tx.value.0), + data: Some(tx.data.into()), + nonce: Some(tx.nonce.into()), + access_list: AccessList::default(), + max_priority_fee_per_gas: Some(max_priority_fee_per_gas), + max_fee_per_gas: Some(max_fee_per_gas), + chain_id: Some(tx.chain_id.into()), + }; + + let pending_tx = middleware + .send_transaction(TypedTransaction::Eip1559(eip1559_tx), None) + .await; + + let pending_tx = match pending_tx { + Ok(pending_tx) => pending_tx, + Err(err) => { + tracing::error!(tx_id = tx.id, error = ?err, "Failed to escalate transaction"); + return Ok(()); + } + }; + + let tx_hash = pending_tx.tx_hash(); + + tracing::info!( + tx_id = tx.id, + ?tx_hash, + ?initial_max_fee_per_gas, + ?max_fee_per_gas_increase, + ?max_fee_per_gas, + ?max_priority_fee_per_gas, + ?pending_tx, + "Escalated transaction" + ); + + app.db + .escalate_tx(&tx.id, tx_hash, max_fee_per_gas, max_priority_fee_per_gas) + .await?; + + tracing::info!(tx_id = tx.id, "Escalated transaction saved"); + Ok(()) } diff --git a/src/tasks/index.rs b/src/tasks/index.rs index dcfe76c..4edb23a 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -41,6 +41,7 @@ pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { } } +#[tracing::instrument(skip(app, rpc, block))] pub async fn index_block( app: Arc, chain_id: u64, @@ -91,6 +92,7 @@ pub async fn index_block( Ok(()) } +#[tracing::instrument(skip(app, rpc, latest_block))] pub async fn backfill_to_block( app: Arc, chain_id: u64, @@ -203,41 +205,48 @@ pub async fn estimate_gas(app: Arc, chain_id: u64) -> eyre::Result<()> { async fn update_relayer_nonces( relayers: &[RelayerInfo], - app: &Arc, + app: &App, rpc: &Provider, chain_id: u64, ) -> Result<(), eyre::Error> { let mut futures = FuturesUnordered::new(); for relayer in relayers { - let app = app.clone(); + futures.push(update_relayer_nonce(app, rpc, relayer, chain_id)); + } - futures.push(async move { - let tx_count = - rpc.get_transaction_count(relayer.address.0, None).await?; + while let Some(result) = futures.next().await { + result?; + } - tracing::info!( - relayer_id = relayer.id, - nonce = ?tx_count, - relayer_address = ?relayer.address.0, - "Updating relayer nonce" - ); + Ok(()) +} - app.db - .update_relayer_nonce( - chain_id, - relayer.address.0, - tx_count.as_u64(), - ) - .await?; +#[tracing::instrument(skip(app, rpc, relayer), fields(relayer_id = relayer.id))] +async fn update_relayer_nonce( + app: &App, + rpc: &Provider, + relayer: &RelayerInfo, + chain_id: u64, +) -> eyre::Result<()> { + let tx_count = rpc.get_transaction_count(relayer.address.0, None).await?; - Result::<(), eyre::Report>::Ok(()) - }) + if tx_count.as_u64() == relayer.current_nonce { + return Ok(()); } - while let Some(result) = futures.next().await { - result?; - } + tracing::info!( + relayer_id = relayer.id, + current_nonce = %relayer.current_nonce, + nonce = %relayer.nonce, + new_current_nonce = %tx_count.as_u64(), + relayer_address = ?relayer.address.0, + "Updating relayer nonce" + ); + + app.db + .update_relayer_nonce(chain_id, relayer.address.0, tx_count.as_u64()) + .await?; Ok(()) }