Skip to content

Commit

Permalink
opt(torii): fetch receipts along with blocks instead of fetching them…
Browse files Browse the repository at this point in the history
… individually

commit-id:b6db4cb5
  • Loading branch information
lambda-0x committed Sep 5, 2024
1 parent c62e02b commit e4381c6
Showing 1 changed file with 20 additions and 27 deletions.
47 changes: 20 additions & 27 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use anyhow::Result;
use dojo_world::contracts::world::WorldContractReader;
use hashlink::LinkedHashMap;
use starknet::core::types::{
BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithTxHashes,
MaybePendingBlockWithTxs, PendingBlockWithTxs, ReceiptBlock, TransactionReceipt,
TransactionReceiptWithBlockInfo,
BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts,
MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, TransactionReceipt,
TransactionReceiptWithBlockInfo, TransactionWithReceipt,
};
use starknet::providers::Provider;
use tokio::sync::broadcast::Sender;
Expand Down Expand Up @@ -78,7 +78,7 @@ pub struct FetchRangeResult {

#[derive(Debug)]
pub struct FetchPendingResult {
pub pending_block: Box<PendingBlockWithTxs>,
pub pending_block: Box<PendingBlockWithReceipts>,
pub pending_block_tx: Option<Felt>,
pub block_number: u64,
}
Expand Down Expand Up @@ -303,8 +303,8 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
block_number: u64,
pending_block_tx: Option<Felt>,
) -> Result<Option<FetchPendingResult>> {
let block = if let MaybePendingBlockWithTxs::PendingBlock(pending) =
self.provider.get_block_with_txs(BlockId::Tag(BlockTag::Pending)).await?
let block = if let MaybePendingBlockWithReceipts::PendingBlock(pending) =
self.provider.get_block_with_receipts(BlockId::Tag(BlockTag::Pending)).await?
{
pending
} else {
Expand Down Expand Up @@ -341,47 +341,41 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
let mut pending_block_tx_cursor = data.pending_block_tx;
let mut pending_block_tx = data.pending_block_tx;
let timestamp = data.pending_block.timestamp;
for transaction in data.pending_block.transactions {
for t in data.pending_block.transactions {
let transaction_hash = t.transaction.transaction_hash();
if let Some(tx) = pending_block_tx_cursor {
if transaction.transaction_hash() != &tx {
if transaction_hash != &tx {
continue;
}

pending_block_tx_cursor = None;
continue;
}

match self
.process_transaction_with_receipt(
*transaction.transaction_hash(),
data.block_number,
timestamp,
)
.await
{
match self.process_transaction_with_receipt(&t, data.block_number, timestamp).await {
Err(e) => {
match e.to_string().as_str() {
"TransactionHashNotFound" => {
// We failed to fetch the transaction, which is because
// the transaction might not have been processed fast enough by the
// provider. So we can fail silently and try
// again in the next iteration.
warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Retrieving pending transaction receipt.");
warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt.");
self.db.set_head(data.block_number - 1, pending_block_tx);
return Ok(());
}
_ => {
error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processing pending transaction.");
error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction_hash), "Processing pending transaction.");
return Err(e);
}
}
}
Ok(true) => {
pending_block_tx = Some(*transaction.transaction_hash());
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processed pending world transaction.");
pending_block_tx = Some(*transaction_hash);
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction.");
}
Ok(_) => {
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processed pending transaction.")
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction.")
}
}
}
Expand Down Expand Up @@ -488,13 +482,12 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
// Returns whether the transaction has a world event.
async fn process_transaction_with_receipt(
&mut self,
transaction_hash: Felt,
transaction_with_receipt: &TransactionWithReceipt,
block_number: u64,
block_timestamp: u64,
) -> Result<bool> {
let receipt = self.provider.get_transaction_receipt(transaction_hash).await?;

let events = match &receipt.receipt {
let transaction_hash = transaction_with_receipt.transaction.transaction_hash();
let events = match &transaction_with_receipt.receipt {
TransactionReceipt::Invoke(receipt) => Some(&receipt.events),
TransactionReceipt::L1Handler(receipt) => Some(&receipt.events),
_ => None,
Expand All @@ -509,15 +502,15 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {

world_event = true;
let event_id =
format!("{:#064x}:{:#x}:{:#04x}", block_number, transaction_hash, event_idx);
format!("{:#064x}:{:#x}:{:#04x}", block_number, *transaction_hash, event_idx);

Self::process_event(
self,
block_number,
block_timestamp,
&event_id,
event,
transaction_hash,
*transaction_hash,
)
.await?;
}
Expand Down

0 comments on commit e4381c6

Please sign in to comment.