diff --git a/src/db.rs b/src/db.rs index 269e9b6..fa413f1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -292,6 +292,23 @@ impl Database { Ok(()) } + // Gets all transactions that were simulated but not sent + pub async fn recover_simulated_txs(&self) -> eyre::Result> { + Ok(sqlx::query_as( + r#" + SELECT r.id as relayer_id, t.id, t.tx_to, t.data, t.value, t.gas_limit, t.priority, t.nonce, r.key_id, r.chain_id + FROM transactions t + INNER JOIN tx_hashes h ON (h.tx_id = t.id) + INNER JOIN relayers r ON (t.relayer_id = r.id + LEFT JOIN sent_transactions s ON (t.id = s.tx_id) + WHERE s.tx_id IS NULL + ORDER BY r.id, t.nonce ASC; + "#, + ) + .fetch_all(&self.pool) + .await?) + } + pub async fn get_latest_block_number_without_fee_estimates( &self, chain_id: u64, diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index 154276b..784f52c 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -18,39 +18,38 @@ use crate::broadcast_utils::{ use crate::db::UnsentTx; pub async fn broadcast_txs(app: Arc) -> eyre::Result<()> { - loop { - let mut txs = app.db.get_unsent_txs().await?; - - txs.sort_unstable_by_key(|tx| tx.relayer_id.clone()); + // Recovery any unsent transactions that were simulated but never sent + let recovered_txs = app.db.recover_simulated_txs().await?; + broadcast_unsent_txs(&app, recovered_txs).await?; - let txs_by_relayer = - txs.into_iter().group_by(|tx| tx.relayer_id.clone()); + loop { + // Get all unsent txs and broadcast + let txs = app.db.get_unsent_txs().await?; + broadcast_unsent_txs(&app, txs).await?; - let txs_by_relayer: HashMap<_, _> = txs_by_relayer - .into_iter() - .map(|(relayer_id, txs)| { - let mut txs = txs.collect_vec(); + tokio::time::sleep(Duration::from_secs(1)).await; + } +} - txs.sort_unstable_by_key(|tx| tx.nonce); +async fn broadcast_unsent_txs( + app: &App, + txs: Vec, +) -> eyre::Result<()> { + let txs_by_relayer = sort_txs_by_relayer(txs); - (relayer_id, txs) - }) - .collect(); + let mut futures = FuturesUnordered::new(); - let mut futures = FuturesUnordered::new(); + for (relayer_id, txs) in txs_by_relayer { + futures.push(broadcast_relayer_txs(&app, relayer_id, txs)); + } - for (relayer_id, txs) in txs_by_relayer { - futures.push(broadcast_relayer_txs(&app, relayer_id, txs)); + while let Some(result) = futures.next().await { + if let Err(err) = result { + tracing::error!(error = ?err, "Failed broadcasting txs"); } - - while let Some(result) = futures.next().await { - if let Err(err) = result { - tracing::error!(error = ?err, "Failed broadcasting txs"); - } - } - - tokio::time::sleep(Duration::from_secs(1)).await; } + + Ok(()) } #[tracing::instrument(skip(app, txs))] @@ -176,3 +175,21 @@ async fn broadcast_relayer_txs( Ok(()) } + +fn sort_txs_by_relayer( + mut txs: Vec, +) -> HashMap> { + txs.sort_unstable_by_key(|tx| tx.relayer_id.clone()); + let txs_by_relayer = txs.into_iter().group_by(|tx| tx.relayer_id.clone()); + + txs_by_relayer + .into_iter() + .map(|(relayer_id, txs)| { + let mut txs = txs.collect_vec(); + + txs.sort_unstable_by_key(|tx| tx.nonce); + + (relayer_id, txs) + }) + .collect() +}