Skip to content

Commit

Permalink
feat(torii): add world block and instrument queries (#2796)
Browse files Browse the repository at this point in the history
* start indexing from fixed block for eternum

* Instrucment fetch entities

* feat: add the world block to start syncing

Currently this commit will break the indexing of tokens
since they need to start from block 0.

* chore: modify eternum config for quick sync

---------

Co-authored-by: tedison <[email protected]>
Co-authored-by: glihm <[email protected]>
  • Loading branch information
3 people authored Dec 12, 2024
1 parent bcd581e commit 35a58b3
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 3 deletions.
1 change: 1 addition & 0 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ async fn main() -> anyhow::Result<()> {
historical_events: args.events.historical.into_iter().collect(),
namespaces: args.indexing.namespaces.into_iter().collect(),
},
world_block: args.indexing.world_block,
},
shutdown_tx.clone(),
Some(block_tx),
Expand Down
18 changes: 18 additions & 0 deletions crates/torii/cli/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,19 @@ pub struct IndexingOptions {
)]
#[serde(default)]
pub namespaces: Vec<String>,

/// The block number to start indexing the world from.
///
/// Warning: In the current implementation, this will break the indexing of tokens, if any.
/// Since the tokens require the chain to be indexed from the beginning, to ensure correct
/// balance updates.
#[arg(
long = "indexing.world_block",
help = "The block number to start indexing from.",
default_value_t = 0
)]
#[serde(default)]
pub world_block: u64,
}

impl Default for IndexingOptions {
Expand All @@ -170,6 +183,7 @@ impl Default for IndexingOptions {
polling_interval: DEFAULT_POLLING_INTERVAL,
max_concurrent_tasks: DEFAULT_MAX_CONCURRENT_TASKS,
namespaces: vec![],
world_block: 0,
}
}
}
Expand Down Expand Up @@ -208,6 +222,10 @@ impl IndexingOptions {
if self.namespaces.is_empty() {
self.namespaces = other.namespaces.clone();
}

if self.world_block == 0 {
self.world_block = other.world_block;
}
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ pub struct EngineConfig {
pub max_concurrent_tasks: usize,
pub flags: IndexingFlags,
pub event_processor_config: EventProcessorConfig,
pub world_block: u64,
}

impl Default for EngineConfig {
Expand All @@ -159,6 +160,7 @@ impl Default for EngineConfig {
max_concurrent_tasks: 100,
flags: IndexingFlags::empty(),
event_processor_config: EventProcessorConfig::default(),
world_block: 0,
}
}
}
Expand Down Expand Up @@ -323,7 +325,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
let latest_block = self.provider.block_hash_and_number().await?;

let from = cursors.head.unwrap_or(0);
let from = cursors.head.unwrap_or(self.config.world_block);
let total_remaining_blocks = latest_block.block_number - from;
let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size);
let to = from + blocks_to_process;
Expand Down
26 changes: 24 additions & 2 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,25 +263,36 @@ impl DojoWorld {
dont_include_hashed_keys: bool,
order_by: Option<&str>,
) -> Result<Vec<proto::types::Entity>, Error> {
tracing::debug!(
"Fetching entities from table {table} with {} entity/model pairs",
entities.len()
);
let start = std::time::Instant::now();

// Group entities by their model combinations
let mut model_groups: HashMap<String, Vec<String>> = HashMap::new();
for (entity_id, models_str) in entities {
model_groups.entry(models_str).or_default().push(entity_id);
}
tracing::debug!("Grouped into {} distinct model combinations", model_groups.len());

let mut all_entities = Vec::new();

let mut tx = self.pool.begin().await?;
tracing::debug!("Started database transaction");

// Create a temporary table to store entity IDs due to them potentially exceeding
// SQLite's parameters limit which is 999
let temp_table_start = std::time::Instant::now();
sqlx::query(
"CREATE TEMPORARY TABLE temp_entity_ids (id TEXT PRIMARY KEY, model_group TEXT)",
)
.execute(&mut *tx)
.await?;
tracing::debug!("Created temporary table in {:?}", temp_table_start.elapsed());

// Insert all entity IDs into the temporary table
let insert_start = std::time::Instant::now();
for (model_ids, entity_ids) in &model_groups {
for chunk in entity_ids.chunks(999) {
let placeholders = chunk.iter().map(|_| "(?, ?)").collect::<Vec<_>>().join(",");
Expand All @@ -296,8 +307,14 @@ impl DojoWorld {
query.execute(&mut *tx).await?;
}
}
tracing::debug!(
"Inserted all entity IDs into temporary table in {:?}",
insert_start.elapsed()
);

for (models_str, _) in model_groups {
let query_start = std::time::Instant::now();
for (models_str, entity_ids) in &model_groups {
tracing::debug!("Processing model group with {} entities", entity_ids.len());
let model_ids =
models_str.split(',').map(|id| Felt::from_str(id).unwrap()).collect::<Vec<_>>();
let schemas =
Expand All @@ -315,7 +332,7 @@ impl DojoWorld {
None,
)?;

let rows = sqlx::query(&entity_query).bind(&models_str).fetch_all(&mut *tx).await?;
let rows = sqlx::query(&entity_query).bind(models_str).fetch_all(&mut *tx).await?;
let schemas = Arc::new(schemas);

let group_entities: Result<Vec<_>, Error> = rows
Expand All @@ -325,10 +342,15 @@ impl DojoWorld {

all_entities.extend(group_entities?);
}
tracing::debug!("Processed all model groups in {:?}", query_start.elapsed());

sqlx::query("DROP TABLE temp_entity_ids").execute(&mut *tx).await?;
tracing::debug!("Dropped temporary table");

tx.commit().await?;
tracing::debug!("Committed transaction");

tracing::debug!("Total fetch_entities operation took {:?}", start.elapsed());

Ok(all_entities)
}
Expand Down
14 changes: 14 additions & 0 deletions eternum.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
world_address = "0x6a9e4c6f0799160ea8ddc43ff982a5f83d7f633e9732ce42701de1288ff705f"
rpc = "https://api.cartridge.gg/x/starknet/mainnet"
explorer = false

[indexing]
world_block = 947500
events_chunk_size = 1024
blocks_chunk_size = 1024000
pending = true
polling_interval = 500
max_concurrent_tasks = 100
transactions = false
contracts = ["ERC721:0x57675b9c0bd62b096a2e15502a37b290fa766ead21c33eda42993e48a714b80"]
namespaces = []

0 comments on commit 35a58b3

Please sign in to comment.