From 22a039598def916da0b73abc4020c40f597e7bd9 Mon Sep 17 00:00:00 2001 From: Dzejkop Date: Tue, 9 Jan 2024 20:10:28 +0100 Subject: [PATCH] Escalate per relayer & don't if relayer disabled --- src/broadcast_utils.rs | 13 ++- src/tasks/broadcast.rs | 6 +- src/tasks/escalate.rs | 210 +++++++++++++++++++++++++---------------- src/types.rs | 2 +- 4 files changed, 146 insertions(+), 85 deletions(-) diff --git a/src/broadcast_utils.rs b/src/broadcast_utils.rs index ba6b405..3de31b5 100644 --- a/src/broadcast_utils.rs +++ b/src/broadcast_utils.rs @@ -3,6 +3,7 @@ use eyre::ContextCompat; use self::gas_estimation::FeesEstimate; use crate::app::App; +use crate::types::RelayerInfo; pub mod gas_estimation; @@ -21,9 +22,17 @@ pub fn calculate_gas_fees_from_estimates( pub async fn should_send_transaction( app: &App, - relayer_id: &str, + relayer: &RelayerInfo, ) -> eyre::Result { - let relayer = app.db.get_relayer(relayer_id).await?; + if !relayer.enabled { + tracing::warn!( + relayer_id = relayer.id, + chain_id = relayer.chain_id, + "Relayer is disabled, skipping transactions broadcast" + ); + + return Ok(false); + } for gas_limit in &relayer.gas_price_limits.0 { let chain_fees = app diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index 7f33f21..ce1f44a 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -44,14 +44,16 @@ async fn broadcast_relayer_txs( app: &App, relayer_id: String, txs: Vec, -) -> Result<(), eyre::Error> { +) -> eyre::Result<()> { if txs.is_empty() { return Ok(()); } tracing::info!(relayer_id, num_txs = txs.len(), "Broadcasting relayer txs"); - if !should_send_transaction(app, &relayer_id).await? { + let relayer = app.db.get_relayer(&relayer_id).await?; + + if !should_send_transaction(app, &relayer).await? { tracing::warn!( relayer_id = relayer_id, "Skipping transaction broadcasts" diff --git a/src/tasks/escalate.rs b/src/tasks/escalate.rs index 3a8c51f..d275525 100644 --- a/src/tasks/escalate.rs +++ b/src/tasks/escalate.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use ethers::providers::Middleware; @@ -5,9 +6,12 @@ use ethers::types::transaction::eip2718::TypedTransaction; use ethers::types::transaction::eip2930::AccessList; use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, U256}; use eyre::ContextCompat; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use crate::app::App; use crate::broadcast_utils::should_send_transaction; +use crate::db::TxForEscalation; pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { loop { @@ -16,90 +20,136 @@ pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { .get_txs_for_escalation(app.config.service.escalation_interval) .await?; - for tx in txs_for_escalation { - tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx"); + let txs_for_escalation = split_txs_per_relayer(txs_for_escalation); - if !should_send_transaction(&app, &tx.relayer_id).await? { - tracing::warn!(id = tx.id, "Skipping transaction broadcast"); - continue; - } + let mut futures = FuturesUnordered::new(); - let escalation = tx.escalation_count + 1; - - let middleware = app - .signer_middleware(tx.chain_id, tx.key_id.clone()) - .await?; - - let fees = app - .db - .get_latest_block_fees_by_chain_id(tx.chain_id) - .await? - .context("Missing block")?; - - // Min increase of 20% on the priority fee required for a replacement tx - let factor = U256::from(100); - let increased_gas_price_percentage = - factor + U256::from(20 * (1 + escalation)); - - let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0 - * increased_gas_price_percentage - / factor; - - let max_fee_per_gas = - tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase; - - let max_priority_fee_per_gas = - max_fee_per_gas - fees.fee_estimates.base_fee_per_gas; - - tracing::warn!( - "Initial tx fees are max = {}, priority = {}", - tx.initial_max_fee_per_gas.0, - tx.initial_max_priority_fee_per_gas.0 - ); - tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}"); - - let eip1559_tx = Eip1559TransactionRequest { - from: None, - to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), - gas: Some(tx.gas_limit.0), - value: Some(tx.value.0), - data: Some(tx.data.into()), - nonce: Some(tx.nonce.into()), - access_list: AccessList::default(), - max_priority_fee_per_gas: Some(max_priority_fee_per_gas), - max_fee_per_gas: Some(max_fee_per_gas), - chain_id: Some(tx.chain_id.into()), - }; - - let pending_tx = middleware - .send_transaction(TypedTransaction::Eip1559(eip1559_tx), None) - .await; - - let pending_tx = match pending_tx { - Ok(pending_tx) => { - tracing::info!(?pending_tx, "Tx sent successfully"); - pending_tx - } - Err(err) => { - tracing::error!(error = ?err, "Failed to send tx"); - continue; - } - }; - - let tx_hash = pending_tx.tx_hash(); - - app.db - .escalate_tx( - &tx.id, - tx_hash, - max_fee_per_gas, - max_priority_fee_per_gas, - ) - .await?; - - tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated"); + for (relayer_id, txs) in txs_for_escalation { + futures.push(escalate_relayer_txs(&app, relayer_id, txs)); + } + + while let Some(result) = futures.next().await { + if let Err(err) = result { + tracing::error!(error = ?err, "Failed escalating txs"); + } } tokio::time::sleep(app.config.service.escalation_interval).await; } } + +async fn escalate_relayer_txs( + app: &App, + relayer_id: String, + txs: Vec, +) -> eyre::Result<()> { + let relayer = app.db.get_relayer(&relayer_id).await?; + + for tx in txs { + tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx"); + + if !should_send_transaction(&app, &relayer).await? { + tracing::warn!(id = tx.id, "Skipping transaction broadcast"); + + return Ok(()); + } + + let escalation = tx.escalation_count + 1; + + let middleware = app + .signer_middleware(tx.chain_id, tx.key_id.clone()) + .await?; + + let fees = app + .db + .get_latest_block_fees_by_chain_id(tx.chain_id) + .await? + .context("Missing block")?; + + // Min increase of 20% on the priority fee required for a replacement tx + let factor = U256::from(100); + let increased_gas_price_percentage = + factor + U256::from(20 * (1 + escalation)); + + let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0 + * increased_gas_price_percentage + / factor; + + let max_fee_per_gas = + tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase; + + let max_priority_fee_per_gas = + max_fee_per_gas - fees.fee_estimates.base_fee_per_gas; + + tracing::warn!( + "Initial tx fees are max = {}, priority = {}", + tx.initial_max_fee_per_gas.0, + tx.initial_max_priority_fee_per_gas.0 + ); + tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}"); + + let eip1559_tx = Eip1559TransactionRequest { + from: None, + to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), + gas: Some(tx.gas_limit.0), + value: Some(tx.value.0), + data: Some(tx.data.into()), + nonce: Some(tx.nonce.into()), + access_list: AccessList::default(), + max_priority_fee_per_gas: Some(max_priority_fee_per_gas), + max_fee_per_gas: Some(max_fee_per_gas), + chain_id: Some(tx.chain_id.into()), + }; + + let pending_tx = middleware + .send_transaction(TypedTransaction::Eip1559(eip1559_tx), None) + .await; + + let pending_tx = match pending_tx { + Ok(pending_tx) => { + tracing::info!(?pending_tx, "Tx sent successfully"); + pending_tx + } + Err(err) => { + tracing::error!(error = ?err, "Failed to send tx"); + continue; + } + }; + + let tx_hash = pending_tx.tx_hash(); + + app.db + .escalate_tx( + &tx.id, + tx_hash, + max_fee_per_gas, + max_priority_fee_per_gas, + ) + .await?; + + tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated"); + } + + Ok(()) +} + +fn split_txs_per_relayer( + txs: Vec, +) -> HashMap> { + let mut txs_per_relayer = HashMap::new(); + + for tx in txs { + let relayer_id = tx.relayer_id.clone(); + + let txs_for_relayer = + txs_per_relayer.entry(relayer_id).or_insert_with(Vec::new); + + txs_for_relayer.push(tx); + } + + for (_, txs) in txs_per_relayer.iter_mut() { + txs.sort_by_key(|tx| tx.escalation_count); + } + + txs_per_relayer +} diff --git a/src/types.rs b/src/types.rs index 62c8bae..b349d08 100644 --- a/src/types.rs +++ b/src/types.rs @@ -87,7 +87,7 @@ mod tests { value: U256Wrapper(U256::zero()), chain_id: 1, }]), - enabled: true + enabled: true, }; let json = serde_json::to_string_pretty(&info).unwrap();