Skip to content

Commit

Permalink
added logic to recover simulated txs
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKitsune committed Dec 13, 2023
1 parent 524199b commit 479e335
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 25 deletions.
17 changes: 17 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,23 @@ impl Database {
Ok(())
}

// Gets all transactions that were simulated but not sent
pub async fn recover_simulated_txs(&self) -> eyre::Result<Vec<UnsentTx>> {
Ok(sqlx::query_as(
r#"
SELECT r.id as relayer_id, t.id, t.tx_to, t.data, t.value, t.gas_limit, t.priority, t.nonce, r.key_id, r.chain_id
FROM transactions t
INNER JOIN tx_hashes h ON (h.tx_id = t.id)
INNER JOIN relayers r ON (t.relayer_id = r.id
LEFT JOIN sent_transactions s ON (t.id = s.tx_id)
WHERE s.tx_id IS NULL
ORDER BY r.id, t.nonce ASC;
"#,
)
.fetch_all(&self.pool)
.await?)
}

pub async fn get_latest_block_number_without_fee_estimates(
&self,
chain_id: u64,
Expand Down
67 changes: 42 additions & 25 deletions src/tasks/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,38 @@ use crate::broadcast_utils::{
use crate::db::UnsentTx;

pub async fn broadcast_txs(app: Arc<App>) -> eyre::Result<()> {
loop {
let mut txs = app.db.get_unsent_txs().await?;

txs.sort_unstable_by_key(|tx| tx.relayer_id.clone());
// Recovery any unsent transactions that were simulated but never sent
let recovered_txs = app.db.recover_simulated_txs().await?;
broadcast_unsent_txs(&app, recovered_txs).await?;

let txs_by_relayer =
txs.into_iter().group_by(|tx| tx.relayer_id.clone());
loop {
// Get all unsent txs and broadcast
let txs = app.db.get_unsent_txs().await?;
broadcast_unsent_txs(&app, txs).await?;

let txs_by_relayer: HashMap<_, _> = txs_by_relayer
.into_iter()
.map(|(relayer_id, txs)| {
let mut txs = txs.collect_vec();
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

txs.sort_unstable_by_key(|tx| tx.nonce);
async fn broadcast_unsent_txs(
app: &App,
txs: Vec<UnsentTx>,
) -> eyre::Result<()> {
let txs_by_relayer = sort_txs_by_relayer(txs);

(relayer_id, txs)
})
.collect();
let mut futures = FuturesUnordered::new();

let mut futures = FuturesUnordered::new();
for (relayer_id, txs) in txs_by_relayer {
futures.push(broadcast_relayer_txs(&app, relayer_id, txs));
}

for (relayer_id, txs) in txs_by_relayer {
futures.push(broadcast_relayer_txs(&app, relayer_id, txs));
while let Some(result) = futures.next().await {
if let Err(err) = result {
tracing::error!(error = ?err, "Failed broadcasting txs");
}

while let Some(result) = futures.next().await {
if let Err(err) = result {
tracing::error!(error = ?err, "Failed broadcasting txs");
}
}

tokio::time::sleep(Duration::from_secs(1)).await;
}

Ok(())
}

#[tracing::instrument(skip(app, txs))]
Expand Down Expand Up @@ -176,3 +175,21 @@ async fn broadcast_relayer_txs(

Ok(())
}

fn sort_txs_by_relayer(
mut txs: Vec<UnsentTx>,
) -> HashMap<String, Vec<UnsentTx>> {
txs.sort_unstable_by_key(|tx| tx.relayer_id.clone());
let txs_by_relayer = txs.into_iter().group_by(|tx| tx.relayer_id.clone());

txs_by_relayer
.into_iter()
.map(|(relayer_id, txs)| {
let mut txs = txs.collect_vec();

txs.sort_unstable_by_key(|tx| tx.nonce);

(relayer_id, txs)
})
.collect()
}

0 comments on commit 479e335

Please sign in to comment.