diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index f10f6ca3c6..4eafafbe26 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -164,6 +164,7 @@ async fn main() -> anyhow::Result<()> { historical_events: args.events.historical.into_iter().collect(), namespaces: args.indexing.namespaces.into_iter().collect(), }, + world_block: args.indexing.world_block, }, shutdown_tx.clone(), Some(block_tx), diff --git a/crates/torii/cli/src/options.rs b/crates/torii/cli/src/options.rs index eaffd58b2d..9a824beb09 100644 --- a/crates/torii/cli/src/options.rs +++ b/crates/torii/cli/src/options.rs @@ -157,6 +157,19 @@ pub struct IndexingOptions { )] #[serde(default)] pub namespaces: Vec<String>, + + /// The block number to start indexing the world from. + /// + /// Warning: In the current implementation, this will break the indexing of tokens, if any. + /// Since the tokens require the chain to be indexed from the beginning, to ensure correct + /// balance updates. + #[arg( + long = "indexing.world_block", + help = "The block number to start indexing from.", + default_value_t = 0 + )] + #[serde(default)] + pub world_block: u64, } impl Default for IndexingOptions { @@ -170,6 +183,7 @@ impl Default for IndexingOptions { polling_interval: DEFAULT_POLLING_INTERVAL, max_concurrent_tasks: DEFAULT_MAX_CONCURRENT_TASKS, namespaces: vec![], + world_block: 0, } } } @@ -208,6 +222,10 @@ impl IndexingOptions { if self.namespaces.is_empty() { self.namespaces = other.namespaces.clone(); } + + if self.world_block == 0 { + self.world_block = other.world_block; + } } } } diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index a51b593844..f4279e6a5d 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -148,6 +148,7 @@ pub struct EngineConfig { pub max_concurrent_tasks: usize, pub flags: IndexingFlags, pub event_processor_config: EventProcessorConfig, + pub world_block: u64, } impl Default for EngineConfig { @@ -159,6 +160,7 @@ impl Default for EngineConfig { max_concurrent_tasks: 100, flags: IndexingFlags::empty(), event_processor_config: EventProcessorConfig::default(), + world_block: 0, } } } @@ -323,7 +325,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> { let latest_block = self.provider.block_hash_and_number().await?; - let from = cursors.head.unwrap_or(0); + let from = cursors.head.unwrap_or(self.config.world_block); let total_remaining_blocks = latest_block.block_number - from; let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size); let to = from + blocks_to_process; diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index a3def13008..bb04979b83 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -263,25 +263,36 @@ impl DojoWorld { dont_include_hashed_keys: bool, order_by: Option<&str>, ) -> Result<Vec<proto::types::Entity>, Error> { + tracing::debug!( + "Fetching entities from table {table} with {} entity/model pairs", + entities.len() + ); + let start = std::time::Instant::now(); + // Group entities by their model combinations let mut model_groups: HashMap<String, Vec<String>> = HashMap::new(); for (entity_id, models_str) in entities { model_groups.entry(models_str).or_default().push(entity_id); } + tracing::debug!("Grouped into {} distinct model combinations", model_groups.len()); let mut all_entities = Vec::new(); let mut tx = self.pool.begin().await?; + tracing::debug!("Started database transaction"); // Create a temporary table to store entity IDs due to them potentially exceeding // SQLite's parameters limit which is 999 + let temp_table_start = std::time::Instant::now(); sqlx::query( "CREATE TEMPORARY TABLE temp_entity_ids (id TEXT PRIMARY KEY, model_group TEXT)", ) .execute(&mut *tx) .await?; + tracing::debug!("Created temporary table in {:?}", temp_table_start.elapsed()); // Insert all entity IDs into the temporary table + let insert_start = std::time::Instant::now(); for (model_ids, entity_ids) in &model_groups { for chunk in entity_ids.chunks(999) { let placeholders = chunk.iter().map(|_| "(?, ?)").collect::<Vec<_>>().join(","); @@ -296,8 +307,14 @@ impl DojoWorld { query.execute(&mut *tx).await?; } } + tracing::debug!( + "Inserted all entity IDs into temporary table in {:?}", + insert_start.elapsed() + ); - for (models_str, _) in model_groups { + let query_start = std::time::Instant::now(); + for (models_str, entity_ids) in &model_groups { + tracing::debug!("Processing model group with {} entities", entity_ids.len()); let model_ids = models_str.split(',').map(|id| Felt::from_str(id).unwrap()).collect::<Vec<_>>(); let schemas = @@ -315,7 +332,7 @@ impl DojoWorld { None, )?; - let rows = sqlx::query(&entity_query).bind(&models_str).fetch_all(&mut *tx).await?; + let rows = sqlx::query(&entity_query).bind(models_str).fetch_all(&mut *tx).await?; let schemas = Arc::new(schemas); let group_entities: Result<Vec<_>, Error> = rows @@ -325,10 +342,15 @@ impl DojoWorld { all_entities.extend(group_entities?); } + tracing::debug!("Processed all model groups in {:?}", query_start.elapsed()); sqlx::query("DROP TABLE temp_entity_ids").execute(&mut *tx).await?; + tracing::debug!("Dropped temporary table"); tx.commit().await?; + tracing::debug!("Committed transaction"); + + tracing::debug!("Total fetch_entities operation took {:?}", start.elapsed()); Ok(all_entities) } diff --git a/eternum.toml b/eternum.toml new file mode 100644 index 0000000000..cd136378b6 --- /dev/null +++ b/eternum.toml @@ -0,0 +1,14 @@ +world_address = "0x6a9e4c6f0799160ea8ddc43ff982a5f83d7f633e9732ce42701de1288ff705f" +rpc = "https://api.cartridge.gg/x/starknet/mainnet" +explorer = false + +[indexing] +world_block = 947500 +events_chunk_size = 1024 +blocks_chunk_size = 1024000 +pending = true +polling_interval = 500 +max_concurrent_tasks = 100 +transactions = false +contracts = ["ERC721:0x57675b9c0bd62b096a2e15502a37b290fa766ead21c33eda42993e48a714b80"] +namespaces = []