From d7e26adb0eae3c8703ab4cc1a57b8118e6c4b917 Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Fri, 27 Sep 2024 14:41:35 +0530 Subject: [PATCH 1/8] refactor(torii): handle normal and legacy erc contract type at runtime --- crates/torii/core/src/engine.rs | 59 +++++++++++++++++++++----------- crates/torii/core/src/sql/mod.rs | 6 +--- crates/torii/core/src/types.rs | 6 ---- crates/torii/libp2p/src/tests.rs | 8 +++-- 4 files changed, 46 insertions(+), 33 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 5d3d55f868..1b81106cc1 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -39,12 +39,14 @@ use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; use crate::sql::{Cursors, Sql}; use crate::types::ContractType; +type EventProcessorMap

= HashMap>>>; + #[allow(missing_debug_implementations)] pub struct Processors { pub block: Vec>>, pub transaction: Vec>>, pub catch_all_event: Box>, - pub event_processors: HashMap>>>, + pub event_processors: HashMap>, } impl Default for Processors

{ @@ -59,10 +61,8 @@ impl Default for Processo } impl Processors

{ - pub fn initialize_event_processors( - ) -> HashMap>>> { - let mut event_processors_map = - HashMap::>>>::new(); + pub fn initialize_event_processors() -> HashMap> { + let mut event_processors_map = HashMap::>::new(); let event_processors = vec![ ( @@ -78,19 +78,17 @@ impl Processors

{ ), ( ContractType::ERC20, - vec![Box::new(Erc20TransferProcessor) as Box>], + vec![ + Box::new(Erc20TransferProcessor) as Box>, + Box::new(Erc20LegacyTransferProcessor) as Box>, + ], ), ( ContractType::ERC721, - vec![Box::new(Erc721TransferProcessor) as Box>], - ), - ( - ContractType::ERC20Legacy, - vec![Box::new(Erc20LegacyTransferProcessor) as Box>], - ), - ( - ContractType::ERC721Legacy, - vec![Box::new(Erc721LegacyTransferProcessor) as Box>], + vec![ + Box::new(Erc721TransferProcessor) as Box>, + Box::new(Erc721LegacyTransferProcessor) as Box>, + ], ), ]; @@ -98,7 +96,13 @@ impl Processors

{ for processor in processors { let key = get_selector_from_name(processor.event_key().as_str()) .expect("Event key is ASCII so this should never fail"); - event_processors_map.entry(contract_type).or_default().insert(key, processor); + // event_processors_map.entry(contract_type).or_default().insert(key, processor); + event_processors_map + .entry(contract_type) + .or_default() + .entry(key) + .or_default() + .push(processor); } } @@ -108,7 +112,7 @@ impl Processors

{ pub fn get_event_processor( &self, contract_type: ContractType, - ) -> &HashMap>> { + ) -> &HashMap>>> { self.event_processors.get(&contract_type).unwrap() } } @@ -562,7 +566,10 @@ impl Engine

{ let mut local_db = db.clone(); for (contract_type, ParallelizedEvent { event_id, event, block_number, block_timestamp }) in events { let contract_processors = processors.get_event_processor(contract_type); - if let Some(processor) = contract_processors.get(&event.keys[0]) { + if let Some(processors) = contract_processors.get(&event.keys[0]) { + + let processor = processors.iter().find(|p| p.validate(&event)).expect("Must find atleast one processor for the event"); + debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); if let Err(e) = processor @@ -732,13 +739,20 @@ impl Engine

{ contract_type: ContractType, ) -> Result<()> { if self.config.flags.contains(IndexingFlags::RAW_EVENTS) { - self.db.store_event(event_id, event, transaction_hash, block_timestamp); + match contract_type { + ContractType::WORLD => { + self.db.store_event(event_id, event, transaction_hash, block_timestamp); + } + // ERC events needs to be processed inside there respective processor + // we store transfer events for ERC contracts regardless of this flag + ContractType::ERC20 | ContractType::ERC721 => {} + } } let event_key = event.keys[0]; let processors = self.processors.get_event_processor(contract_type); - let Some(processor) = processors.get(&event_key) else { + let Some(processors) = processors.get(&event_key) else { // if we dont have a processor for this event, we try the catch all processor if self.processors.catch_all_event.validate(event) { if let Err(e) = self @@ -773,6 +787,11 @@ impl Engine

{ return Ok(()); }; + let processor = processors + .iter() + .find(|p| p.validate(event)) + .expect("Must find atleast one processor for the event"); + let task_identifier = match processor.event_key().as_str() { "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { let mut hasher = DefaultHasher::new(); diff --git a/crates/torii/core/src/sql/mod.rs b/crates/torii/core/src/sql/mod.rs index 2554c1cc64..ec25c3bf27 100644 --- a/crates/torii/core/src/sql/mod.rs +++ b/crates/torii/core/src/sql/mod.rs @@ -798,11 +798,7 @@ impl Sql { Ty::Enum(e) => { if e.options.iter().all( |o| { - if let Ty::Tuple(t) = &o.ty { - t.is_empty() - } else { - false - } + if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false } }, ) { return; diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index e3c814b9a2..f5716d979f 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -112,9 +112,7 @@ pub struct Contract { pub enum ContractType { WORLD, ERC20, - ERC20Legacy, ERC721, - ERC721Legacy, } impl FromStr for ContractType { @@ -125,8 +123,6 @@ impl FromStr for ContractType { "world" => Ok(ContractType::WORLD), "erc20" => Ok(ContractType::ERC20), "erc721" => Ok(ContractType::ERC721), - "erc20legacy" | "erc20_legacy" => Ok(ContractType::ERC20Legacy), - "erc721legacy" | "erc721_legacy" => Ok(ContractType::ERC721Legacy), _ => Err(anyhow::anyhow!("Invalid ERC type: {}", input)), } } @@ -138,8 +134,6 @@ impl std::fmt::Display for ContractType { ContractType::WORLD => write!(f, "WORLD"), ContractType::ERC20 => write!(f, "ERC20"), ContractType::ERC721 => write!(f, "ERC721"), - ContractType::ERC20Legacy => write!(f, "ERC20Legacy"), - ContractType::ERC721Legacy => write!(f, "ERC721Legacy"), } } } diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index ee7ad0a525..d81355491a 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -1,5 +1,6 @@ #[cfg(test)] mod test { + use std::collections::HashMap; use std::error::Error; use crate::client::RelayClient; @@ -20,7 +21,7 @@ mod test { use starknet::core::types::Felt; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::Sql; - use torii_core::types::EventMessage; + use torii_core::types::{ContractType, EventMessage}; #[cfg(target_arch = "wasm32")] use wasm_bindgen_test::*; @@ -705,7 +706,10 @@ mod test { let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); sqlx::migrate!("../migrations").run(&pool).await.unwrap(); - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); let mut broker = SimpleBroker::::subscribe(); let entity = Ty::Struct(Struct { name: "Message".to_string(), children: vec![] }); From a6d0b07eb7dda8e45a50e6aadb27218d6757b3c1 Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Sat, 28 Sep 2024 02:04:59 +0530 Subject: [PATCH 2/8] refactor(torii): use inmemory cache to speedup erc indexing --- crates/torii/core/src/engine.rs | 8 +- .../src/processors/erc20_legacy_transfer.rs | 16 +- .../core/src/processors/erc20_transfer.rs | 16 +- .../src/processors/erc721_legacy_transfer.rs | 5 + .../core/src/processors/erc721_transfer.rs | 5 + .../core/src/processors/event_message.rs | 4 + .../core/src/processors/metadata_update.rs | 3 + crates/torii/core/src/processors/mod.rs | 4 + .../core/src/processors/register_model.rs | 4 + .../core/src/processors/store_del_record.rs | 4 + .../core/src/processors/store_set_record.rs | 5 +- .../src/processors/store_update_member.rs | 4 + .../src/processors/store_update_record.rs | 4 + crates/torii/core/src/sql/erc.rs | 258 +++++++++--------- crates/torii/core/src/sql/utils.rs | 75 +++++ .../migrations/20240913104418_add_erc.sql | 2 +- scripts/compare-torii-data.py | 31 ++- 17 files changed, 313 insertions(+), 135 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 1b81106cc1..819c25efcf 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -36,6 +36,7 @@ use crate::processors::store_set_record::StoreSetRecordProcessor; use crate::processors::store_update_member::StoreUpdateMemberProcessor; use crate::processors::store_update_record::StoreUpdateRecordProcessor; use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; +use crate::sql::utils::I256; use crate::sql::{Cursors, Sql}; use crate::types::ContractType; @@ -192,6 +193,7 @@ pub struct Engine { block_tx: Option>, tasks: HashMap>, contracts: Arc>, + cache: HashMap, } struct UnprocessedEvent { @@ -221,6 +223,7 @@ impl Engine

{ block_tx, contracts, tasks: HashMap::new(), + cache: HashMap::new(), } } @@ -455,6 +458,7 @@ impl Engine

{ } FetchDataResult::None => {} } + self.db.apply_cache_diff(&mut self.cache).await?; Ok(()) } @@ -573,7 +577,7 @@ impl Engine

{ debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); if let Err(e) = processor - .process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event) + .process(&world, &mut local_db, None, block_number, block_timestamp, &event_id, &event) .await { error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); @@ -761,6 +765,7 @@ impl Engine

{ .process( &self.world, &mut self.db, + None, block_number, block_timestamp, event_id, @@ -820,6 +825,7 @@ impl Engine

{ .process( &self.world, &mut self.db, + Some(&mut self.cache), block_number, block_timestamp, event_id, diff --git a/crates/torii/core/src/processors/erc20_legacy_transfer.rs b/crates/torii/core/src/processors/erc20_legacy_transfer.rs index 4cef0dc19d..6782ea595b 100644 --- a/crates/torii/core/src/processors/erc20_legacy_transfer.rs +++ b/crates/torii/core/src/processors/erc20_legacy_transfer.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +9,7 @@ use starknet::providers::Provider; use tracing::debug; use super::EventProcessor; +use crate::sql::utils::I256; use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_legacy_transfer"; @@ -38,6 +41,7 @@ where &self, world: &WorldContractReader

, db: &mut Sql, + cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, _event_id: &str, @@ -50,8 +54,16 @@ where let value = U256Cainome::cairo_deserialize(&event.data, 2)?; let value = U256::from_words(value.low, value.high); - db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp) - .await?; + db.handle_erc20_transfer( + token_address, + from, + to, + value, + world.provider(), + block_timestamp, + cache.expect("cache is required"), + ) + .await?; debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer"); Ok(()) diff --git a/crates/torii/core/src/processors/erc20_transfer.rs b/crates/torii/core/src/processors/erc20_transfer.rs index 10022d9eb0..c2a459a43c 100644 --- a/crates/torii/core/src/processors/erc20_transfer.rs +++ b/crates/torii/core/src/processors/erc20_transfer.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +9,7 @@ use starknet::providers::Provider; use tracing::debug; use super::EventProcessor; +use crate::sql::utils::I256; use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_transfer"; @@ -38,6 +41,7 @@ where &self, world: &WorldContractReader

, db: &mut Sql, + cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, _event_id: &str, @@ -50,8 +54,16 @@ where let value = U256Cainome::cairo_deserialize(&event.data, 0)?; let value = U256::from_words(value.low, value.high); - db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp) - .await?; + db.handle_erc20_transfer( + token_address, + from, + to, + value, + world.provider(), + block_timestamp, + cache.expect("cache is required"), + ) + .await?; debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer"); Ok(()) diff --git a/crates/torii/core/src/processors/erc721_legacy_transfer.rs b/crates/torii/core/src/processors/erc721_legacy_transfer.rs index 89a88f04a3..7908d74f98 100644 --- a/crates/torii/core/src/processors/erc721_legacy_transfer.rs +++ b/crates/torii/core/src/processors/erc721_legacy_transfer.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +9,7 @@ use starknet::providers::Provider; use tracing::debug; use super::EventProcessor; +use crate::sql::utils::I256; use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc721_legacy_transfer"; @@ -38,6 +41,7 @@ where &self, world: &WorldContractReader

, db: &mut Sql, + cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, _event_id: &str, @@ -57,6 +61,7 @@ where token_id, world.provider(), block_timestamp, + cache.expect("cache is required"), ) .await?; debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); diff --git a/crates/torii/core/src/processors/erc721_transfer.rs b/crates/torii/core/src/processors/erc721_transfer.rs index 319ea81833..4ac2813be2 100644 --- a/crates/torii/core/src/processors/erc721_transfer.rs +++ b/crates/torii/core/src/processors/erc721_transfer.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +9,7 @@ use starknet::providers::Provider; use tracing::debug; use super::EventProcessor; +use crate::sql::utils::I256; use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc721_transfer"; @@ -38,6 +41,7 @@ where &self, world: &WorldContractReader

, db: &mut Sql, + cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, _event_id: &str, @@ -57,6 +61,7 @@ where token_id, world.provider(), block_timestamp, + cache.expect("cache is required"), ) .await?; debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); diff --git a/crates/torii/core/src/processors/event_message.rs b/crates/torii/core/src/processors/event_message.rs index e2044cbe1a..9d3757486f 100644 --- a/crates/torii/core/src/processors/event_message.rs +++ b/crates/torii/core/src/processors/event_message.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::world::WorldContractReader; @@ -7,6 +9,7 @@ use tracing::info; use super::EventProcessor; use crate::processors::MODEL_INDEX; +use crate::sql::utils::I256; use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::event_message"; @@ -39,6 +42,7 @@ where &self, _world: &WorldContractReader

, db: &mut Sql, + _cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, event_id: &str, diff --git a/crates/torii/core/src/processors/metadata_update.rs b/crates/torii/core/src/processors/metadata_update.rs index 594a32898a..cea4305141 100644 --- a/crates/torii/core/src/processors/metadata_update.rs +++ b/crates/torii/core/src/processors/metadata_update.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::time::Duration; use anyhow::{Error, Result}; @@ -15,6 +16,7 @@ use tokio_util::bytes::Bytes; use tracing::{error, info}; use super::EventProcessor; +use crate::sql::utils::I256; use crate::sql::Sql; const IPFS_URL: &str = "https://cartridge.infura-ipfs.io/ipfs/"; @@ -51,6 +53,7 @@ where &self, _world: &WorldContractReader

, db: &mut Sql, + _cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, _event_id: &str, diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index cf25f36ca6..98a609c7cb 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -1,9 +1,12 @@ +use std::collections::HashMap; + use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{Event, Felt, Transaction}; use starknet::providers::Provider; +use crate::sql::utils::I256; use crate::sql::Sql; pub mod erc20_legacy_transfer; @@ -41,6 +44,7 @@ where &self, world: &WorldContractReader

, db: &mut Sql, + cache: Option<&mut HashMap>, block_number: u64, block_timestamp: u64, event_id: &str, diff --git a/crates/torii/core/src/processors/register_model.rs b/crates/torii/core/src/processors/register_model.rs index 369357a243..da4e174fa0 100644 --- a/crates/torii/core/src/processors/register_model.rs +++ b/crates/torii/core/src/processors/register_model.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::{Error, Ok, Result}; use async_trait::async_trait; use cainome::cairo_serde::{ByteArray, CairoSerde}; @@ -8,6 +10,7 @@ use starknet::providers::Provider; use tracing::{debug, info}; use super::EventProcessor; +use crate::sql::utils::I256; use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::register_model"; @@ -41,6 +44,7 @@ where &self, world: &WorldContractReader

, db: &mut Sql, + _cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, _event_id: &str, diff --git a/crates/torii/core/src/processors/store_del_record.rs b/crates/torii/core/src/processors/store_del_record.rs index 2226d4c92f..090085dea3 100644 --- a/crates/torii/core/src/processors/store_del_record.rs +++ b/crates/torii/core/src/processors/store_del_record.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::{Error, Ok, Result}; use async_trait::async_trait; use dojo_world::contracts::world::WorldContractReader; @@ -7,6 +9,7 @@ use tracing::info; use super::EventProcessor; use crate::processors::{ENTITY_ID_INDEX, MODEL_INDEX}; +use crate::sql::utils::I256; use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::store_del_record"; @@ -40,6 +43,7 @@ where &self, _world: &WorldContractReader

, db: &mut Sql, + _cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, event_id: &str, diff --git a/crates/torii/core/src/processors/store_set_record.rs b/crates/torii/core/src/processors/store_set_record.rs index fa1351b156..23385ceaea 100644 --- a/crates/torii/core/src/processors/store_set_record.rs +++ b/crates/torii/core/src/processors/store_set_record.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::{Context, Error, Ok, Result}; use async_trait::async_trait; use dojo_world::contracts::world::WorldContractReader; @@ -8,7 +10,7 @@ use tracing::info; use super::EventProcessor; use crate::processors::{ENTITY_ID_INDEX, MODEL_INDEX, NUM_KEYS_INDEX}; -use crate::sql::utils::felts_to_sql_string; +use crate::sql::utils::{felts_to_sql_string, I256}; use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::store_set_record"; @@ -42,6 +44,7 @@ where &self, _world: &WorldContractReader

, db: &mut Sql, + _cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, event_id: &str, diff --git a/crates/torii/core/src/processors/store_update_member.rs b/crates/torii/core/src/processors/store_update_member.rs index 567e9e18d0..9a17139951 100644 --- a/crates/torii/core/src/processors/store_update_member.rs +++ b/crates/torii/core/src/processors/store_update_member.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::{Context, Error, Result}; use async_trait::async_trait; use dojo_types::schema::{Struct, Ty}; @@ -11,6 +13,7 @@ use tracing::{info, warn}; use super::EventProcessor; use crate::processors::{ENTITY_ID_INDEX, MODEL_INDEX}; +use crate::sql::utils::I256; use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::store_update_member"; @@ -46,6 +49,7 @@ where &self, _world: &WorldContractReader

, db: &mut Sql, + _cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, event_id: &str, diff --git a/crates/torii/core/src/processors/store_update_record.rs b/crates/torii/core/src/processors/store_update_record.rs index 374e6a5189..b30ac02573 100644 --- a/crates/torii/core/src/processors/store_update_record.rs +++ b/crates/torii/core/src/processors/store_update_record.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::{Context, Error, Ok, Result}; use async_trait::async_trait; use dojo_types::schema::Ty; @@ -9,6 +11,7 @@ use tracing::info; use super::EventProcessor; use crate::processors::{ENTITY_ID_INDEX, MODEL_INDEX}; +use crate::sql::utils::I256; use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::store_update_record"; @@ -42,6 +45,7 @@ where &self, _world: &WorldContractReader

, db: &mut Sql, + _cache: Option<&mut HashMap>, _block_number: u64, block_timestamp: u64, event_id: &str, diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs index f856c2f21a..0554390f0f 100644 --- a/crates/torii/core/src/sql/erc.rs +++ b/crates/torii/core/src/sql/erc.rs @@ -1,4 +1,4 @@ -use std::ops::{Add, Sub}; +use std::collections::HashMap; use anyhow::Result; use cainome::cairo_serde::{ByteArray, CairoSerde}; @@ -8,12 +8,13 @@ use starknet::providers::Provider; use tracing::debug; use super::query_queue::{Argument, QueryType}; -use super::utils::{sql_string_to_u256, u256_to_sql_string}; +use super::utils::{sql_string_to_u256, u256_to_sql_string, I256}; use super::{Sql, FELT_DELIMITER}; use crate::sql::utils::{felt_and_u256_to_sql_string, felt_to_sql_string, felts_to_sql_string}; use crate::utils::utc_dt_string_from_timestamp; impl Sql { + #[allow(clippy::too_many_arguments)] pub async fn handle_erc20_transfer( &mut self, contract_address: Felt, @@ -22,8 +23,9 @@ impl Sql { amount: U256, provider: &P, block_timestamp: u64, + cache: &mut HashMap, ) -> Result<()> { - // unique token identifier in DB + // contract_address let token_id = felt_to_sql_string(&contract_address); let token_exists: bool = @@ -36,7 +38,7 @@ impl Sql { self.register_erc20_token_metadata(contract_address, &token_id, provider).await?; } - self.register_erc_transfer_event( + self.store_erc_transfer_event( contract_address, from_address, to_address, @@ -45,87 +47,29 @@ impl Sql { block_timestamp, ); - // Update balances in erc20_balance table - { - // NOTE: formatting here should match the format we use for Argument type in QueryQueue - // TODO: abstract this so they cannot mismatch - - // Since balance are stored as TEXT in db, we cannot directly use INSERT OR UPDATE - // statements. - // Fetch balances for both `from` and `to` addresses, update them and write back to db - let query = sqlx::query_as::<_, (String, String)>( - "SELECT account_address, balance FROM balances WHERE contract_address = ? AND \ - account_address IN (?, ?)", - ) - .bind(felt_to_sql_string(&contract_address)) - .bind(felt_to_sql_string(&from_address)) - .bind(felt_to_sql_string(&to_address)); - - // (address, balance) - let balances: Vec<(String, String)> = query.fetch_all(&self.pool).await?; - // (address, balance) is primary key in DB, and we are fetching for 2 addresses so there - // should be at most 2 rows returned - assert!(balances.len() <= 2); - - let from_balance = balances - .iter() - .find(|(address, _)| address == &felt_to_sql_string(&from_address)) - .map(|(_, balance)| balance.clone()) - .unwrap_or_else(|| u256_to_sql_string(&U256::from(0u8))); - - let to_balance = balances - .iter() - .find(|(address, _)| address == &felt_to_sql_string(&to_address)) - .map(|(_, balance)| balance.clone()) - .unwrap_or_else(|| u256_to_sql_string(&U256::from(0u8))); - - let from_balance = sql_string_to_u256(&from_balance); - let to_balance = sql_string_to_u256(&to_balance); - - let new_from_balance = - if from_address != Felt::ZERO { from_balance.sub(amount) } else { from_balance }; - let new_to_balance = - if to_address != Felt::ZERO { to_balance.add(amount) } else { to_balance }; - - let update_query = " - INSERT INTO balances (id, balance, account_address, contract_address, token_id) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT (id) - DO UPDATE SET balance = excluded.balance"; - - if from_address != Felt::ZERO { - self.query_queue.enqueue( - update_query, - vec![ - Argument::String(felts_to_sql_string(&[from_address, contract_address])), - Argument::String(u256_to_sql_string(&new_from_balance)), - Argument::FieldElement(from_address), - Argument::FieldElement(contract_address), - Argument::String(token_id.clone()), - ], - QueryType::Other, - ); - } + self.query_queue.execute_all().await?; - if to_address != Felt::ZERO { - self.query_queue.enqueue( - update_query, - vec![ - Argument::String(felts_to_sql_string(&[to_address, contract_address])), - Argument::String(u256_to_sql_string(&new_to_balance)), - Argument::FieldElement(to_address), - Argument::FieldElement(contract_address), - Argument::String(token_id.clone()), - ], - QueryType::Other, - ); - } + if from_address != Felt::ZERO { + // from_address/contract_address/ + let from_balance_id = felts_to_sql_string(&[from_address, contract_address]); + let from_balance = cache.entry(from_balance_id).or_default(); + *from_balance -= I256::from(amount); + } + + if to_address != Felt::ZERO { + let to_balance_id = felts_to_sql_string(&[to_address, contract_address]); + let to_balance = cache.entry(to_balance_id).or_default(); + *to_balance += I256::from(amount); + } + + if cache.len() >= 100000 { + self.apply_cache_diff(cache).await?; } - self.query_queue.execute_all().await?; Ok(()) } + #[allow(clippy::too_many_arguments)] pub async fn handle_erc721_transfer( &mut self, contract_address: Felt, @@ -134,7 +78,9 @@ impl Sql { token_id: U256, provider: &P, block_timestamp: u64, + cache: &mut HashMap, ) -> Result<()> { + // contract_address:id let token_id = felt_and_u256_to_sql_string(&contract_address, &token_id); let token_exists: bool = sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tokens WHERE id = ?)") @@ -146,7 +92,7 @@ impl Sql { self.register_erc721_token_metadata(contract_address, &token_id, provider).await?; } - self.register_erc_transfer_event( + self.store_erc_transfer_event( contract_address, from_address, to_address, @@ -155,51 +101,26 @@ impl Sql { block_timestamp, ); - // Update balances in erc721_balances table - { - let update_query = " - INSERT INTO balances (id, balance, account_address, contract_address, token_id) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT (id) - DO UPDATE SET balance = excluded.balance"; - - if from_address != Felt::ZERO { - self.query_queue.enqueue( - update_query, - vec![ - Argument::String(format!( - "{}{FELT_DELIMITER}{}", - felt_to_sql_string(&from_address), - &token_id - )), - Argument::String(u256_to_sql_string(&U256::from(0u8))), - Argument::FieldElement(from_address), - Argument::FieldElement(contract_address), - Argument::String(token_id.clone()), - ], - QueryType::Other, - ); - } + self.query_queue.execute_all().await?; - if to_address != Felt::ZERO { - self.query_queue.enqueue( - update_query, - vec![ - Argument::String(format!( - "{}{FELT_DELIMITER}{}", - felt_to_sql_string(&to_address), - &token_id - )), - Argument::String(u256_to_sql_string(&U256::from(1u8))), - Argument::FieldElement(to_address), - Argument::FieldElement(contract_address), - Argument::String(token_id.clone()), - ], - QueryType::Other, - ); - } + // from_address/contract_address:id + if from_address != Felt::ZERO { + let from_balance_id = + format!("{}{FELT_DELIMITER}{}", felt_to_sql_string(&from_address), &token_id); + let from_balance = cache.entry(from_balance_id).or_default(); + *from_balance -= I256::from(1u8); + } + + if to_address != Felt::ZERO { + let to_balance_id = + format!("{}{FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id); + let to_balance = cache.entry(to_balance_id).or_default(); + *to_balance += I256::from(1u8); + } + + if cache.len() >= 100000 { + self.apply_cache_diff(cache).await?; } - self.query_queue.execute_all().await?; Ok(()) } @@ -303,8 +224,8 @@ impl Sql { "Token already registered for contract_address, so reusing fetched data", ); self.query_queue.enqueue( - "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, \ - ?, ?, ?)", + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, \ + ?, ?, ?, ?)", vec![ Argument::String(token_id.to_string()), Argument::FieldElement(contract_address), @@ -378,7 +299,7 @@ impl Sql { Ok(()) } - fn register_erc_transfer_event( + fn store_erc_transfer_event( &mut self, contract_address: Felt, from: Felt, @@ -403,4 +324,89 @@ impl Sql { QueryType::Other, ); } + + pub async fn apply_cache_diff(&mut self, cache: &mut HashMap) -> Result<()> { + for (id_str, balance) in cache.iter() { + let id = id_str.split(FELT_DELIMITER).collect::>(); + match id.len() { + // account_address/contract_address:id => ERC721 + 2 => { + let account_address = id[0]; + let token_id = id[1]; + let mid = token_id.split(":").collect::>(); + let contract_address = mid[0]; + + self.apply_balance_diff( + id_str, + account_address, + contract_address, + token_id, + balance, + ) + .await?; + } + // account_address/contract_address/ => ERC20 + 3 => { + let account_address = id[0]; + let contract_address = id[1]; + let token_id = id[1]; + + self.apply_balance_diff( + id_str, + account_address, + contract_address, + token_id, + balance, + ) + .await?; + } + _ => unreachable!(), + } + } + + cache.clear(); + Ok(()) + } + + async fn apply_balance_diff( + &self, + id: &str, + account_address: &str, + contract_address: &str, + token_id: &str, + balance_diff: &I256, + ) -> Result<()> { + let balance: Option<(String,)> = + sqlx::query_as("SELECT balance FROM balances WHERE id = ? ") + .bind(id) + .fetch_optional(&self.pool) + .await?; + + let mut balance = if let Some(balance) = balance { + sql_string_to_u256(&balance.0) + } else { + U256::from(0u8) + }; + + if balance_diff.is_negative { + balance -= balance_diff.value; + } else { + balance += balance_diff.value; + } + + // write the new balance to the database + sqlx::query( + "INSERT OR REPLACE INTO balances (id, contract_address, account_address, token_id, balance) \ + VALUES (?, ?, ?, ?, ?)" + ) + .bind(&id) + .bind(contract_address) + .bind(account_address) + .bind(token_id) + .bind(u256_to_sql_string(&balance)) + .execute(&self.pool) + .await?; + + Ok(()) + } } diff --git a/crates/torii/core/src/sql/utils.rs b/crates/torii/core/src/sql/utils.rs index 26476b0837..7bd365efb5 100644 --- a/crates/torii/core/src/sql/utils.rs +++ b/crates/torii/core/src/sql/utils.rs @@ -1,3 +1,5 @@ +use std::ops::{Add, AddAssign, Sub, SubAssign}; + use starknet::core::types::U256; use starknet_crypto::Felt; @@ -24,3 +26,76 @@ pub fn sql_string_to_u256(sql_string: &str) -> U256 { let sql_string = sql_string.strip_prefix("0x").unwrap_or(sql_string); U256::from(crypto_bigint::U256::from_be_hex(sql_string)) } + +// type used to do calculation on inmemory balances +#[derive(Debug, Clone, Copy)] +pub struct I256 { + pub value: U256, + pub is_negative: bool, +} + +impl Default for I256 { + fn default() -> Self { + Self { value: U256::from(0u8), is_negative: false } + } +} + +impl From for I256 { + fn from(value: U256) -> Self { + Self { value, is_negative: false } + } +} + +impl From for I256 { + fn from(value: u8) -> Self { + Self { value: U256::from(value), is_negative: false } + } +} + +impl Add for I256 { + type Output = I256; + + fn add(self, other: I256) -> I256 { + if self.is_negative == other.is_negative { + // Same sign: add the values and keep the sign + I256 { value: self.value + other.value, is_negative: self.is_negative } + } else { + // Different signs: subtract the smaller value from the larger one + if self.value >= other.value { + I256 { value: self.value - other.value, is_negative: self.is_negative } + } else { + I256 { value: other.value - self.value, is_negative: other.is_negative } + } + } + } +} + +impl Sub for I256 { + type Output = I256; + + fn sub(self, other: I256) -> I256 { + if self.is_negative != other.is_negative { + // Different signs: add the values and keep the sign of self + I256 { value: self.value + other.value, is_negative: self.is_negative } + } else { + // Same sign: subtract the values + if self.value >= other.value { + I256 { value: self.value - other.value, is_negative: self.is_negative } + } else { + I256 { value: other.value - self.value, is_negative: !other.is_negative } + } + } + } +} + +impl AddAssign for I256 { + fn add_assign(&mut self, other: I256) { + *self = *self + other; + } +} + +impl SubAssign for I256 { + fn sub_assign(&mut self, other: I256) { + *self = *self - other; + } +} diff --git a/crates/torii/migrations/20240913104418_add_erc.sql b/crates/torii/migrations/20240913104418_add_erc.sql index aca9f4d817..4366acac27 100644 --- a/crates/torii/migrations/20240913104418_add_erc.sql +++ b/crates/torii/migrations/20240913104418_add_erc.sql @@ -1,5 +1,5 @@ CREATE TABLE balances ( - -- account_address:contract_address:token_id + -- account_address:token_id id TEXT NOT NULL PRIMARY KEY, balance TEXT NOT NULL, account_address TEXT NOT NULL, diff --git a/scripts/compare-torii-data.py b/scripts/compare-torii-data.py index 04798185cd..f997ef81c5 100644 --- a/scripts/compare-torii-data.py +++ b/scripts/compare-torii-data.py @@ -1,4 +1,4 @@ -# This script compares data across 'events', 'entities', and 'transactions' tables between two SQLite databases. +# This script compares data across 'events', 'entities', 'transactions', 'balances', 'tokens', and 'erc_transfers' tables between two SQLite databases. # Helpful to make sure any changes made in torii doesn't affect the resulting data. import sqlite3 @@ -46,6 +46,9 @@ def compare_databases(db_path1, db_path2): events_columns = ["id", "keys", "data", "transaction_hash"] entities_columns = ["id", "keys"] transactions_columns = ["id", "transaction_hash", "sender_address", "calldata", "max_fee", "signature", "nonce", "transaction_type"] + balances_columns = ["id", "balance", "account_address", "contract_address", "token_id"] + tokens_columns = ["id", "contract_address", "name", "symbol", "decimals"] + erc_transfers_columns = ["id", "contract_address", "from_address", "to_address", "amount", "token_id"] # Fetch data from both databases events_data_db1 = fetch_table_data(db_path1, "events", events_columns) @@ -54,6 +57,12 @@ def compare_databases(db_path1, db_path2): entities_data_db2 = fetch_table_data(db_path2, "entities", entities_columns) transactions_data_db1 = fetch_table_data(db_path1, "transactions", transactions_columns) transactions_data_db2 = fetch_table_data(db_path2, "transactions", transactions_columns) + balances_data_db1 = fetch_table_data(db_path1, "balances", balances_columns) + balances_data_db2 = fetch_table_data(db_path2, "balances", balances_columns) + tokens_data_db1 = fetch_table_data(db_path1, "tokens", tokens_columns) + tokens_data_db2 = fetch_table_data(db_path2, "tokens", tokens_columns) + erc_transfers_data_db1 = fetch_table_data(db_path1, "erc_transfers", erc_transfers_columns) + erc_transfers_data_db2 = fetch_table_data(db_path2, "erc_transfers", erc_transfers_columns) # Get row counts from both databases events_count_db1 = get_table_row_count(db_path1, "events") @@ -62,11 +71,20 @@ def compare_databases(db_path1, db_path2): entities_count_db2 = get_table_row_count(db_path2, "entities") transactions_count_db1 = get_table_row_count(db_path1, "transactions") transactions_count_db2 = get_table_row_count(db_path2, "transactions") + balances_count_db1 = get_table_row_count(db_path1, "balances") + balances_count_db2 = get_table_row_count(db_path2, "balances") + tokens_count_db1 = get_table_row_count(db_path1, "tokens") + tokens_count_db2 = get_table_row_count(db_path2, "tokens") + erc_transfers_count_db1 = get_table_row_count(db_path1, "erc_transfers") + erc_transfers_count_db2 = get_table_row_count(db_path2, "erc_transfers") # Print row counts print(f"Number of rows in events table: Database 1 = {events_count_db1}, Database 2 = {events_count_db2}") print(f"Number of rows in entities table: Database 1 = {entities_count_db1}, Database 2 = {entities_count_db2}") print(f"Number of rows in transactions table: Database 1 = {transactions_count_db1}, Database 2 = {transactions_count_db2}") + print(f"Number of rows in balances table: Database 1 = {balances_count_db1}, Database 2 = {balances_count_db2}") + print(f"Number of rows in tokens table: Database 1 = {tokens_count_db1}, Database 2 = {tokens_count_db2}") + print(f"Number of rows in erc_transfers table: Database 1 = {erc_transfers_count_db1}, Database 2 = {erc_transfers_count_db2}") # Compare data print("\nComparing events table:") @@ -78,8 +96,17 @@ def compare_databases(db_path1, db_path2): print("\nComparing transactions table:") compare_data(transactions_data_db1, transactions_data_db2, "transactions") + print("\nComparing balances table:") + compare_data(balances_data_db1, balances_data_db2, "balances") + + print("\nComparing tokens table:") + compare_data(tokens_data_db1, tokens_data_db2, "tokens") + + print("\nComparing erc_transfers table:") + compare_data(erc_transfers_data_db1, erc_transfers_data_db2, "erc_transfers") + if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Compare data in events, entities, and transactions tables between two SQLite databases.") + parser = argparse.ArgumentParser(description="Compare data in events, entities, transactions, balances, tokens, and erc_transfers tables between two SQLite databases.") parser.add_argument("db_path1", help="Path to the first SQLite database") parser.add_argument("db_path2", help="Path to the second SQLite database") args = parser.parse_args() From 52e64220666924551b89b8b1fa11d5944374cc4c Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Sat, 28 Sep 2024 03:09:18 +0530 Subject: [PATCH 3/8] fix lints --- crates/torii/core/src/sql/erc.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs index 0554390f0f..17fe456671 100644 --- a/crates/torii/core/src/sql/erc.rs +++ b/crates/torii/core/src/sql/erc.rs @@ -224,8 +224,8 @@ impl Sql { "Token already registered for contract_address, so reusing fetched data", ); self.query_queue.enqueue( - "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, \ - ?, ?, ?, ?)", + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, \ + ?, ?, ?)", vec![ Argument::String(token_id.to_string()), Argument::FieldElement(contract_address), @@ -396,10 +396,10 @@ impl Sql { // write the new balance to the database sqlx::query( - "INSERT OR REPLACE INTO balances (id, contract_address, account_address, token_id, balance) \ - VALUES (?, ?, ?, ?, ?)" + "INSERT OR REPLACE INTO balances (id, contract_address, account_address, token_id, \ + balance) VALUES (?, ?, ?, ?, ?)", ) - .bind(&id) + .bind(id) .bind(contract_address) .bind(account_address) .bind(token_id) From 736455e8a5cc35f4171e5de6e4acbacab80c6325 Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Sat, 28 Sep 2024 18:55:24 +0530 Subject: [PATCH 4/8] add script to verify balances in torii db by quering rpc --- .gitignore | 1 + Cargo.lock | 12 +++ Cargo.toml | 1 + scripts/verify_db_balances/Cargo.toml | 14 ++++ scripts/verify_db_balances/src/main.rs | 100 +++++++++++++++++++++++++ 5 files changed, 128 insertions(+) create mode 100644 scripts/verify_db_balances/Cargo.toml create mode 100644 scripts/verify_db_balances/src/main.rs diff --git a/.gitignore b/.gitignore index 75b10b8482..4f1f1adc1a 100644 --- a/.gitignore +++ b/.gitignore @@ -21,5 +21,6 @@ justfile spawn-and-move-db types-test-db examples/spawn-and-move/manifests/saya/** +**/*.log artifacts/ diff --git a/Cargo.lock b/Cargo.lock index cd1aae5ccb..2c8a1b9750 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15585,6 +15585,18 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "verify_db_balances" +version = "1.0.0-alpha.13" +dependencies = [ + "num-traits 0.2.19", + "sqlx", + "starknet 0.12.0", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index 0e5ed4668d..e6ad8f57da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ members = [ "crates/torii/server", "crates/torii/types-test", "examples/spawn-and-move", + "scripts/verify_db_balances", "xtask/generate-test-db", ] diff --git a/scripts/verify_db_balances/Cargo.toml b/scripts/verify_db_balances/Cargo.toml new file mode 100644 index 0000000000..bc8419ccbd --- /dev/null +++ b/scripts/verify_db_balances/Cargo.toml @@ -0,0 +1,14 @@ +[package] +edition.workspace = true +license.workspace = true +name = "verify_db_balances" +repository.workspace = true +version.workspace = true + +[dependencies] +num-traits = "0.2.19" +sqlx.workspace = true +starknet.workspace = true +tokio = { workspace = true, features = [ "full" ] } +tracing-subscriber.workspace = true +tracing.workspace = true diff --git a/scripts/verify_db_balances/src/main.rs b/scripts/verify_db_balances/src/main.rs new file mode 100644 index 0000000000..dc820921e9 --- /dev/null +++ b/scripts/verify_db_balances/src/main.rs @@ -0,0 +1,100 @@ +use std::str::FromStr; +use std::sync::Arc; + +use num_traits::ToPrimitive; +use sqlx::sqlite::SqlitePool; +use sqlx::Row; +use starknet::core::types::{BlockId, Felt, FunctionCall, U256}; +use starknet::macros::selector; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::{JsonRpcClient, Provider, Url}; +use tracing::{error, info, Level}; + +async fn get_balance_from_starknet( + account_address: &str, + contract_address: &str, + provider: Arc>, +) -> Result> { + let account_address = Felt::from_str(account_address).unwrap(); + let contract_address = Felt::from_str(contract_address).unwrap(); + + let balance = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: selector!("balanceOf"), + calldata: vec![account_address], + }, + BlockId::Tag(starknet::core::types::BlockTag::Pending), + ) + .await?; + + let balance_low = balance[0].to_u128().unwrap(); + let balance_high = balance[1].to_u128().unwrap(); + + let balance = U256::from_words(balance_low, balance_high); + let balance = format!("{:#064x}", balance); + Ok(balance) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize the logger + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + + let pool = SqlitePool::connect("sqlite:../../torii.db").await?; + + let rows = sqlx::query("SELECT account_address, contract_address, balance FROM balances") + .fetch_all(&pool) + .await?; + + // Create a semaphore to limit concurrent tasks + let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(10)); // Adjust the number as needed + + let mut handles = Vec::new(); + + // print number of balances + info!("Checking {} balances", rows.len()); + + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new( + Url::parse("https://api.cartridge.gg/rpc/starknet-sepolia").unwrap(), + ))); + + // IMPROVEMENT: batch multiple balanceOf calls in same rpc call + for row in rows { + let account_address: String = row.get("account_address"); + let contract_address: String = row.get("contract_address"); + let db_balance: String = row.get("balance"); + let semaphore_clone = semaphore.clone(); + let provider = provider.clone(); + + let handle = tokio::spawn(async move { + let _permit = semaphore_clone.acquire().await.unwrap(); + let starknet_balance = + get_balance_from_starknet(&account_address, &contract_address, provider).await?; + + if db_balance != starknet_balance { + error!( + "Mismatch for account {}: DB balance = {}, Starknet balance = {}", + account_address, db_balance, starknet_balance + ); + } else { + info!( + "Balance matched for account {} and contract {}", + account_address, contract_address + ); + } + Ok::<(), Box>(()) + }); + + handles.push(handle); + } + + // Wait for all tasks to complete + for handle in handles { + handle.await??; + } + + info!("Checked all balances"); + Ok(()) +} From 5617d39c90d187caadbcf30f02da9e76e818045d Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Sat, 28 Sep 2024 22:37:16 +0530 Subject: [PATCH 5/8] fix(torii/graphql): return only actual token_id in graphql response --- crates/torii/graphql/src/object/erc/erc_balance.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/torii/graphql/src/object/erc/erc_balance.rs b/crates/torii/graphql/src/object/erc/erc_balance.rs index 77cc492dc2..2e87a281dd 100644 --- a/crates/torii/graphql/src/object/erc/erc_balance.rs +++ b/crates/torii/graphql/src/object/erc/erc_balance.rs @@ -74,8 +74,8 @@ async fn fetch_erc_balances( for row in rows { let row = BalanceQueryResultRaw::from_row(&row)?; - let balance_value = match row.contract_type.as_str() { - "ERC20" | "Erc20" | "erc20" => { + let balance_value = match row.contract_type.to_lowercase().as_str() { + "erc20" => { let token_metadata = Value::Object(ValueMapping::from([ (Name::new("name"), Value::String(row.name)), (Name::new("symbol"), Value::String(row.symbol)), @@ -91,7 +91,7 @@ async fn fetch_erc_balances( (Name::new("token_metadata"), token_metadata), ])) } - "ERC721" | "Erc721" | "erc721" => { + "erc721" => { // contract_address:token_id let token_id = row.token_id.split(':').collect::>(); assert!(token_id.len() == 2); @@ -100,7 +100,7 @@ async fn fetch_erc_balances( (Name::new("contract_address"), Value::String(row.contract_address.clone())), (Name::new("name"), Value::String(row.name)), (Name::new("symbol"), Value::String(row.symbol)), - (Name::new("token_id"), Value::String(row.token_id)), + (Name::new("token_id"), Value::String(token_id[1].to_string())), (Name::new("decimals"), Value::String(row.decimals.to_string())), ])); From ef6977c9561e85db3d8f28709fd9fcbe440770c9 Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Sat, 28 Sep 2024 22:38:22 +0530 Subject: [PATCH 6/8] add debug statement for shouldn't reach state --- crates/torii/core/src/sql/erc.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs index 17fe456671..43db0cbdd7 100644 --- a/crates/torii/core/src/sql/erc.rs +++ b/crates/torii/core/src/sql/erc.rs @@ -377,7 +377,7 @@ impl Sql { balance_diff: &I256, ) -> Result<()> { let balance: Option<(String,)> = - sqlx::query_as("SELECT balance FROM balances WHERE id = ? ") + sqlx::query_as("SELECT balance FROM balances WHERE id = ?") .bind(id) .fetch_optional(&self.pool) .await?; @@ -389,6 +389,9 @@ impl Sql { }; if balance_diff.is_negative { + if balance < balance_diff.value { + dbg!(&balance_diff, balance, id); + } balance -= balance_diff.value; } else { balance += balance_diff.value; From 40bbc641a30c54c8aba0ff89ef22d23c6672b641 Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Mon, 30 Sep 2024 11:15:17 +0530 Subject: [PATCH 7/8] add tests for I256 type --- crates/torii/core/src/sql/utils.rs | 182 +++++++++++++++++++++++++++-- 1 file changed, 174 insertions(+), 8 deletions(-) diff --git a/crates/torii/core/src/sql/utils.rs b/crates/torii/core/src/sql/utils.rs index 7bd365efb5..a8d187605b 100644 --- a/crates/torii/core/src/sql/utils.rs +++ b/crates/torii/core/src/sql/utils.rs @@ -1,3 +1,4 @@ +use std::cmp::Ordering; use std::ops::{Add, AddAssign, Sub, SubAssign}; use starknet::core::types::U256; @@ -61,10 +62,15 @@ impl Add for I256 { I256 { value: self.value + other.value, is_negative: self.is_negative } } else { // Different signs: subtract the smaller value from the larger one - if self.value >= other.value { - I256 { value: self.value - other.value, is_negative: self.is_negative } - } else { - I256 { value: other.value - self.value, is_negative: other.is_negative } + match self.value.cmp(&other.value) { + Ordering::Greater => { + I256 { value: self.value - other.value, is_negative: self.is_negative } + } + Ordering::Less => { + I256 { value: other.value - self.value, is_negative: other.is_negative } + } + // If both values are equal, the result is zero and not negative + Ordering::Equal => I256 { value: U256::from(0u8), is_negative: false }, } } } @@ -79,10 +85,15 @@ impl Sub for I256 { I256 { value: self.value + other.value, is_negative: self.is_negative } } else { // Same sign: subtract the values - if self.value >= other.value { - I256 { value: self.value - other.value, is_negative: self.is_negative } - } else { - I256 { value: other.value - self.value, is_negative: !other.is_negative } + match self.value.cmp(&other.value) { + Ordering::Greater => { + I256 { value: self.value - other.value, is_negative: self.is_negative } + } + Ordering::Less => { + I256 { value: other.value - self.value, is_negative: !self.is_negative } + } + // If both values are equal, the result is zero and not negative + Ordering::Equal => I256 { value: U256::from(0u8), is_negative: false }, } } } @@ -99,3 +110,158 @@ impl SubAssign for I256 { *self = *self - other; } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_add_zero_false_and_zero_false() { + // 0,false + 0,false == 0,false + let a = I256::default(); + let b = I256::default(); + let result = a + b; + assert_eq!(result.value, U256::from(0u8)); + assert!(!result.is_negative); + } + + #[test] + fn test_add_zero_true_and_zero_false() { + // 0,true + 0,false + let a = I256 { value: U256::from(0u8), is_negative: true }; + let b = I256::default(); + let result = a + b; + assert_eq!(result.value, U256::from(0u8)); + assert!(!result.is_negative); + } + + #[test] + fn test_sub_zero_false_and_zero_false() { + // 0,false - 0,false + let a = I256::default(); + let b = I256::default(); + let result = a - b; + assert_eq!(result.value, U256::from(0u8)); + assert!(!result.is_negative); + } + + #[test] + fn test_sub_zero_true_and_zero_false() { + // 0,true - 0,false + let a = I256 { value: U256::from(0u8), is_negative: true }; + let b = I256::default(); + let result = a - b; + assert_eq!(result.value, U256::from(0u8)); + assert!(result.is_negative); + } + + #[test] + fn test_add_positive_and_negative_equal_values() { + // 5,false + 5,true == 0,false + let a = I256::from(U256::from(5u8)); + let b = I256 { value: U256::from(5u8), is_negative: true }; + let result = a + b; + assert_eq!(result.value, U256::from(0u8)); + assert!(!result.is_negative); + } + + #[test] + fn test_sub_positive_and_negative() { + // 10,false - 5,true == 15,false + let a = I256::from(U256::from(10u8)); + let b = I256 { value: U256::from(5u8), is_negative: true }; + let result = a - b; + assert_eq!(result.value, U256::from(15u8)); + assert!(!result.is_negative); + } + + #[test] + fn test_sub_larger_from_smaller() { + // 5,false - 10,true == 5,true + let a = I256::from(U256::from(5u8)); + let b = I256::from(U256::from(10u8)); + let result = a - b; + assert_eq!(result.value, U256::from(5u8)); + assert!(result.is_negative); + } + + #[test] + fn test_add_mixed_signs() { + // 15,false + 10,true == 5,false + let a = I256::from(U256::from(15u8)); + let b = I256 { value: U256::from(10u8), is_negative: true }; + let result = a + b; + assert_eq!(result.value, U256::from(5u8)); + assert!(!result.is_negative); + } + + #[test] + fn test_sub_mixed_signs() { + // 5,false - 10,true == 15,false + let a = I256::from(U256::from(5u8)); + let b = I256 { value: U256::from(10u8), is_negative: true }; + let result = a - b; + assert_eq!(result.value, U256::from(15u8)); + assert!(!result.is_negative); + } + + #[test] + fn test_add_negative_and_negative() { + // -5,true + -3,true == -8,true + let a = I256 { value: U256::from(5u8), is_negative: true }; + let b = I256 { value: U256::from(3u8), is_negative: true }; + let result = a + b; + assert_eq!(result.value, U256::from(8u8)); + assert!(result.is_negative); + } + + #[test] + fn test_sub_negative_and_negative() { + // -5,true - -3,true == -2,true + let a = I256 { value: U256::from(5u8), is_negative: true }; + let b = I256 { value: U256::from(3u8), is_negative: true }; + let result = a - b; + assert_eq!(result.value, U256::from(2u8)); + assert!(result.is_negative); + } + + #[test] + fn test_subtraction_resulting_zero() { + // 5,false - 5,false == 0,false + let a = I256::from(U256::from(5u8)); + let b = I256::from(U256::from(5u8)); + let result = a - b; + assert_eq!(result.value, U256::from(0u8)); + assert!(!result.is_negative); + } + + #[test] + fn test_subtraction_resulting_zero_negative() { + // 5,true - 5,true == 0,false + let a = I256 { value: U256::from(5u8), is_negative: true }; + let b = I256 { value: U256::from(5u8), is_negative: true }; + let result = a - b; + assert_eq!(result.value, U256::from(0u8)); + assert!(!result.is_negative); + } + + #[test] + fn test_add_negative_and_positive_result_positive() { + // -10,true + 15,false == 5,false + let a = I256 { value: U256::from(10u8), is_negative: true }; + let b = I256::from(U256::from(15u8)); + let result = a + b; + assert_eq!(result.value, U256::from(5u8)); + assert!(!result.is_negative); + } + + #[test] + fn test_add_negative_and_positive_result_negative() { + // -15,true + 5,false == -10,true + let a = I256 { value: U256::from(15u8), is_negative: true }; + let b = I256::from(U256::from(5u8)); + let result = a + b; + assert_eq!(result.value, U256::from(10u8)); + assert!(result.is_negative); + } +} From 9b7f59f670ad0090132dade5bd5bb0f78bacae49 Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Mon, 30 Sep 2024 11:42:47 +0530 Subject: [PATCH 8/8] update torii toml file --- bin/torii/torii.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/bin/torii/torii.toml b/bin/torii/torii.toml index a45ecfe10e..93a444170f 100644 --- a/bin/torii/torii.toml +++ b/bin/torii/torii.toml @@ -2,7 +2,5 @@ # contracts = [ # { type = "WORLD", address = "" }, # { type = "ERC20", address = "" }, -# { type = "ERC20Legacy", address = "" }, # { type = "ERC721", address = "" }, -# { type = "ERC721Legacy", address = "" }, # ] \ No newline at end of file