Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve-logging #15

Merged
merged 4 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn calculate_gas_fees_from_estimates(
(max_fee_per_gas, max_priority_fee_per_gas)
}

pub async fn should_send_transaction(
pub async fn should_send_relayer_transactions(
app: &App,
relayer: &RelayerInfo,
) -> eyre::Result<bool> {
Expand All @@ -43,6 +43,7 @@ pub async fn should_send_transaction(

if chain_fees.gas_price > gas_limit.value.0 {
tracing::warn!(
relayer_id = relayer.id,
chain_id = relayer.chain_id,
gas_price = ?chain_fees.gas_price,
gas_limit = ?gas_limit.value.0,
Expand Down
48 changes: 27 additions & 21 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use crate::types::{RelayerInfo, RelayerUpdate, TransactionPriority};

pub mod data;

use self::data::{
AddressWrapper, BlockFees, H256Wrapper, NetworkStats, ReadTxData, RpcKind,
};
use self::data::{BlockFees, H256Wrapper, NetworkStats, ReadTxData, RpcKind};
pub use self::data::{TxForEscalation, TxStatus, UnsentTx};

// Statically link in migration files
Expand Down Expand Up @@ -141,6 +139,32 @@ impl Database {
.await?)
}

pub async fn get_relayers_by_chain_id(
&self,
chain_id: u64,
) -> eyre::Result<Vec<RelayerInfo>> {
Ok(sqlx::query_as(
r#"
SELECT
id,
name,
chain_id,
key_id,
address,
nonce,
current_nonce,
max_inflight_txs,
gas_price_limits,
enabled
FROM relayers
WHERE chain_id = $1
"#,
)
.bind(chain_id as i64)
.fetch_all(&self.pool)
.await?)
}

pub async fn get_relayer(&self, id: &str) -> eyre::Result<RelayerInfo> {
Ok(sqlx::query_as(
r#"
Expand Down Expand Up @@ -781,24 +805,6 @@ impl Database {
.await?)
}

pub async fn get_relayer_addresses(
&self,
chain_id: u64,
) -> eyre::Result<Vec<Address>> {
let items: Vec<(AddressWrapper,)> = sqlx::query_as(
r#"
SELECT address
FROM relayers
WHERE chain_id = $1
"#,
)
.bind(chain_id as i64)
.fetch_all(&self.pool)
.await?;

Ok(items.into_iter().map(|(wrapper,)| wrapper.0).collect())
}

pub async fn update_relayer_nonce(
&self,
chain_id: u64,
Expand Down
2 changes: 1 addition & 1 deletion src/server/routes/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub async fn send_tx(
)
.await?;

tracing::info!(id = tx_id, "Tx created");
tracing::info!(tx_id, "Transaction created");

Ok(Json(SendTxResponse { tx_id }))
}
Expand Down
6 changes: 3 additions & 3 deletions src/task_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ where
let mut failures = vec![];

loop {
tracing::info!(label, "Running task");
tracing::info!(task_label = label, "Running task");

let result = task(app.clone()).await;

if let Err(err) = result {
tracing::error!(label, error = ?err, "Task failed");
tracing::error!(task_label = label, error = ?err, "Task failed");

failures.push(Instant::now());
let backoff = determine_backoff(&failures);
Expand All @@ -47,7 +47,7 @@ where

prune_failures(&mut failures);
} else {
tracing::info!(label, "Task finished");
tracing::info!(task_label = label, "Task finished");
break;
}
}
Expand Down
65 changes: 39 additions & 26 deletions src/tasks/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ use itertools::Itertools;

use crate::app::App;
use crate::broadcast_utils::{
calculate_gas_fees_from_estimates, should_send_transaction,
calculate_gas_fees_from_estimates, should_send_relayer_transactions,
};
use crate::db::UnsentTx;

const NO_TXS_SLEEP_DURATION: Duration = Duration::from_secs(2);

pub async fn broadcast_txs(app: Arc<App>) -> eyre::Result<()> {
loop {
// Get all unsent txs and broadcast
let txs = app.db.get_unsent_txs().await?;
let num_txs = txs.len();

let txs_by_relayer = sort_txs_by_relayer(txs);

let mut futures = FuturesUnordered::new();
Expand All @@ -31,11 +34,13 @@ pub async fn broadcast_txs(app: Arc<App>) -> eyre::Result<()> {

while let Some(result) = futures.next().await {
if let Err(err) = result {
tracing::error!(error = ?err, "Failed broadcasting txs");
tracing::error!(error = ?err, "Failed broadcasting transactions");
}
}

tokio::time::sleep(Duration::from_secs(1)).await;
if num_txs == 0 {
tokio::time::sleep(NO_TXS_SLEEP_DURATION).await;
}
}
}

Expand All @@ -49,21 +54,22 @@ async fn broadcast_relayer_txs(
return Ok(());
}

tracing::info!(relayer_id, num_txs = txs.len(), "Broadcasting relayer txs");

let relayer = app.db.get_relayer(&relayer_id).await?;

if !should_send_transaction(app, &relayer).await? {
tracing::warn!(
relayer_id = relayer_id,
"Skipping transaction broadcasts"
);
if !should_send_relayer_transactions(app, &relayer).await? {
tracing::warn!(relayer_id = relayer_id, "Skipping relayer broadcasts");

return Ok(());
}

tracing::info!(
relayer_id,
num_txs = txs.len(),
"Broadcasting relayer transactions"
);

for tx in txs {
tracing::info!(id = tx.id, "Sending tx");
tracing::info!(tx_id = tx.id, nonce = tx.nonce, "Sending transaction");

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
Expand Down Expand Up @@ -103,16 +109,22 @@ async fn broadcast_relayer_txs(
.fill_transaction(&mut typed_transaction, None)
.await?;

tracing::debug!(?tx.id, "Simulating tx");
tracing::debug!(tx_id = tx.id, "Simulating transaction");

// Simulate the transaction
match middleware.call(&typed_transaction, None).await {
Ok(_) => {
tracing::info!(?tx.id, "Tx simulated successfully");
tracing::info!(
tx_id = tx.id,
"Transaction simulated successfully"
);
}
Err(err) => {
tracing::error!(?tx.id, error = ?err, "Failed to simulate tx");
continue;
tracing::error!(tx_id = tx.id, error = ?err, "Failed to simulate transaction");

// If we fail while broadcasting a tx with nonce `n`,
// it doesn't make sense to broadcast tx with nonce `n + 1`
return Ok(());
}
};

Expand All @@ -133,24 +145,25 @@ async fn broadcast_relayer_txs(
)
.await?;

tracing::debug!(?tx.id, "Sending tx");
tracing::debug!(tx_id = tx.id, "Sending transaction");

// TODO: Be smarter about error handling - a tx can fail to be sent
// e.g. because the relayer is out of funds
// but we don't want to retry it forever
let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await;

match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
}
let pending_tx = match pending_tx {
Ok(pending_tx) => pending_tx,
Err(err) => {
tracing::error!(?tx.id, error = ?err, "Failed to send tx");
tracing::error!(tx_id = tx.id, error = ?err, "Failed to send transaction");
continue;
}
};

tracing::info!(id = tx.id, hash = ?tx_hash, "Tx broadcast");
tracing::info!(
tx_id = tx.id,
tx_nonce = tx.nonce,
tx_hash = ?tx_hash,
?pending_tx,
"Transaction broadcast"
);
}

Ok(())
Expand Down
53 changes: 31 additions & 22 deletions src/tasks/escalate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;

use crate::app::App;
use crate::broadcast_utils::should_send_transaction;
use crate::broadcast_utils::should_send_relayer_transactions;
use crate::db::TxForEscalation;

pub async fn escalate_txs(app: Arc<App>) -> eyre::Result<()> {
Expand Down Expand Up @@ -46,14 +46,21 @@ async fn escalate_relayer_txs(
let relayer = app.db.get_relayer(&relayer_id).await?;

for tx in txs {
tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx");

if !should_send_transaction(app, &relayer).await? {
tracing::warn!(id = tx.id, "Skipping transaction broadcast");
if !should_send_relayer_transactions(app, &relayer).await? {
tracing::warn!(
relayer_id = relayer.id,
"Skipping relayer escalations"
);

return Ok(());
}

tracing::info!(
tx_id = tx.id,
escalation_count = tx.escalation_count,
"Escalating transaction"
);

let escalation = tx.escalation_count + 1;

let middleware = app
Expand All @@ -71,23 +78,17 @@ async fn escalate_relayer_txs(
let increased_gas_price_percentage =
factor + U256::from(20 * (1 + escalation));

let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0
* increased_gas_price_percentage
/ factor;
let initial_max_fee_per_gas = tx.initial_max_fee_per_gas.0;

let max_fee_per_gas_increase =
initial_max_fee_per_gas * increased_gas_price_percentage / factor;

let max_fee_per_gas =
tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase;
initial_max_fee_per_gas + max_fee_per_gas_increase;

let max_priority_fee_per_gas =
max_fee_per_gas - fees.fee_estimates.base_fee_per_gas;

tracing::warn!(
"Initial tx fees are max = {}, priority = {}",
tx.initial_max_fee_per_gas.0,
tx.initial_max_priority_fee_per_gas.0
);
tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}");

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
Expand All @@ -106,18 +107,26 @@ async fn escalate_relayer_txs(
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
pending_tx
}
Ok(pending_tx) => pending_tx,
Err(err) => {
tracing::error!(error = ?err, "Failed to send tx");
tracing::error!(tx_id = tx.id, error = ?err, "Failed to escalate transaction");
continue;
}
};

let tx_hash = pending_tx.tx_hash();

tracing::info!(
tx_id = tx.id,
?tx_hash,
?initial_max_fee_per_gas,
?max_fee_per_gas_increase,
?max_fee_per_gas,
?max_priority_fee_per_gas,
?pending_tx,
"Escalated transaction"
);

app.db
.escalate_tx(
&tx.id,
Expand All @@ -127,7 +136,7 @@ async fn escalate_relayer_txs(
)
.await?;

tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated");
tracing::info!(tx_id = tx.id, "Escalated transaction saved");
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/tasks/handle_reorgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub async fn handle_hard_reorgs(app: Arc<App>) -> eyre::Result<()> {
let reorged_txs = app.db.handle_hard_reorgs().await?;

for tx in reorged_txs {
tracing::info!(id = tx, "Tx hard reorged");
tracing::info!(tx_id = tx, "Transaction hard reorged");
}

tokio::time::sleep(app.config.service.hard_reorg_interval).await;
Expand All @@ -23,7 +23,7 @@ pub async fn handle_soft_reorgs(app: Arc<App>) -> eyre::Result<()> {
let txs = app.db.handle_soft_reorgs().await?;

for tx in txs {
tracing::info!(id = tx, "Tx soft reorged");
tracing::info!(tx_id = tx, "Transaction soft reorged");
}

tokio::time::sleep(app.config.service.soft_reorg_interval).await;
Expand Down
Loading