Skip to content

Commit

Permalink
More instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop committed Feb 14, 2024
1 parent 77add8f commit 70031b1
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 187 deletions.
159 changes: 83 additions & 76 deletions src/tasks/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,87 +69,94 @@ async fn broadcast_relayer_txs(
);

for tx in txs {
tracing::info!(tx_id = tx.id, nonce = tx.nonce, "Sending transaction");

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
.await?;

let fees = app
.db
.get_latest_block_fees_by_chain_id(tx.chain_id)
.await?
.context("Missing block fees")?;

let max_base_fee_per_gas = fees.fee_estimates.base_fee_per_gas;

let (max_fee_per_gas, max_priority_fee_per_gas) =
calculate_gas_fees_from_estimates(
&fees.fee_estimates,
tx.priority.to_percentile_index(),
max_base_fee_per_gas,
);

let mut typed_transaction =
TypedTransaction::Eip1559(Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
});

// Fill and simulate the transaction
middleware
.fill_transaction(&mut typed_transaction, None)
.await?;

// Get the raw signed tx and derive the tx hash
let raw_signed_tx = middleware
.signer()
.raw_signed_tx(&typed_transaction)
.await?;
let tx_hash = H256::from(ethers::utils::keccak256(&raw_signed_tx));

tracing::debug!(tx_id = tx.id, "Saving transaction");
app.db
.insert_tx_broadcast(
&tx.id,
tx_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
)
.await?;

tracing::debug!(tx_id = tx.id, "Sending transaction");

let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await;

let pending_tx = match pending_tx {
Ok(pending_tx) => pending_tx,
Err(err) => {
tracing::error!(tx_id = tx.id, error = ?err, "Failed to send transaction");
continue;
}
};

tracing::info!(
tx_id = tx.id,
tx_nonce = tx.nonce,
tx_hash = ?tx_hash,
?pending_tx,
"Transaction broadcast"
);
broadcast_relayer_tx(app, tx).await?;
}

Ok(())
}

#[tracing::instrument(skip(app, tx), fields(relayer_id = tx.relayer_id, tx_id = tx.id))]
async fn broadcast_relayer_tx(app: &App, tx: UnsentTx) -> eyre::Result<()> {
tracing::info!(tx_id = tx.id, nonce = tx.nonce, "Sending transaction");

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
.await?;

let fees = app
.db
.get_latest_block_fees_by_chain_id(tx.chain_id)
.await?
.context("Missing block fees")?;

let max_base_fee_per_gas = fees.fee_estimates.base_fee_per_gas;

let (max_fee_per_gas, max_priority_fee_per_gas) =
calculate_gas_fees_from_estimates(
&fees.fee_estimates,
tx.priority.to_percentile_index(),
max_base_fee_per_gas,
);

let mut typed_transaction =
TypedTransaction::Eip1559(Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
});

// Fill and simulate the transaction
middleware
.fill_transaction(&mut typed_transaction, None)
.await?;

// Get the raw signed tx and derive the tx hash
let raw_signed_tx = middleware
.signer()
.raw_signed_tx(&typed_transaction)
.await?;
let tx_hash = H256::from(ethers::utils::keccak256(&raw_signed_tx));

tracing::debug!(tx_id = tx.id, "Saving transaction");
app.db
.insert_tx_broadcast(
&tx.id,
tx_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
)
.await?;

tracing::debug!(tx_id = tx.id, "Sending transaction");

let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await;

let pending_tx = match pending_tx {
Ok(pending_tx) => pending_tx,
Err(err) => {
tracing::error!(tx_id = tx.id, error = ?err, "Failed to send transaction");
return Ok(());
}
};

tracing::info!(
tx_id = tx.id,
tx_nonce = tx.nonce,
tx_hash = ?tx_hash,
?pending_tx,
"Transaction broadcast"
);

Ok(())
}

fn sort_txs_by_relayer(
mut txs: Vec<UnsentTx>,
) -> HashMap<String, Vec<UnsentTx>> {
Expand Down
184 changes: 96 additions & 88 deletions src/tasks/escalate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures::StreamExt;
use crate::app::App;
use crate::broadcast_utils::should_send_relayer_transactions;
use crate::db::TxForEscalation;
use crate::types::RelayerInfo;

pub async fn escalate_txs(app: Arc<App>) -> eyre::Result<()> {
loop {
Expand All @@ -38,107 +39,114 @@ pub async fn escalate_txs(app: Arc<App>) -> eyre::Result<()> {
}
}

#[tracing::instrument(skip(app, txs))]
async fn escalate_relayer_txs(
app: &App,
relayer_id: String,
txs: Vec<TxForEscalation>,
) -> eyre::Result<()> {
let relayer = app.db.get_relayer(&relayer_id).await?;

for tx in txs {
if !should_send_relayer_transactions(app, &relayer).await? {
tracing::warn!(
relayer_id = relayer.id,
"Skipping relayer escalations"
);

return Ok(());
}

tracing::info!(
tx_id = tx.id,
escalation_count = tx.escalation_count,
"Escalating transaction"
);
if txs.is_empty() {
tracing::info!("No transactions to escalate");
}

let escalation = tx.escalation_count + 1;
for tx in txs {
escalate_relayer_tx(app, &relayer, tx).await?;
}

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
.await?;
Ok(())
}

let fees = app
.db
.get_latest_block_fees_by_chain_id(tx.chain_id)
.await?
.context("Missing block")?;

// Min increase of 20% on the priority fee required for a replacement tx
let factor = U256::from(100);
let increased_gas_price_percentage =
factor + U256::from(20 * (1 + escalation));

let initial_max_fee_per_gas = tx.initial_max_fee_per_gas.0;

let max_fee_per_gas_increase =
initial_max_fee_per_gas * increased_gas_price_percentage / factor;

let max_fee_per_gas =
initial_max_fee_per_gas + max_fee_per_gas_increase;

let max_priority_fee_per_gas =
max_fee_per_gas - fees.fee_estimates.base_fee_per_gas;

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
};

let pending_tx = middleware
.send_transaction(TypedTransaction::Eip1559(eip1559_tx), None)
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => pending_tx,
Err(err) => {
tracing::error!(tx_id = tx.id, error = ?err, "Failed to escalate transaction");
continue;
}
};

let tx_hash = pending_tx.tx_hash();

tracing::info!(
tx_id = tx.id,
?tx_hash,
?initial_max_fee_per_gas,
?max_fee_per_gas_increase,
?max_fee_per_gas,
?max_priority_fee_per_gas,
?pending_tx,
"Escalated transaction"
);

app.db
.escalate_tx(
&tx.id,
tx_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
)
.await?;
#[tracing::instrument(skip(app, relayer, tx), fields(tx_id = tx.id))]
async fn escalate_relayer_tx(
app: &App,
relayer: &RelayerInfo,
tx: TxForEscalation,
) -> eyre::Result<()> {
if !should_send_relayer_transactions(app, relayer).await? {
tracing::warn!(relayer_id = relayer.id, "Skipping relayer escalations");

tracing::info!(tx_id = tx.id, "Escalated transaction saved");
return Ok(());
}

tracing::info!(
tx_id = tx.id,
escalation_count = tx.escalation_count,
"Escalating transaction"
);

let escalation = tx.escalation_count + 1;

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
.await?;

let fees = app
.db
.get_latest_block_fees_by_chain_id(tx.chain_id)
.await?
.context("Missing block")?;

// Min increase of 20% on the priority fee required for a replacement tx
let factor = U256::from(100);
let increased_gas_price_percentage =
factor + U256::from(20 * (1 + escalation));

let initial_max_fee_per_gas = tx.initial_max_fee_per_gas.0;

let max_fee_per_gas_increase =
initial_max_fee_per_gas * increased_gas_price_percentage / factor;

let max_fee_per_gas = initial_max_fee_per_gas + max_fee_per_gas_increase;

let max_priority_fee_per_gas =
max_fee_per_gas - fees.fee_estimates.base_fee_per_gas;

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
};

let pending_tx = middleware
.send_transaction(TypedTransaction::Eip1559(eip1559_tx), None)
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => pending_tx,
Err(err) => {
tracing::error!(tx_id = tx.id, error = ?err, "Failed to escalate transaction");
return Ok(());
}
};

let tx_hash = pending_tx.tx_hash();

tracing::info!(
tx_id = tx.id,
?tx_hash,
?initial_max_fee_per_gas,
?max_fee_per_gas_increase,
?max_fee_per_gas,
?max_priority_fee_per_gas,
?pending_tx,
"Escalated transaction"
);

app.db
.escalate_tx(&tx.id, tx_hash, max_fee_per_gas, max_priority_fee_per_gas)
.await?;

tracing::info!(tx_id = tx.id, "Escalated transaction saved");

Ok(())
}

Expand Down
Loading

0 comments on commit 70031b1

Please sign in to comment.