From c5665d9bd7315d2c1f0120d078943d1e517a82ad Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Tue, 3 Sep 2024 17:10:22 +0530 Subject: [PATCH] opt(torii): avoid re-processing of transactions in certain case fix: #2355 commit-id:a510b985 --- crates/torii/core/src/engine.rs | 63 ++++++++++++------- crates/torii/core/src/sql.rs | 36 +++++++---- ...7_add_column_for_non_world_transaction.sql | 5 ++ 3 files changed, 70 insertions(+), 34 deletions(-) create mode 100644 crates/torii/migrations/20240903110847_add_column_for_non_world_transaction.sql diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index e171279b83..cac214c602 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -79,7 +79,7 @@ pub struct FetchRangeResult { #[derive(Debug)] pub struct FetchPendingResult { pub pending_block: Box, - pub pending_block_tx: Option, + pub last_pending_block_tx: Option, pub block_number: u64, } @@ -113,9 +113,14 @@ impl Engine

{ } pub async fn start(&mut self) -> Result<()> { - let (head, pending_block_tx) = self.db.head().await?; + // use the start block provided by user if head is 0 + let (head, last_pending_block_world_tx, last_pending_block_tx) = self.db.head().await?; if head == 0 { - self.db.set_head(head, pending_block_tx); + self.db.set_head( + self.config.start_block, + last_pending_block_world_tx, + last_pending_block_tx, + ); } else if self.config.start_block != 0 { warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); } @@ -127,12 +132,12 @@ impl Engine

{ let mut erroring_out = false; loop { - let (head, pending_block_tx) = self.db.head().await?; + let (head, last_pending_block_world_tx, last_pending_block_tx) = self.db.head().await?; tokio::select! { _ = shutdown_rx.recv() => { break Ok(()); } - res = self.fetch_data(head, pending_block_tx) => { + res = self.fetch_data(head, last_pending_block_world_tx, last_pending_block_tx) => { match res { Ok(fetch_result) => { if erroring_out { @@ -171,17 +176,19 @@ impl Engine

{ pub async fn fetch_data( &mut self, from: u64, - pending_block_tx: Option, + last_pending_block_world_tx: Option, + last_pending_block_tx: Option, ) -> Result { let latest_block_number = self.provider.block_hash_and_number().await?.block_number; let result = if from < latest_block_number { let from = if from == 0 { from } else { from + 1 }; debug!(target: LOG_TARGET, from = %from, to = %latest_block_number, "Fetching data for range."); - let data = self.fetch_range(from, latest_block_number, pending_block_tx).await?; + let data = + self.fetch_range(from, latest_block_number, last_pending_block_world_tx).await?; FetchDataResult::Range(data) } else if self.config.index_pending { - let data = self.fetch_pending(latest_block_number + 1, pending_block_tx).await?; + let data = self.fetch_pending(latest_block_number + 1, last_pending_block_tx).await?; if let Some(data) = data { FetchDataResult::Pending(data) } else { @@ -198,7 +205,7 @@ impl Engine

{ &mut self, from: u64, to: u64, - pending_block_tx: Option, + last_pending_block_world_tx: Option, ) -> Result { // Process all blocks from current to latest. let get_events = |token: Option| { @@ -229,7 +236,7 @@ impl Engine

{ // Flatten events pages and events according to the pending block cursor // to array of (block_number, transaction_hash) - let mut pending_block_tx_cursor = pending_block_tx; + let mut last_pending_block_world_tx_cursor = last_pending_block_world_tx; let mut transactions = LinkedHashMap::new(); for events_page in events_pages { debug!("Processing events page with events: {}", &events_page.events.len()); @@ -269,17 +276,17 @@ impl Engine

{ // Then we skip all transactions until we reach the last pending processed // transaction (if any) - if let Some(tx) = pending_block_tx_cursor { + if let Some(tx) = last_pending_block_world_tx_cursor { if event.transaction_hash != tx { continue; } - pending_block_tx_cursor = None; + last_pending_block_world_tx_cursor = None; } // Skip the latest pending block transaction events // * as we might have multiple events for the same transaction - if let Some(tx) = pending_block_tx { + if let Some(tx) = last_pending_block_world_tx { if event.transaction_hash == tx { continue; } @@ -301,7 +308,7 @@ impl Engine

{ async fn fetch_pending( &self, block_number: u64, - pending_block_tx: Option, + last_pending_block_tx: Option, ) -> Result> { let block = if let MaybePendingBlockWithReceipts::PendingBlock(pending) = self.provider.get_block_with_receipts(BlockId::Tag(BlockTag::Pending)).await? @@ -317,7 +324,7 @@ impl Engine

{ Ok(Some(FetchPendingResult { pending_block: Box::new(block), block_number, - pending_block_tx, + last_pending_block_tx, })) } @@ -338,17 +345,21 @@ impl Engine

{ pub async fn process_pending(&mut self, data: FetchPendingResult) -> Result<()> { // Skip transactions that have been processed already // Our cursor is the last processed transaction - let mut pending_block_tx_cursor = data.pending_block_tx; - let mut pending_block_tx = data.pending_block_tx; + + let mut last_pending_block_tx_cursor = data.last_pending_block_tx; + let mut last_pending_block_tx = data.last_pending_block_tx; + let mut last_pending_block_world_tx = None; + let timestamp = data.pending_block.timestamp; + for t in data.pending_block.transactions { let transaction_hash = t.transaction.transaction_hash(); - if let Some(tx) = pending_block_tx_cursor { + if let Some(tx) = last_pending_block_tx_cursor { if transaction_hash != &tx { continue; } - pending_block_tx_cursor = None; + last_pending_block_tx_cursor = None; continue; } @@ -361,7 +372,11 @@ impl Engine

{ // provider. So we can fail silently and try // again in the next iteration. warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); - self.db.set_head(data.block_number - 1, pending_block_tx); + self.db.set_head( + data.block_number - 1, + last_pending_block_world_tx, + last_pending_block_tx, + ); return Ok(()); } _ => { @@ -371,10 +386,12 @@ impl Engine

{ } } Ok(true) => { - pending_block_tx = Some(*transaction_hash); + last_pending_block_world_tx = Some(*transaction_hash); + last_pending_block_tx = Some(*transaction_hash); info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction."); } Ok(_) => { + last_pending_block_tx = Some(*transaction_hash); info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction.") } } @@ -382,7 +399,7 @@ impl Engine

{ // Set the head to the last processed pending transaction // Head block number should still be latest block number - self.db.set_head(data.block_number - 1, pending_block_tx); + self.db.set_head(data.block_number - 1, last_pending_block_world_tx, last_pending_block_tx); self.db.execute().await?; Ok(()) @@ -425,7 +442,7 @@ impl Engine

{ // so once the sync range is done, we assume all of the tx of the block // have been processed. - self.db.set_head(data.latest_block_number, None); + self.db.set_head(data.latest_block_number, None, None); self.db.execute().await?; Ok(()) diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 7068465865..dea2a7be92 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -65,33 +65,47 @@ impl Sql { }) } - pub async fn head(&self) -> Result<(u64, Option)> { + pub async fn head(&self) -> Result<(u64, Option, Option)> { let mut conn: PoolConnection = self.pool.acquire().await?; - let indexer_query = sqlx::query_as::<_, (Option, Option, String)>( - "SELECT head, pending_block_tx, contract_type FROM contracts WHERE id = ?", - ) - .bind(format!("{:#x}", self.world_address)); - - let indexer: (Option, Option, String) = + let indexer_query = + sqlx::query_as::<_, (Option, Option, Option, String)>( + "SELECT head, last_pending_block_world_tx, last_pending_block_tx, contract_type \ + FROM contracts WHERE id = ?", + ) + .bind(format!("{:#x}", self.world_address)); + + let indexer: (Option, Option, Option, String) = indexer_query.fetch_one(&mut *conn).await?; Ok(( indexer.0.map(|h| h.try_into().expect("doesn't fit in u64")).unwrap_or(0), indexer.1.map(|f| Felt::from_str(&f)).transpose()?, + indexer.2.map(|f| Felt::from_str(&f)).transpose()?, )) } - pub fn set_head(&mut self, head: u64, pending_block_tx: Option) { + pub fn set_head( + &mut self, + head: u64, + last_pending_block_world_tx: Option, + last_pending_block_tx: Option, + ) { let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); let id = Argument::FieldElement(self.world_address); - let pending_block_tx = if let Some(f) = pending_block_tx { + let last_pending_block_world_tx = if let Some(f) = last_pending_block_world_tx { + Argument::String(format!("{:#x}", f)) + } else { + Argument::Null + }; + let last_pending_block_tx = if let Some(f) = last_pending_block_tx { Argument::String(format!("{:#x}", f)) } else { Argument::Null }; self.query_queue.enqueue( - "UPDATE contracts SET head = ?, pending_block_tx = ? WHERE id = ?", - vec![head, pending_block_tx, id], + "UPDATE contracts SET head = ?, last_pending_block_world_tx = ?, \ + last_pending_block_tx = ? WHERE id = ?", + vec![head, last_pending_block_world_tx, last_pending_block_tx, id], ); } diff --git a/crates/torii/migrations/20240903110847_add_column_for_non_world_transaction.sql b/crates/torii/migrations/20240903110847_add_column_for_non_world_transaction.sql new file mode 100644 index 0000000000..924d3aff0c --- /dev/null +++ b/crates/torii/migrations/20240903110847_add_column_for_non_world_transaction.sql @@ -0,0 +1,5 @@ +-- Rename pending_block_tx to last_pending_block_world_tx +ALTER TABLE contracts RENAME COLUMN pending_block_tx TO last_pending_block_world_tx; + +-- Add new column last_pending_block_tx +ALTER TABLE contracts ADD COLUMN last_pending_block_tx TEXT;