Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): add world block and instrument queries #2796

Merged
merged 4 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
historical_events: args.events.historical.into_iter().collect(),
namespaces: args.indexing.namespaces.into_iter().collect(),
},
world_block: args.indexing.world_block,

Check warning on line 167 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L167

Added line #L167 was not covered by tests
},
shutdown_tx.clone(),
Some(block_tx),
Expand Down
18 changes: 18 additions & 0 deletions crates/torii/cli/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,19 @@
)]
#[serde(default)]
pub namespaces: Vec<String>,

/// The block number to start indexing the world from.
///
/// Warning: In the current implementation, this will break the indexing of tokens, if any.
/// Since the tokens require the chain to be indexed from the beginning, to ensure correct
/// balance updates.
#[arg(
long = "indexing.world_block",
help = "The block number to start indexing from.",
default_value_t = 0
)]
#[serde(default)]
pub world_block: u64,

Check warning on line 172 in crates/torii/cli/src/options.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/cli/src/options.rs#L172

Added line #L172 was not covered by tests
}

impl Default for IndexingOptions {
Expand All @@ -170,6 +183,7 @@
polling_interval: DEFAULT_POLLING_INTERVAL,
max_concurrent_tasks: DEFAULT_MAX_CONCURRENT_TASKS,
namespaces: vec![],
world_block: 0,

Check warning on line 186 in crates/torii/cli/src/options.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/cli/src/options.rs#L186

Added line #L186 was not covered by tests
}
}
}
Expand Down Expand Up @@ -208,6 +222,10 @@
if self.namespaces.is_empty() {
self.namespaces = other.namespaces.clone();
}

if self.world_block == 0 {
self.world_block = other.world_block;
}
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
pub max_concurrent_tasks: usize,
pub flags: IndexingFlags,
pub event_processor_config: EventProcessorConfig,
pub world_block: u64,
}

impl Default for EngineConfig {
Expand All @@ -159,6 +160,7 @@
max_concurrent_tasks: 100,
flags: IndexingFlags::empty(),
event_processor_config: EventProcessorConfig::default(),
world_block: 0,
}
}
}
Expand Down Expand Up @@ -323,7 +325,7 @@
pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
let latest_block = self.provider.block_hash_and_number().await?;

let from = cursors.head.unwrap_or(0);
let from = cursors.head.unwrap_or(self.config.world_block);

Check warning on line 328 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L328

Added line #L328 was not covered by tests
let total_remaining_blocks = latest_block.block_number - from;
let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size);
let to = from + blocks_to_process;
Expand Down
26 changes: 24 additions & 2 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,25 +263,36 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
) -> Result<Vec<proto::types::Entity>, Error> {
tracing::debug!(
"Fetching entities from table {table} with {} entity/model pairs",
entities.len()

Check warning on line 268 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L267-L268

Added lines #L267 - L268 were not covered by tests
);
let start = std::time::Instant::now();

// Group entities by their model combinations
let mut model_groups: HashMap<String, Vec<String>> = HashMap::new();
for (entity_id, models_str) in entities {
model_groups.entry(models_str).or_default().push(entity_id);
}
tracing::debug!("Grouped into {} distinct model combinations", model_groups.len());

let mut all_entities = Vec::new();

let mut tx = self.pool.begin().await?;
tracing::debug!("Started database transaction");

// Create a temporary table to store entity IDs due to them potentially exceeding
// SQLite's parameters limit which is 999
let temp_table_start = std::time::Instant::now();
sqlx::query(
"CREATE TEMPORARY TABLE temp_entity_ids (id TEXT PRIMARY KEY, model_group TEXT)",
)
.execute(&mut *tx)
.await?;
tracing::debug!("Created temporary table in {:?}", temp_table_start.elapsed());

// Insert all entity IDs into the temporary table
let insert_start = std::time::Instant::now();
for (model_ids, entity_ids) in &model_groups {
for chunk in entity_ids.chunks(999) {
let placeholders = chunk.iter().map(|_| "(?, ?)").collect::<Vec<_>>().join(",");
Expand All @@ -296,8 +307,14 @@
query.execute(&mut *tx).await?;
}
}
tracing::debug!(
"Inserted all entity IDs into temporary table in {:?}",
insert_start.elapsed()

Check warning on line 312 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L311-L312

Added lines #L311 - L312 were not covered by tests
);

for (models_str, _) in model_groups {
let query_start = std::time::Instant::now();
for (models_str, entity_ids) in &model_groups {
tracing::debug!("Processing model group with {} entities", entity_ids.len());
Comment on lines +315 to +317
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error logging for model processing.

The model group processing lacks error logging. Consider adding error logs to help with debugging.

         let query_start = std::time::Instant::now();
         for (models_str, entity_ids) in &model_groups {
-            tracing::debug!("Processing model group with {} entities", entity_ids.len());
+            tracing::info!("Processing model group with {} entities", entity_ids.len());
+            if let Err(e) = process_model_group(models_str, entity_ids).await {
+                tracing::error!("Failed to process model group: {}", e);
+                return Err(e);
+            }

Committable suggestion skipped: line range outside the PR's diff.

let model_ids =
models_str.split(',').map(|id| Felt::from_str(id).unwrap()).collect::<Vec<_>>();
let schemas =
Expand All @@ -315,7 +332,7 @@
None,
)?;

let rows = sqlx::query(&entity_query).bind(&models_str).fetch_all(&mut *tx).await?;
let rows = sqlx::query(&entity_query).bind(models_str).fetch_all(&mut *tx).await?;
let schemas = Arc::new(schemas);

let group_entities: Result<Vec<_>, Error> = rows
Expand All @@ -325,10 +342,15 @@

all_entities.extend(group_entities?);
}
tracing::debug!("Processed all model groups in {:?}", query_start.elapsed());

sqlx::query("DROP TABLE temp_entity_ids").execute(&mut *tx).await?;
tracing::debug!("Dropped temporary table");

tx.commit().await?;
tracing::debug!("Committed transaction");

tracing::debug!("Total fetch_entities operation took {:?}", start.elapsed());

Ok(all_entities)
}
Expand Down
14 changes: 14 additions & 0 deletions eternum.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
world_address = "0x6a9e4c6f0799160ea8ddc43ff982a5f83d7f633e9732ce42701de1288ff705f"
rpc = "https://api.cartridge.gg/x/starknet/mainnet"
explorer = false

[indexing]
world_block = 947500
events_chunk_size = 1024
blocks_chunk_size = 1024000
pending = true
polling_interval = 500
max_concurrent_tasks = 100
transactions = false
contracts = ["ERC721:0x57675b9c0bd62b096a2e15502a37b290fa766ead21c33eda42993e48a714b80"]
namespaces = []
Loading