opt(torii): avoid re-processing of transactions in certain cases
fix: #2355

commit-id:a510b985
lambda-0x committed Sep 5, 2024
1 parent e4381c6 commit c5665d9
Showing 3 changed files with 70 additions and 34 deletions.
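In the previous engine, the single pending-block cursor (pending_block_tx) was advanced only by transactions that touched the world contract, so transactions after the last world transaction could be pulled and processed again on every poll of the pending block. The commit splits it into two cursors: last_pending_block_tx, the last pending transaction processed of any kind, used to resume inside the pending block, and last_pending_block_world_tx, the last pending world transaction, used to skip already-handled events when the block is later indexed as a range. A minimal, self-contained sketch of the pending-block half of this follows; the TxHash/PendingTx types and the function shape are simplifications assumed for illustration, not torii's actual code.

// Minimal sketch of the two-cursor pending-block skip (assumed types/names,
// not torii's code): `last_pending_block_tx` resumes inside the pending
// block, `last_pending_block_world_tx` is reported for the range path.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct TxHash(u64);

struct PendingTx {
    hash: TxHash,
    touches_world: bool,
}

/// Walk the pending block, skipping everything up to and including the last
/// transaction handled by a previous poll, and return the updated cursors
/// as (last_pending_block_world_tx, last_pending_block_tx).
fn process_pending(
    txs: &[PendingTx],
    last_pending_block_tx: Option<TxHash>,
) -> (Option<TxHash>, Option<TxHash>) {
    let mut cursor = last_pending_block_tx;
    let mut last_world_tx = None;
    let mut last_tx = last_pending_block_tx;

    for tx in txs {
        // Skip transactions already handled in an earlier poll of this block.
        if let Some(seen) = cursor {
            if tx.hash != seen {
                continue;
            }
            cursor = None;
            continue;
        }

        // "Process" the transaction: only world transactions advance the world
        // cursor, but every transaction advances the generic cursor, which is
        // what stops non-world transactions from being re-processed.
        if tx.touches_world {
            last_world_tx = Some(tx.hash);
        }
        last_tx = Some(tx.hash);
    }

    (last_world_tx, last_tx)
}

fn main() {
    let txs = [
        PendingTx { hash: TxHash(1), touches_world: true },
        PendingTx { hash: TxHash(2), touches_world: false },
        PendingTx { hash: TxHash(3), touches_world: false },
    ];
    // First poll: everything is processed.
    let (world, last) = process_pending(&txs, None);
    assert_eq!((world, last), (Some(TxHash(1)), Some(TxHash(3))));
    // Second poll of the same pending block: nothing is processed again.
    let (_, last_again) = process_pending(&txs, last);
    assert_eq!(last_again, Some(TxHash(3)));
}

Because every processed transaction now advances last_pending_block_tx, a second poll of the same pending block skips straight past work that was already done.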
63 changes: 40 additions & 23 deletions crates/torii/core/src/engine.rs
@@ -79,7 +79,7 @@ pub struct FetchRangeResult {
 #[derive(Debug)]
 pub struct FetchPendingResult {
     pub pending_block: Box<PendingBlockWithReceipts>,
-    pub pending_block_tx: Option<Felt>,
+    pub last_pending_block_tx: Option<Felt>,
     pub block_number: u64,
 }
 
@@ -113,9 +113,14 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
     }
 
     pub async fn start(&mut self) -> Result<()> {
-        let (head, pending_block_tx) = self.db.head().await?;
+        // use the start block provided by user if head is 0
+        let (head, last_pending_block_world_tx, last_pending_block_tx) = self.db.head().await?;
         if head == 0 {
-            self.db.set_head(head, pending_block_tx);
+            self.db.set_head(
+                self.config.start_block,
+                last_pending_block_world_tx,
+                last_pending_block_tx,
+            );
         } else if self.config.start_block != 0 {
             warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead.");
         }
@@ -127,12 +132,12 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
 
         let mut erroring_out = false;
         loop {
-            let (head, pending_block_tx) = self.db.head().await?;
+            let (head, last_pending_block_world_tx, last_pending_block_tx) = self.db.head().await?;
             tokio::select! {
                 _ = shutdown_rx.recv() => {
                     break Ok(());
                 }
-                res = self.fetch_data(head, pending_block_tx) => {
+                res = self.fetch_data(head, last_pending_block_world_tx, last_pending_block_tx) => {
                     match res {
                         Ok(fetch_result) => {
                             if erroring_out {
@@ -171,17 +176,19 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
     pub async fn fetch_data(
         &mut self,
         from: u64,
-        pending_block_tx: Option<Felt>,
+        last_pending_block_world_tx: Option<Felt>,
+        last_pending_block_tx: Option<Felt>,
     ) -> Result<FetchDataResult> {
         let latest_block_number = self.provider.block_hash_and_number().await?.block_number;
 
         let result = if from < latest_block_number {
             let from = if from == 0 { from } else { from + 1 };
             debug!(target: LOG_TARGET, from = %from, to = %latest_block_number, "Fetching data for range.");
-            let data = self.fetch_range(from, latest_block_number, pending_block_tx).await?;
+            let data =
+                self.fetch_range(from, latest_block_number, last_pending_block_world_tx).await?;
             FetchDataResult::Range(data)
         } else if self.config.index_pending {
-            let data = self.fetch_pending(latest_block_number + 1, pending_block_tx).await?;
+            let data = self.fetch_pending(latest_block_number + 1, last_pending_block_tx).await?;
             if let Some(data) = data {
                 FetchDataResult::Pending(data)
             } else {
@@ -198,7 +205,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
         &mut self,
         from: u64,
         to: u64,
-        pending_block_tx: Option<Felt>,
+        last_pending_block_world_tx: Option<Felt>,
     ) -> Result<FetchRangeResult> {
         // Process all blocks from current to latest.
         let get_events = |token: Option<String>| {
@@ -229,7 +236,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
 
         // Flatten events pages and events according to the pending block cursor
         // to array of (block_number, transaction_hash)
-        let mut pending_block_tx_cursor = pending_block_tx;
+        let mut last_pending_block_world_tx_cursor = last_pending_block_world_tx;
         let mut transactions = LinkedHashMap::new();
         for events_page in events_pages {
             debug!("Processing events page with events: {}", &events_page.events.len());
@@ -269,17 +276,17 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
 
                 // Then we skip all transactions until we reach the last pending processed
                 // transaction (if any)
-                if let Some(tx) = pending_block_tx_cursor {
+                if let Some(tx) = last_pending_block_world_tx_cursor {
                     if event.transaction_hash != tx {
                         continue;
                     }
 
-                    pending_block_tx_cursor = None;
+                    last_pending_block_world_tx_cursor = None;
                 }
 
                 // Skip the latest pending block transaction events
                 // * as we might have multiple events for the same transaction
-                if let Some(tx) = pending_block_tx {
+                if let Some(tx) = last_pending_block_world_tx {
                     if event.transaction_hash == tx {
                         continue;
                     }
@@ -301,7 +308,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
     async fn fetch_pending(
         &self,
         block_number: u64,
-        pending_block_tx: Option<Felt>,
+        last_pending_block_tx: Option<Felt>,
     ) -> Result<Option<FetchPendingResult>> {
         let block = if let MaybePendingBlockWithReceipts::PendingBlock(pending) =
             self.provider.get_block_with_receipts(BlockId::Tag(BlockTag::Pending)).await?
@@ -317,7 +324,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
         Ok(Some(FetchPendingResult {
             pending_block: Box::new(block),
             block_number,
-            pending_block_tx,
+            last_pending_block_tx,
         }))
     }
 
@@ -338,17 +345,21 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
     pub async fn process_pending(&mut self, data: FetchPendingResult) -> Result<()> {
         // Skip transactions that have been processed already
         // Our cursor is the last processed transaction
-        let mut pending_block_tx_cursor = data.pending_block_tx;
-        let mut pending_block_tx = data.pending_block_tx;
+
+        let mut last_pending_block_tx_cursor = data.last_pending_block_tx;
+        let mut last_pending_block_tx = data.last_pending_block_tx;
+        let mut last_pending_block_world_tx = None;
+
         let timestamp = data.pending_block.timestamp;
 
         for t in data.pending_block.transactions {
             let transaction_hash = t.transaction.transaction_hash();
-            if let Some(tx) = pending_block_tx_cursor {
+            if let Some(tx) = last_pending_block_tx_cursor {
                 if transaction_hash != &tx {
                     continue;
                 }
 
-                pending_block_tx_cursor = None;
+                last_pending_block_tx_cursor = None;
                 continue;
             }
+
@@ -361,7 +372,11 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
                             // provider. So we can fail silently and try
                             // again in the next iteration.
                             warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt.");
-                            self.db.set_head(data.block_number - 1, pending_block_tx);
+                            self.db.set_head(
+                                data.block_number - 1,
+                                last_pending_block_world_tx,
+                                last_pending_block_tx,
+                            );
                             return Ok(());
                         }
                         _ => {
@@ -371,18 +386,20 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
                     }
                 }
                 Ok(true) => {
-                    pending_block_tx = Some(*transaction_hash);
+                    last_pending_block_world_tx = Some(*transaction_hash);
+                    last_pending_block_tx = Some(*transaction_hash);
                     info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction.");
                 }
                 Ok(_) => {
+                    last_pending_block_tx = Some(*transaction_hash);
                     info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction.")
                 }
             }
         }
 
         // Set the head to the last processed pending transaction
         // Head block number should still be latest block number
-        self.db.set_head(data.block_number - 1, pending_block_tx);
+        self.db.set_head(data.block_number - 1, last_pending_block_world_tx, last_pending_block_tx);
 
         self.db.execute().await?;
         Ok(())
@@ -425,7 +442,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
         // so once the sync range is done, we assume all of the tx of the block
         // have been processed.
 
-        self.db.set_head(data.latest_block_number, None);
+        self.db.set_head(data.latest_block_number, None, None);
         self.db.execute().await?;
 
         Ok(())
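On the finalized-range path, fetch_range skips with the world cursor rather than the generic one: its event query only returns world-contract events, so the hash of a non-world transaction would never match anything in that stream. The sketch below (simplified types and names assumed for illustration, not torii's code) shows that skip, including dropping the cursor transaction's remaining events, since one transaction can emit several events that were all handled while the block was pending.

// Minimal sketch (assumed types/names, not torii's code) of the range-path
// skip driven by last_pending_block_world_tx.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct TxHash(u64);

struct WorldEvent {
    transaction_hash: TxHash,
}

/// Return the transaction hashes whose events still need processing, given
/// the last world transaction handled while the block was pending.
fn events_to_process(
    events: &[WorldEvent],
    last_pending_block_world_tx: Option<TxHash>,
) -> Vec<TxHash> {
    let mut cursor = last_pending_block_world_tx;
    let mut out = Vec::new();
    for event in events {
        // Skip events until the cursor transaction is reached ...
        if let Some(tx) = cursor {
            if event.transaction_hash != tx {
                continue;
            }
            cursor = None;
        }
        // ... and also skip that transaction's own events: one transaction can
        // emit several events and they were all handled while pending.
        if let Some(tx) = last_pending_block_world_tx {
            if event.transaction_hash == tx {
                continue;
            }
        }
        out.push(event.transaction_hash);
    }
    out
}

fn main() {
    let events = [
        WorldEvent { transaction_hash: TxHash(1) },
        WorldEvent { transaction_hash: TxHash(1) },
        WorldEvent { transaction_hash: TxHash(2) },
    ];
    // Everything up to and including tx 1 was handled while pending; only
    // tx 2's event is left for the range pass.
    assert_eq!(events_to_process(&events, Some(TxHash(1))), vec![TxHash(2)]);
    assert_eq!(events_to_process(&events, None).len(), 3);
}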
36 changes: 25 additions & 11 deletions crates/torii/core/src/sql.rs
@@ -65,33 +65,47 @@ impl Sql {
         })
     }
 
-    pub async fn head(&self) -> Result<(u64, Option<Felt>)> {
+    pub async fn head(&self) -> Result<(u64, Option<Felt>, Option<Felt>)> {
         let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
-        let indexer_query = sqlx::query_as::<_, (Option<i64>, Option<String>, String)>(
-            "SELECT head, pending_block_tx, contract_type FROM contracts WHERE id = ?",
-        )
-        .bind(format!("{:#x}", self.world_address));
-
-        let indexer: (Option<i64>, Option<String>, String) =
+        let indexer_query =
+            sqlx::query_as::<_, (Option<i64>, Option<String>, Option<String>, String)>(
+                "SELECT head, last_pending_block_world_tx, last_pending_block_tx, contract_type \
+                 FROM contracts WHERE id = ?",
+            )
+            .bind(format!("{:#x}", self.world_address));
+
+        let indexer: (Option<i64>, Option<String>, Option<String>, String) =
             indexer_query.fetch_one(&mut *conn).await?;
         Ok((
             indexer.0.map(|h| h.try_into().expect("doesn't fit in u64")).unwrap_or(0),
             indexer.1.map(|f| Felt::from_str(&f)).transpose()?,
+            indexer.2.map(|f| Felt::from_str(&f)).transpose()?,
         ))
     }
 
-    pub fn set_head(&mut self, head: u64, pending_block_tx: Option<Felt>) {
+    pub fn set_head(
+        &mut self,
+        head: u64,
+        last_pending_block_world_tx: Option<Felt>,
+        last_pending_block_tx: Option<Felt>,
+    ) {
         let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
         let id = Argument::FieldElement(self.world_address);
-        let pending_block_tx = if let Some(f) = pending_block_tx {
+        let last_pending_block_world_tx = if let Some(f) = last_pending_block_world_tx {
             Argument::String(format!("{:#x}", f))
         } else {
             Argument::Null
         };
+        let last_pending_block_tx = if let Some(f) = last_pending_block_tx {
+            Argument::String(format!("{:#x}", f))
+        } else {
+            Argument::Null
+        };
 
         self.query_queue.enqueue(
-            "UPDATE contracts SET head = ?, pending_block_tx = ? WHERE id = ?",
-            vec![head, pending_block_tx, id],
+            "UPDATE contracts SET head = ?, last_pending_block_world_tx = ?, \
+             last_pending_block_tx = ? WHERE id = ?",
+            vec![head, last_pending_block_world_tx, last_pending_block_tx, id],
         );
     }
 
5 changes: 5 additions & 0 deletions (new SQL migration file)
@@ -0,0 +1,5 @@
+-- Rename pending_block_tx to last_pending_block_world_tx
+ALTER TABLE contracts RENAME COLUMN pending_block_tx TO last_pending_block_world_tx;
+
+-- Add new column last_pending_block_tx
+ALTER TABLE contracts ADD COLUMN last_pending_block_tx TEXT;
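
A self-contained sketch of the head/set_head round trip against the post-migration contracts schema. It assumes sqlx with its sqlite feature and a tokio runtime, and it uses a trimmed-down table and made-up values, so it illustrates the query shapes above rather than torii's Sql type.

use sqlx::sqlite::SqlitePoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // Single in-memory connection so the schema and data stay visible.
    let pool = SqlitePoolOptions::new().max_connections(1).connect("sqlite::memory:").await?;

    // Trimmed-down version of the contracts table after the migration.
    sqlx::query(
        "CREATE TABLE contracts (
             id TEXT PRIMARY KEY,
             head INTEGER,
             last_pending_block_world_tx TEXT,
             last_pending_block_tx TEXT,
             contract_type TEXT NOT NULL
         )",
    )
    .execute(&pool)
    .await?;
    sqlx::query("INSERT INTO contracts (id, contract_type) VALUES (?, 'WORLD')")
        .bind("0x1234")
        .execute(&pool)
        .await?;

    // set_head: persist the head block number plus both pending-block cursors.
    sqlx::query(
        "UPDATE contracts SET head = ?, last_pending_block_world_tx = ?, \
         last_pending_block_tx = ? WHERE id = ?",
    )
    .bind(41_i64)
    .bind(Some("0xabc".to_string()))
    .bind(Some("0xdef".to_string()))
    .bind("0x1234")
    .execute(&pool)
    .await?;

    // head: read the cursors back; NULL columns map to None.
    let row: (Option<i64>, Option<String>, Option<String>, String) = sqlx::query_as(
        "SELECT head, last_pending_block_world_tx, last_pending_block_tx, contract_type \
         FROM contracts WHERE id = ?",
    )
    .bind("0x1234")
    .fetch_one(&pool)
    .await?;
    println!("{row:?}");
    Ok(())
}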
