Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(torii): inmemory cache for erc and handle contract type at runtime #2483

Merged
merged 8 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ justfile
spawn-and-move-db
types-test-db
examples/spawn-and-move/manifests/saya/**
**/*.log

artifacts/
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ members = [
"crates/torii/server",
"crates/torii/types-test",
"examples/spawn-and-move",
"scripts/verify_db_balances",
"xtask/generate-test-db",
]

Expand Down
2 changes: 0 additions & 2 deletions bin/torii/torii.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,5 @@
# contracts = [
# { type = "WORLD", address = "<WORLD_CONTRACT_ADDRESS>" },
# { type = "ERC20", address = "<ERC20_CONTRACT_ADDRESS>" },
# { type = "ERC20Legacy", address = "<ERC20_LEGACY_CONTRACT_ADDRESS>" },
# { type = "ERC721", address = "<ERC721_CONTRACT_ADDRESS>" },
# { type = "ERC721Legacy", address = "<ERC721_LEGACY_CONTRACT_ADDRESS>" },
# ]
67 changes: 46 additions & 21 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@
use crate::processors::store_update_member::StoreUpdateMemberProcessor;
use crate::processors::store_update_record::StoreUpdateRecordProcessor;
use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor};
use crate::sql::utils::I256;
use crate::sql::{Cursors, Sql};
use crate::types::ContractType;

type EventProcessorMap<P> = HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>>;

#[allow(missing_debug_implementations)]
pub struct Processors<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
pub block: Vec<Box<dyn BlockProcessor<P>>>,
pub transaction: Vec<Box<dyn TransactionProcessor<P>>>,
pub catch_all_event: Box<dyn EventProcessor<P>>,
pub event_processors: HashMap<ContractType, HashMap<Felt, Box<dyn EventProcessor<P>>>>,
pub event_processors: HashMap<ContractType, EventProcessorMap<P>>,
}

impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Default for Processors<P> {
Expand All @@ -59,10 +62,8 @@
}

impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Processors<P> {
pub fn initialize_event_processors(
) -> HashMap<ContractType, HashMap<Felt, Box<dyn EventProcessor<P>>>> {
let mut event_processors_map =
HashMap::<ContractType, HashMap<Felt, Box<dyn EventProcessor<P>>>>::new();
pub fn initialize_event_processors() -> HashMap<ContractType, EventProcessorMap<P>> {
let mut event_processors_map = HashMap::<ContractType, EventProcessorMap<P>>::new();

let event_processors = vec![
(
Expand All @@ -78,27 +79,31 @@
),
(
ContractType::ERC20,
vec![Box::new(Erc20TransferProcessor) as Box<dyn EventProcessor<P>>],
vec![
Box::new(Erc20TransferProcessor) as Box<dyn EventProcessor<P>>,
Box::new(Erc20LegacyTransferProcessor) as Box<dyn EventProcessor<P>>,
],
),
(
ContractType::ERC721,
vec![Box::new(Erc721TransferProcessor) as Box<dyn EventProcessor<P>>],
),
(
ContractType::ERC20Legacy,
vec![Box::new(Erc20LegacyTransferProcessor) as Box<dyn EventProcessor<P>>],
),
(
ContractType::ERC721Legacy,
vec![Box::new(Erc721LegacyTransferProcessor) as Box<dyn EventProcessor<P>>],
vec![
Box::new(Erc721TransferProcessor) as Box<dyn EventProcessor<P>>,
Box::new(Erc721LegacyTransferProcessor) as Box<dyn EventProcessor<P>>,
],
),
];

for (contract_type, processors) in event_processors {
for processor in processors {
let key = get_selector_from_name(processor.event_key().as_str())
.expect("Event key is ASCII so this should never fail");
event_processors_map.entry(contract_type).or_default().insert(key, processor);
// event_processors_map.entry(contract_type).or_default().insert(key, processor);
event_processors_map
.entry(contract_type)
.or_default()
.entry(key)
.or_default()
.push(processor);
}
}

Expand All @@ -108,7 +113,7 @@
pub fn get_event_processor(
&self,
contract_type: ContractType,
) -> &HashMap<Felt, Box<dyn EventProcessor<P>>> {
) -> &HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>> {
self.event_processors.get(&contract_type).unwrap()
}
}
Expand Down Expand Up @@ -188,6 +193,7 @@
block_tx: Option<BoundedSender<u64>>,
tasks: HashMap<u64, Vec<(ContractType, ParallelizedEvent)>>,
contracts: Arc<HashMap<Felt, ContractType>>,
cache: HashMap<String, I256>,
}

struct UnprocessedEvent {
Expand Down Expand Up @@ -217,6 +223,7 @@
block_tx,
contracts,
tasks: HashMap::new(),
cache: HashMap::new(),
}
}

Expand Down Expand Up @@ -451,6 +458,7 @@
}
FetchDataResult::None => {}
}
self.db.apply_cache_diff(&mut self.cache).await?;

Check warning on line 461 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L461

Added line #L461 was not covered by tests

Ok(())
}
Expand Down Expand Up @@ -562,11 +570,14 @@
let mut local_db = db.clone();
for (contract_type, ParallelizedEvent { event_id, event, block_number, block_timestamp }) in events {
let contract_processors = processors.get_event_processor(contract_type);
if let Some(processor) = contract_processors.get(&event.keys[0]) {
if let Some(processors) = contract_processors.get(&event.keys[0]) {

let processor = processors.iter().find(|p| p.validate(&event)).expect("Must find atleast one processor for the event");

debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event.");

if let Err(e) = processor
.process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event)
.process(&world, &mut local_db, None, block_number, block_timestamp, &event_id, &event)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event.");
Expand Down Expand Up @@ -732,13 +743,20 @@
contract_type: ContractType,
) -> Result<()> {
if self.config.flags.contains(IndexingFlags::RAW_EVENTS) {
self.db.store_event(event_id, event, transaction_hash, block_timestamp);
match contract_type {
ContractType::WORLD => {
self.db.store_event(event_id, event, transaction_hash, block_timestamp);
}

Check warning on line 749 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L746-L749

Added lines #L746 - L749 were not covered by tests
// ERC events needs to be processed inside there respective processor
// we store transfer events for ERC contracts regardless of this flag
ContractType::ERC20 | ContractType::ERC721 => {}

Check warning on line 752 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L752

Added line #L752 was not covered by tests
}
}

let event_key = event.keys[0];

let processors = self.processors.get_event_processor(contract_type);
let Some(processor) = processors.get(&event_key) else {
let Some(processors) = processors.get(&event_key) else {
// if we dont have a processor for this event, we try the catch all processor
if self.processors.catch_all_event.validate(event) {
if let Err(e) = self
Expand All @@ -747,6 +765,7 @@
.process(
&self.world,
&mut self.db,
None,
block_number,
block_timestamp,
event_id,
Expand All @@ -773,6 +792,11 @@
return Ok(());
};

let processor = processors
.iter()
.find(|p| p.validate(event))
.expect("Must find atleast one processor for the event");

let task_identifier = match processor.event_key().as_str() {
"StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
let mut hasher = DefaultHasher::new();
Expand Down Expand Up @@ -801,6 +825,7 @@
.process(
&self.world,
&mut self.db,
Some(&mut self.cache),
block_number,
block_timestamp,
event_id,
Expand Down
16 changes: 14 additions & 2 deletions crates/torii/core/src/processors/erc20_legacy_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -7,6 +9,7 @@
use tracing::debug;

use super::EventProcessor;
use crate::sql::utils::I256;
use crate::sql::Sql;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_legacy_transfer";
Expand Down Expand Up @@ -38,6 +41,7 @@
&self,
world: &WorldContractReader<P>,
db: &mut Sql,
cache: Option<&mut HashMap<String, I256>>,
_block_number: u64,
block_timestamp: u64,
_event_id: &str,
Expand All @@ -50,8 +54,16 @@
let value = U256Cainome::cairo_deserialize(&event.data, 2)?;
let value = U256::from_words(value.low, value.high);

db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp)
.await?;
db.handle_erc20_transfer(
token_address,
from,
to,
value,
world.provider(),
block_timestamp,
cache.expect("cache is required"),
)
.await?;

Check warning on line 66 in crates/torii/core/src/processors/erc20_legacy_transfer.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/erc20_legacy_transfer.rs#L57-L66

Added lines #L57 - L66 were not covered by tests
debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer");

Ok(())
Expand Down
16 changes: 14 additions & 2 deletions crates/torii/core/src/processors/erc20_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -7,6 +9,7 @@
use tracing::debug;

use super::EventProcessor;
use crate::sql::utils::I256;
use crate::sql::Sql;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_transfer";
Expand Down Expand Up @@ -38,6 +41,7 @@
&self,
world: &WorldContractReader<P>,
db: &mut Sql,
cache: Option<&mut HashMap<String, I256>>,
_block_number: u64,
block_timestamp: u64,
_event_id: &str,
Expand All @@ -50,8 +54,16 @@
let value = U256Cainome::cairo_deserialize(&event.data, 0)?;
let value = U256::from_words(value.low, value.high);

db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp)
.await?;
db.handle_erc20_transfer(
token_address,
from,
to,
value,
world.provider(),
block_timestamp,
cache.expect("cache is required"),
)
.await?;

Check warning on line 66 in crates/torii/core/src/processors/erc20_transfer.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/erc20_transfer.rs#L57-L66

Added lines #L57 - L66 were not covered by tests
debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer");

Ok(())
Expand Down
5 changes: 5 additions & 0 deletions crates/torii/core/src/processors/erc721_legacy_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -7,6 +9,7 @@
use tracing::debug;

use super::EventProcessor;
use crate::sql::utils::I256;
use crate::sql::Sql;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc721_legacy_transfer";
Expand Down Expand Up @@ -38,6 +41,7 @@
&self,
world: &WorldContractReader<P>,
db: &mut Sql,
cache: Option<&mut HashMap<String, I256>>,
_block_number: u64,
block_timestamp: u64,
_event_id: &str,
Expand All @@ -57,6 +61,7 @@
token_id,
world.provider(),
block_timestamp,
cache.expect("cache is required"),

Check warning on line 64 in crates/torii/core/src/processors/erc721_legacy_transfer.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/erc721_legacy_transfer.rs#L64

Added line #L64 was not covered by tests
)
.await?;
debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer");
Expand Down
5 changes: 5 additions & 0 deletions crates/torii/core/src/processors/erc721_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -7,6 +9,7 @@
use tracing::debug;

use super::EventProcessor;
use crate::sql::utils::I256;
use crate::sql::Sql;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc721_transfer";
Expand Down Expand Up @@ -38,6 +41,7 @@
&self,
world: &WorldContractReader<P>,
db: &mut Sql,
cache: Option<&mut HashMap<String, I256>>,
_block_number: u64,
block_timestamp: u64,
_event_id: &str,
Expand All @@ -57,6 +61,7 @@
token_id,
world.provider(),
block_timestamp,
cache.expect("cache is required"),

Check warning on line 64 in crates/torii/core/src/processors/erc721_transfer.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/erc721_transfer.rs#L64

Added line #L64 was not covered by tests
)
.await?;
debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer");
Expand Down
4 changes: 4 additions & 0 deletions crates/torii/core/src/processors/event_message.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::world::WorldContractReader;
Expand All @@ -7,6 +9,7 @@ use tracing::info;

use super::EventProcessor;
use crate::processors::MODEL_INDEX;
use crate::sql::utils::I256;
use crate::sql::Sql;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::event_message";
Expand Down Expand Up @@ -39,6 +42,7 @@ where
&self,
_world: &WorldContractReader<P>,
db: &mut Sql,
_cache: Option<&mut HashMap<String, I256>>,
_block_number: u64,
block_timestamp: u64,
event_id: &str,
Expand Down
Loading
Loading