Skip to content

Commit

Permalink
updated logic to back fill blocks that have been missed
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKitsune committed Dec 13, 2023
1 parent 9139faf commit 3d369f3
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 38 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
20 changes: 20 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,26 @@ impl Database {
Ok(block_number.map(|(n,)| n as u64))
}

pub async fn get_latest_block_number(
&self,
chain_id: u64,
) -> eyre::Result<u64> {
let (block_number,): (i64,) = sqlx::query_as(
r#"
SELECT block_number
FROM blocks
WHERE chain_id = $1
ORDER BY block_number DESC
LIMIT 1
"#,
)
.bind(chain_id as i64)
.fetch_one(&self.pool)
.await?;

Ok(block_number as u64)
}

pub async fn get_latest_block_fees_by_chain_id(
&self,
chain_id: u64,
Expand Down
106 changes: 68 additions & 38 deletions src/tasks/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;

use chrono::{DateTime, Utc};
use ethers::providers::{Http, Middleware, Provider};
use ethers::types::BlockNumber;
use ethers::types::{Block, BlockNumber, H256};
use eyre::{Context, ContextCompat};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
Expand All @@ -26,53 +26,83 @@ pub async fn index_chain(app: Arc<App>, chain_id: u64) -> eyre::Result<()> {

let mut blocks_stream = ws_rpc.subscribe_blocks().await?;

let next_block_number =
app.db.get_latest_block_number(chain_id).await? + 1;

if let Some(latest_block) = blocks_stream.next().await {
let latest_block_number = latest_block
.number
.context("Missing block number")?
.as_u64();

if latest_block_number > next_block_number {
for block_number in next_block_number..=latest_block_number {
let block = rpc
.get_block::<BlockNumber>(block_number.into())
.await?
.context(format!(
"Could not get block at height {}",
block_number
))?;

index_block(app.clone(), chain_id, &rpc, block).await?;
}
}
}

while let Some(block) = blocks_stream.next().await {
let block_number =
block.number.context("Missing block number")?.as_u64();
index_block(app.clone(), chain_id, &rpc, block).await?;
}
}
}

tracing::info!(block_number, "Indexing block");
pub async fn index_block(
app: Arc<App>,
chain_id: u64,
rpc: &Provider<Http>,
block: Block<H256>,
) -> eyre::Result<()> {
let block_number = block.number.context("Missing block number")?.as_u64();

let block_timestamp_seconds = block.timestamp.as_u64();
let block_timestamp = DateTime::<Utc>::from_timestamp(
block_timestamp_seconds as i64,
0,
)
.context("Invalid timestamp")?;
tracing::info!(block_number, "Indexing block");

let block = rpc
.get_block(block_number)
.await?
.context("Missing block")?;
let block_timestamp_seconds = block.timestamp.as_u64();
let block_timestamp =
DateTime::<Utc>::from_timestamp(block_timestamp_seconds as i64, 0)
.context("Invalid timestamp")?;

app.db
.save_block(
block.number.unwrap().as_u64(),
chain_id,
block_timestamp,
&block.transactions,
)
.await?;
let block = rpc
.get_block(block_number)
.await?
.context("Missing block")?;

app.db
.save_block(
block.number.unwrap().as_u64(),
chain_id,
block_timestamp,
&block.transactions,
)
.await?;

let mined_txs = app.db.mine_txs(chain_id).await?;
let mined_txs = app.db.mine_txs(chain_id).await?;

let metric_labels = [("chain_id", chain_id.to_string())];
for tx in mined_txs {
tracing::info!(
id = tx.0,
hash = ?tx.1,
"Tx mined"
);
let metric_labels: [(&str, String); 1] =
[("chain_id", chain_id.to_string())];
for tx in mined_txs {
tracing::info!(
id = tx.0,
hash = ?tx.1,
"Tx mined"
);

metrics::increment_counter!("tx_mined", &metric_labels);
}
metrics::increment_counter!("tx_mined", &metric_labels);
}

let relayer_addresses =
app.db.get_relayer_addresses(chain_id).await?;
let relayer_addresses = app.db.get_relayer_addresses(chain_id).await?;

update_relayer_nonces(relayer_addresses, &app, &rpc, chain_id)
.await?;
}
}
update_relayer_nonces(relayer_addresses, &app, &rpc, chain_id).await?;
Ok(())
}

pub async fn estimate_gas(app: Arc<App>, chain_id: u64) -> eyre::Result<()> {
Expand Down

0 comments on commit 3d369f3

Please sign in to comment.