From d53b8f493a2981fd71c31c0b56af8cb94460628a Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Tue, 12 Dec 2023 15:51:04 -0500 Subject: [PATCH] Added logic to indexing to get relayer transactions that are missing from db --- src/db.rs | 23 +++++++++++++++++++++-- src/tasks/index.rs | 33 +++++++++++++++++++++++++++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/src/db.rs b/src/db.rs index ac0da7f..fe831e4 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3,7 +3,7 @@ use std::time::Duration; use chrono::{DateTime, Utc}; -use ethers::types::{Address, H256, U256}; +use ethers::types::{Address, Transaction, H256, U256}; use sqlx::migrate::{MigrateDatabase, Migrator}; use sqlx::types::{BigDecimal, Json}; use sqlx::{Pool, Postgres, Row}; @@ -548,6 +548,7 @@ impl Database { pub async fn mine_txs( &self, chain_id: u64, + relayed_transactions: Vec, ) -> eyre::Result> { let updated_txs: Vec<(String, H256Wrapper)> = sqlx::query_as( r#" @@ -578,6 +579,24 @@ impl Database { .fetch_all(&self.pool) .await?; + let missing_transactions = relayed_transactions + .into_iter() + .filter(|tx| { + !updated_txs.iter().any(|(_, tx_hash)| tx_hash.0 == tx.hash) + }) + .collect::>(); + + if !missing_transactions.is_empty() { + + //TODO: get the tx id for each transaction + + //TODO: insert into tx_hashes + + //TODO: insert into sent_transactions + + //TODO: update updated txs + } + Ok(updated_txs .into_iter() .map(|(id, hash)| (id, hash.0)) @@ -1087,7 +1106,7 @@ mod tests { chain_id: u64, finalization_timestamp: DateTime, ) -> eyre::Result<()> { - db.mine_txs(chain_id).await?; + db.mine_txs(chain_id, vec![]).await?; db.handle_soft_reorgs().await?; db.handle_hard_reorgs().await?; diff --git a/src/tasks/index.rs b/src/tasks/index.rs index 0483938..c46bf49 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -3,7 +3,7 @@ use std::time::Duration; use chrono::{DateTime, Utc}; use ethers::providers::{Http, Middleware, Provider}; -use ethers::types::BlockNumber; +use ethers::types::{Block, BlockNumber, Transaction, H256}; use eyre::{Context, ContextCompat}; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -53,7 +53,16 @@ pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { ) .await?; - let mined_txs = app.db.mine_txs(chain_id).await?; + // Get transactions from the block where the from address is a relayer + let txs = get_transactions_from_block(&block, &rpc).await?; + let relayers = app.db.get_relayer_addresses(chain_id).await?; + let relayed_transactions = txs + .into_iter() + .filter(|tx| relayers.contains(&tx.from)) + .collect::>(); + + let mined_txs = + app.db.mine_txs(chain_id, relayed_transactions).await?; let metric_labels = [("chain_id", chain_id.to_string())]; for tx in mined_txs { @@ -196,3 +205,23 @@ pub async fn get_block_fee_estimates( Ok(fee_estimates) } + +pub async fn get_transactions_from_block( + block: &Block, + rpc: &Provider, +) -> eyre::Result> { + let mut futures = FuturesUnordered::new(); + + block + .transactions + .iter() + .map(|tx_hash| futures.push(rpc.get_transaction(*tx_hash))); + + let mut transactions = vec![]; + while let Some(tx) = futures.next().await { + let tx = tx?.context("Tx not found")?; + transactions.push(tx); + } + + Ok(transactions) +}