Skip to content

Commit

Permalink
Escalate per relayer & don't if relayer disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop committed Jan 9, 2024
1 parent 0b96030 commit 22a0395
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 85 deletions.
13 changes: 11 additions & 2 deletions src/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use eyre::ContextCompat;

use self::gas_estimation::FeesEstimate;
use crate::app::App;
use crate::types::RelayerInfo;

pub mod gas_estimation;

Expand All @@ -21,9 +22,17 @@ pub fn calculate_gas_fees_from_estimates(

pub async fn should_send_transaction(
app: &App,
relayer_id: &str,
relayer: &RelayerInfo,
) -> eyre::Result<bool> {
let relayer = app.db.get_relayer(relayer_id).await?;
if !relayer.enabled {
tracing::warn!(
relayer_id = relayer.id,
chain_id = relayer.chain_id,
"Relayer is disabled, skipping transactions broadcast"
);

return Ok(false);
}

for gas_limit in &relayer.gas_price_limits.0 {
let chain_fees = app
Expand Down
6 changes: 4 additions & 2 deletions src/tasks/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ async fn broadcast_relayer_txs(
app: &App,
relayer_id: String,
txs: Vec<UnsentTx>,
) -> Result<(), eyre::Error> {
) -> eyre::Result<()> {
if txs.is_empty() {
return Ok(());
}

tracing::info!(relayer_id, num_txs = txs.len(), "Broadcasting relayer txs");

if !should_send_transaction(app, &relayer_id).await? {
let relayer = app.db.get_relayer(&relayer_id).await?;

if !should_send_transaction(app, &relayer).await? {
tracing::warn!(
relayer_id = relayer_id,
"Skipping transaction broadcasts"
Expand Down
210 changes: 130 additions & 80 deletions src/tasks/escalate.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::collections::HashMap;
use std::sync::Arc;

use ethers::providers::Middleware;
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::types::transaction::eip2930::AccessList;
use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, U256};
use eyre::ContextCompat;
use futures::stream::FuturesUnordered;
use futures::StreamExt;

use crate::app::App;
use crate::broadcast_utils::should_send_transaction;
use crate::db::TxForEscalation;

pub async fn escalate_txs(app: Arc<App>) -> eyre::Result<()> {
loop {
Expand All @@ -16,90 +20,136 @@ pub async fn escalate_txs(app: Arc<App>) -> eyre::Result<()> {
.get_txs_for_escalation(app.config.service.escalation_interval)
.await?;

for tx in txs_for_escalation {
tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx");
let txs_for_escalation = split_txs_per_relayer(txs_for_escalation);

if !should_send_transaction(&app, &tx.relayer_id).await? {
tracing::warn!(id = tx.id, "Skipping transaction broadcast");
continue;
}
let mut futures = FuturesUnordered::new();

let escalation = tx.escalation_count + 1;

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
.await?;

let fees = app
.db
.get_latest_block_fees_by_chain_id(tx.chain_id)
.await?
.context("Missing block")?;

// Min increase of 20% on the priority fee required for a replacement tx
let factor = U256::from(100);
let increased_gas_price_percentage =
factor + U256::from(20 * (1 + escalation));

let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0
* increased_gas_price_percentage
/ factor;

let max_fee_per_gas =
tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase;

let max_priority_fee_per_gas =
max_fee_per_gas - fees.fee_estimates.base_fee_per_gas;

tracing::warn!(
"Initial tx fees are max = {}, priority = {}",
tx.initial_max_fee_per_gas.0,
tx.initial_max_priority_fee_per_gas.0
);
tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}");

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
};

let pending_tx = middleware
.send_transaction(TypedTransaction::Eip1559(eip1559_tx), None)
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
pending_tx
}
Err(err) => {
tracing::error!(error = ?err, "Failed to send tx");
continue;
}
};

let tx_hash = pending_tx.tx_hash();

app.db
.escalate_tx(
&tx.id,
tx_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
)
.await?;

tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated");
for (relayer_id, txs) in txs_for_escalation {
futures.push(escalate_relayer_txs(&app, relayer_id, txs));
}

while let Some(result) = futures.next().await {
if let Err(err) = result {
tracing::error!(error = ?err, "Failed escalating txs");
}
}

tokio::time::sleep(app.config.service.escalation_interval).await;
}
}

async fn escalate_relayer_txs(
app: &App,
relayer_id: String,
txs: Vec<TxForEscalation>,
) -> eyre::Result<()> {
let relayer = app.db.get_relayer(&relayer_id).await?;

for tx in txs {
tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx");

if !should_send_transaction(&app, &relayer).await? {

Check failure on line 51 in src/tasks/escalate.rs

View workflow job for this annotation

GitHub Actions / cargo test

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 51 in src/tasks/escalate.rs

View workflow job for this annotation

GitHub Actions / cargo test

this expression creates a reference which is immediately dereferenced by the compiler
tracing::warn!(id = tx.id, "Skipping transaction broadcast");

return Ok(());
}

let escalation = tx.escalation_count + 1;

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
.await?;

let fees = app
.db
.get_latest_block_fees_by_chain_id(tx.chain_id)
.await?
.context("Missing block")?;

// Min increase of 20% on the priority fee required for a replacement tx
let factor = U256::from(100);
let increased_gas_price_percentage =
factor + U256::from(20 * (1 + escalation));

let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0
* increased_gas_price_percentage
/ factor;

let max_fee_per_gas =
tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase;

let max_priority_fee_per_gas =
max_fee_per_gas - fees.fee_estimates.base_fee_per_gas;

tracing::warn!(
"Initial tx fees are max = {}, priority = {}",
tx.initial_max_fee_per_gas.0,
tx.initial_max_priority_fee_per_gas.0
);
tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}");

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
};

let pending_tx = middleware
.send_transaction(TypedTransaction::Eip1559(eip1559_tx), None)
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
pending_tx
}
Err(err) => {
tracing::error!(error = ?err, "Failed to send tx");
continue;
}
};

let tx_hash = pending_tx.tx_hash();

app.db
.escalate_tx(
&tx.id,
tx_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
)
.await?;

tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated");
}

Ok(())
}

fn split_txs_per_relayer(
txs: Vec<TxForEscalation>,
) -> HashMap<String, Vec<TxForEscalation>> {
let mut txs_per_relayer = HashMap::new();

for tx in txs {
let relayer_id = tx.relayer_id.clone();

let txs_for_relayer =
txs_per_relayer.entry(relayer_id).or_insert_with(Vec::new);

txs_for_relayer.push(tx);
}

for (_, txs) in txs_per_relayer.iter_mut() {
txs.sort_by_key(|tx| tx.escalation_count);
}

txs_per_relayer
}
2 changes: 1 addition & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ mod tests {
value: U256Wrapper(U256::zero()),
chain_id: 1,
}]),
enabled: true
enabled: true,
};

let json = serde_json::to_string_pretty(&info).unwrap();
Expand Down

0 comments on commit 22a0395

Please sign in to comment.