Skip to content

Commit

Permalink
Added logic to indexing to get relayer transactions that are missing …
Browse files Browse the repository at this point in the history
…from db
  • Loading branch information
0xKitsune committed Dec 12, 2023
1 parent 2f2766e commit d53b8f4
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
23 changes: 21 additions & 2 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::time::Duration;

use chrono::{DateTime, Utc};
use ethers::types::{Address, H256, U256};
use ethers::types::{Address, Transaction, H256, U256};
use sqlx::migrate::{MigrateDatabase, Migrator};
use sqlx::types::{BigDecimal, Json};
use sqlx::{Pool, Postgres, Row};
Expand Down Expand Up @@ -548,6 +548,7 @@ impl Database {
pub async fn mine_txs(
&self,
chain_id: u64,
relayed_transactions: Vec<Transaction>,
) -> eyre::Result<Vec<(String, H256)>> {
let updated_txs: Vec<(String, H256Wrapper)> = sqlx::query_as(
r#"
Expand Down Expand Up @@ -578,6 +579,24 @@ impl Database {
.fetch_all(&self.pool)
.await?;

let missing_transactions = relayed_transactions
.into_iter()
.filter(|tx| {
!updated_txs.iter().any(|(_, tx_hash)| tx_hash.0 == tx.hash)
})
.collect::<Vec<Transaction>>();

if !missing_transactions.is_empty() {

//TODO: get the tx id for each transaction

//TODO: insert into tx_hashes

//TODO: insert into sent_transactions

//TODO: update updated txs
}

Ok(updated_txs
.into_iter()
.map(|(id, hash)| (id, hash.0))
Expand Down Expand Up @@ -1087,7 +1106,7 @@ mod tests {
chain_id: u64,
finalization_timestamp: DateTime<Utc>,
) -> eyre::Result<()> {
db.mine_txs(chain_id).await?;
db.mine_txs(chain_id, vec![]).await?;

db.handle_soft_reorgs().await?;
db.handle_hard_reorgs().await?;
Expand Down
33 changes: 31 additions & 2 deletions src/tasks/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;

use chrono::{DateTime, Utc};
use ethers::providers::{Http, Middleware, Provider};
use ethers::types::BlockNumber;
use ethers::types::{Block, BlockNumber, Transaction, H256};
use eyre::{Context, ContextCompat};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
Expand Down Expand Up @@ -53,7 +53,16 @@ pub async fn index_chain(app: Arc<App>, chain_id: u64) -> eyre::Result<()> {
)
.await?;

let mined_txs = app.db.mine_txs(chain_id).await?;
// Get transactions from the block where the from address is a relayer
let txs = get_transactions_from_block(&block, &rpc).await?;
let relayers = app.db.get_relayer_addresses(chain_id).await?;
let relayed_transactions = txs
.into_iter()
.filter(|tx| relayers.contains(&tx.from))
.collect::<Vec<Transaction>>();

let mined_txs =
app.db.mine_txs(chain_id, relayed_transactions).await?;

let metric_labels = [("chain_id", chain_id.to_string())];
for tx in mined_txs {
Expand Down Expand Up @@ -196,3 +205,23 @@ pub async fn get_block_fee_estimates(

Ok(fee_estimates)
}

pub async fn get_transactions_from_block(
block: &Block<H256>,
rpc: &Provider<Http>,
) -> eyre::Result<Vec<Transaction>> {
let mut futures = FuturesUnordered::new();

block

Check failure on line 215 in src/tasks/index.rs

View workflow job for this annotation

GitHub Actions / cargo test

unused `std::iter::Map` that must be used
.transactions
.iter()
.map(|tx_hash| futures.push(rpc.get_transaction(*tx_hash)));

Check failure on line 218 in src/tasks/index.rs

View workflow job for this annotation

GitHub Actions / cargo test

`Iterator::map` call that discard the iterator's values

let mut transactions = vec![];
while let Some(tx) = futures.next().await {
let tx = tx?.context("Tx not found")?;
transactions.push(tx);
}

Ok(transactions)
}

0 comments on commit d53b8f4

Please sign in to comment.