diff --git a/Cargo.lock b/Cargo.lock index bb7394efc4..c0babbb6f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7920,6 +7920,7 @@ dependencies = [ name = "katana-db" version = "1.0.0-alpha.12" dependencies = [ + "alloy-primitives", "anyhow", "criterion", "dojo-metrics", diff --git a/crates/katana/core/src/backend/storage.rs b/crates/katana/core/src/backend/storage.rs index 9264cf4043..444604c39c 100644 --- a/crates/katana/core/src/backend/storage.rs +++ b/crates/katana/core/src/backend/storage.rs @@ -7,6 +7,7 @@ use katana_provider::providers::db::DbProvider; use katana_provider::traits::block::{BlockProvider, BlockWriter}; use katana_provider::traits::contract::ContractClassWriter; use katana_provider::traits::env::BlockEnvProvider; +use katana_provider::traits::messaging::MessagingCheckpointProvider; use katana_provider::traits::state::{StateFactoryProvider, StateRootProvider, StateWriter}; use katana_provider::traits::state_update::StateUpdateProvider; use katana_provider::traits::transaction::{ @@ -29,6 +30,7 @@ pub trait Database: + ContractClassWriter + StateFactoryProvider + BlockEnvProvider + + MessagingCheckpointProvider + 'static + Send + Sync @@ -50,6 +52,7 @@ impl Database for T where + ContractClassWriter + StateFactoryProvider + BlockEnvProvider + + MessagingCheckpointProvider + 'static + Send + Sync diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 618d0f2c03..d4ce7ce96f 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; +use crate::backend::Backend; use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; @@ -13,11 +14,12 @@ use katana_pool::validation::stateful::TxValidator; use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader}; use katana_primitives::receipt::Receipt; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{ExecutableTxWithHash, TxHash, TxWithHash}; +use katana_primitives::transaction::{ExecutableTxWithHash, Tx, TxHash, TxWithHash}; use katana_primitives::version::CURRENT_STARKNET_VERSION; use katana_provider::error::ProviderError; use katana_provider::traits::block::{BlockHashProvider, BlockNumberProvider}; use katana_provider::traits::env::BlockEnvProvider; +use katana_provider::traits::messaging::MessagingCheckpointProvider; use katana_provider::traits::state::StateFactoryProvider; use katana_tasks::{BlockingTaskPool, BlockingTaskResult}; use parking_lot::lock_api::RawMutex; @@ -25,8 +27,6 @@ use parking_lot::{Mutex, RwLock}; use tokio::time::{interval_at, Instant, Interval}; use tracing::{error, info, trace, warn}; -use crate::backend::Backend; - pub(crate) const LOG_TARGET: &str = "miner"; #[derive(Debug, thiserror::Error)] @@ -296,9 +296,11 @@ impl IntervalBlockProducer { fn execute_transactions( executor: PendingExecutor, transactions: Vec, + backend: Arc>, ) -> TxExecutionResult { - let executor = &mut executor.write(); + let provider = backend.blockchain.provider(); + let executor = &mut executor.write(); let new_txs_count = transactions.len(); executor.execute_transactions(transactions)?; @@ -309,13 +311,25 @@ impl IntervalBlockProducer { let results = txs .iter() .skip(total_txs - new_txs_count) - .filter_map(|(tx, res)| match res { - ExecutionResult::Failed { .. } => None, - ExecutionResult::Success { receipt, trace, .. } => Some(TxWithOutcome { - tx: tx.clone(), - receipt: receipt.clone(), - exec_info: trace.clone(), - }), + .filter_map(|(tx, res)| { + let tx_ref: &Tx = &tx.transaction; + + trace!(target: LOG_TARGET, "Executed transaction: {:?}", tx); + let _ = match tx_ref { + Tx::L1Handler(l1_tx) => { + provider.process_message_nonce(l1_tx) + } + _ => Ok(()), + }; + + match res { + ExecutionResult::Failed { .. } => None, + ExecutionResult::Success { receipt, trace, .. } => Some(TxWithOutcome { + tx: tx.clone(), + receipt: receipt.clone(), + exec_info: trace.clone(), + }), + } }) .collect::>(); @@ -399,10 +413,11 @@ impl Stream for IntervalBlockProducer { let transactions: Vec = std::mem::take(&mut pin.queued).into_iter().flatten().collect(); + let backend = pin.backend.clone(); let fut = pin .blocking_task_spawner - .spawn(|| Self::execute_transactions(executor, transactions)); + .spawn(|| Self::execute_transactions(executor, transactions, backend)); pin.ongoing_execution = Some(Box::pin(fut)); } diff --git a/crates/katana/core/src/service/messaging/mod.rs b/crates/katana/core/src/service/messaging/mod.rs index cd064f44be..dac9901e6b 100644 --- a/crates/katana/core/src/service/messaging/mod.rs +++ b/crates/katana/core/src/service/messaging/mod.rs @@ -112,6 +112,10 @@ pub struct MessagingConfig { pub interval: u64, /// The block on settlement chain from where Katana will start fetching messages. pub from_block: u64, + /// The maximum number of blocks in gather messages + pub max_block: u64, + /// The size of events returned by get_events call + pub chunk_size: u64, } impl MessagingConfig { diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index df7bc6f8a7..830c837782 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -10,6 +10,7 @@ use katana_primitives::block::BlockHashOrNumber; use katana_primitives::receipt::MessageToL1; use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash}; use katana_provider::traits::block::BlockNumberProvider; +use katana_provider::traits::messaging::{MessagingCheckpointProvider}; use katana_provider::traits::transaction::ReceiptProvider; use tokio::time::{interval_at, Instant, Interval}; use tracing::{error, info}; @@ -38,6 +39,10 @@ pub struct MessagingService { send_from_block: u64, /// The message sending future. msg_send_fut: Option, + /// The maximum block number to gather messages from. + max_block: u64, + /// The chunk size of messages to gather. + chunk_size: u64, } impl MessagingService { @@ -48,14 +53,38 @@ impl MessagingService { pool: TxPool, backend: Arc>, ) -> anyhow::Result { - let gather_from_block = config.from_block; + let provider = backend.blockchain.provider(); + let gather_from_block = match provider.get_inbound_block() { + Ok(Some(block)) => block, + Ok(None) => 0, + Err(_) => { + anyhow::bail!( + "Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n" + ) + } + }; + let send_from_block = match provider.get_outbound_block() { + Ok(Some(block)) => block, + Ok(None) => 0, + Err(_) => { + anyhow::bail!( + "Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n" + ) + } + }; + + let max_block = config.max_block; + let chunk_size = config.chunk_size; + let interval = interval_from_seconds(config.interval); let messenger = match MessengerMode::from_config(config).await { Ok(m) => Arc::new(m), Err(_) => { - panic!( + anyhow::bail!( "Messaging could not be initialized.\nVerify that the messaging target node \ - (anvil or other katana) is running.\n", + (anvil or other katana) is running.\n" ) } }; @@ -66,9 +95,11 @@ impl MessagingService { interval, messenger, gather_from_block, - send_from_block: 0, + send_from_block, msg_gather_fut: None, msg_send_fut: None, + max_block, + chunk_size, }) } @@ -77,11 +108,9 @@ impl MessagingService { pool: TxPool, backend: Arc>, from_block: u64, + max_block: u64, + chunk_size: u64, ) -> MessengerResult<(u64, usize)> { - // 200 avoids any possible rejection from RPC with possibly lot's of messages. - // TODO: May this be configurable? - let max_block = 200; - match messenger.as_ref() { MessengerMode::Ethereum(inner) => { let (block_num, txs) = @@ -102,8 +131,9 @@ impl MessagingService { #[cfg(feature = "starknet-messaging")] MessengerMode::Starknet(inner) => { - let (block_num, txs) = - inner.gather_messages(from_block, max_block, backend.chain_id).await?; + let (block_num, txs) = inner + .gather_messages(from_block, max_block, chunk_size, backend.chain_id) + .await?; let txs_count = txs.len(); txs.into_iter().for_each(|tx| { @@ -188,6 +218,8 @@ impl Stream for MessagingService { pin.pool.clone(), pin.backend.clone(), pin.gather_from_block, + pin.max_block, + pin.chunk_size, ))); } @@ -209,6 +241,11 @@ impl Stream for MessagingService { match gather_fut.poll_unpin(cx) { Poll::Ready(Ok((last_block, msg_count))) => { pin.gather_from_block = last_block + 1; + let _ = pin + .backend + .blockchain + .provider() + .set_inbound_block(pin.gather_from_block); return Poll::Ready(Some(MessagingOutcome::Gather { lastest_block: last_block, msg_count, @@ -234,6 +271,8 @@ impl Stream for MessagingService { // +1 to move to the next local block to check messages to be // sent on the settlement chain. pin.send_from_block += 1; + let _ = + pin.backend.blockchain.provider().set_outbound_block(pin.send_from_block); return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count })); } Poll::Ready(Err(e)) => { diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 7c3f2bb3db..2d725f8ee1 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -64,7 +64,8 @@ impl StarknetMessaging { &self, from_block: BlockId, to_block: BlockId, - ) -> Result> { + chunk_size: u64, + ) -> Result>> { trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs."); let mut events = vec![]; @@ -77,8 +78,6 @@ impl StarknetMessaging { keys: None, }; - // TODO: this chunk_size may also come from configuration? - let chunk_size = 200; let mut continuation_token: Option = None; loop { @@ -165,6 +164,7 @@ impl Messenger for StarknetMessaging { &self, from_block: u64, max_blocks: u64, + chunk_size: u64, chain_id: ChainId, ) -> MessengerResult<(u64, Vec)> { let chain_latest_block: u64 = match self.provider.block_number().await { @@ -193,7 +193,7 @@ impl Messenger for StarknetMessaging { let mut l1_handler_txs: Vec = vec![]; - self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)) + self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block), chunk_size) .await .map_err(|_| Error::SendError) .unwrap() @@ -204,10 +204,15 @@ impl Messenger for StarknetMessaging { event = ?e, "Converting event into L1HandlerTx." ); - - if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { - l1_handler_txs.push(tx) - } + block_events.iter().for_each(|e| { + if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { + let last_processed_nonce = + self.provider.get_gather_message_nonce().unwrap_or(0.into()); + if tx.nonce > last_processed_nonce { + l1_handler_txs.push(tx) + } + } + }) }); Ok((to_block, l1_handler_txs)) @@ -235,7 +240,14 @@ impl Messenger for StarknetMessaging { }; } - self.send_hashes(hashes.clone()).await?; + for (index, hash) in hashes.iter().enumerate() { + let stored_index = self.provider.get_send_from_index(); + self.send_hashes(std::slice::from_ref(hash)).await?; + self.provider.set_send_from_index((index as u64) + 1).await?; + } + + // reset the index + self.provider.set_send_from_index(0).await?; Ok(hashes) } diff --git a/crates/katana/primitives/src/contract.rs b/crates/katana/primitives/src/contract.rs index 4473d4169b..72ebaf180b 100644 --- a/crates/katana/primitives/src/contract.rs +++ b/crates/katana/primitives/src/contract.rs @@ -14,6 +14,9 @@ pub type StorageValue = Felt; /// Represents the type for a contract nonce. pub type Nonce = Felt; +/// Represents the type for a message hash. +pub type MessageHash = Felt; + /// Represents a contract address. #[derive(Default, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash, Debug, Deref)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] diff --git a/crates/katana/primitives/src/genesis/json.rs b/crates/katana/primitives/src/genesis/json.rs index 74f43f4a9c..1edfddfc6e 100644 --- a/crates/katana/primitives/src/genesis/json.rs +++ b/crates/katana/primitives/src/genesis/json.rs @@ -270,6 +270,7 @@ pub struct GenesisJson { pub accounts: HashMap, #[serde(default)] pub contracts: HashMap, + pub settlement_block_number: BlockNumber, } impl GenesisJson { @@ -611,6 +612,7 @@ impl TryFrom for Genesis { gas_prices: value.gas_prices, state_root: value.state_root, parent_hash: value.parent_hash, + settlement_block_number: value.settlement_block_number, }) } } @@ -1039,6 +1041,7 @@ mod tests { let expected_genesis = Genesis { classes: expected_classes, number: 0, + gather_from_block: 0, fee_token: expected_fee_token, allocations: expected_allocations, timestamp: 5123512314u64, @@ -1179,6 +1182,7 @@ mod tests { classes, allocations, number: 0, + gather_from_block: 0, timestamp: 5123512314u64, state_root: felt!("0x99"), parent_hash: felt!("0x999"), diff --git a/crates/katana/primitives/src/genesis/mod.rs b/crates/katana/primitives/src/genesis/mod.rs index a2548b67ca..b0af9577d4 100644 --- a/crates/katana/primitives/src/genesis/mod.rs +++ b/crates/katana/primitives/src/genesis/mod.rs @@ -105,6 +105,8 @@ pub struct Genesis { pub universal_deployer: Option, /// The genesis contract allocations. pub allocations: BTreeMap, + /// The block on settlement chain from where Katana will start fetching messages. + pub settlement_block_number: BlockNumber, } impl Genesis { @@ -309,6 +311,7 @@ impl Default for Genesis { allocations: BTreeMap::new(), fee_token, universal_deployer: Some(universal_deployer), + settlement_block_number: 0, } } } @@ -425,6 +428,7 @@ mod tests { fee_token: fee_token.clone(), allocations: BTreeMap::from(allocations.clone()), number: 0, + settlement_block_number: 0, timestamp: 5123512314u64, state_root: felt!("0x99"), parent_hash: felt!("0x999"), diff --git a/crates/katana/storage/db/Cargo.toml b/crates/katana/storage/db/Cargo.toml index ce6aaf883b..081afb1547 100644 --- a/crates/katana/storage/db/Cargo.toml +++ b/crates/katana/storage/db/Cargo.toml @@ -21,6 +21,8 @@ tempfile = { workspace = true, optional = true } thiserror.workspace = true tracing.workspace = true +alloy-primitives.workspace = true + # codecs [dependencies.postcard] default-features = false diff --git a/crates/katana/storage/db/src/models/storage.rs b/crates/katana/storage/db/src/models/storage.rs index b6264a481f..0317116bcb 100644 --- a/crates/katana/storage/db/src/models/storage.rs +++ b/crates/katana/storage/db/src/models/storage.rs @@ -83,6 +83,43 @@ impl Decompress for ContractStorageEntry { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MessagingCheckpointId { + SendBlock, + SendIndex, + GatherBlock, + GatherNonce, +} + + +impl Encode for MessagingCheckpointId { + type Encoded = [u8; 1]; + fn encode(self) -> Self::Encoded { + let mut buf = [0u8; 1]; + buf[0] = match self { + MessagingCheckpointId::SendBlock => 1, + MessagingCheckpointId::SendIndex => 2, + MessagingCheckpointId::GatherBlock => 3, + MessagingCheckpointId::GatherNonce => 4, + }; + buf + } +} + +impl Decode for MessagingCheckpointId { + fn decode>(bytes: B) -> Result { + let bytes = bytes.as_ref(); + match bytes[0] { + 1 => Ok(MessagingCheckpointId::SendBlock), + 2 => Ok(MessagingCheckpointId::SendIndex), + 3 => Ok(MessagingCheckpointId::GatherBlock), + 4 => Ok(MessagingCheckpointId::GatherNonce), + _ => Err(CodecError::Decode("Invalid MessagingCheckpointId".into())), + } + } +} + + #[cfg(test)] mod tests { use starknet::macros::felt; diff --git a/crates/katana/storage/db/src/tables.rs b/crates/katana/storage/db/src/tables.rs index d7af3e1333..b1a5d868ba 100644 --- a/crates/katana/storage/db/src/tables.rs +++ b/crates/katana/storage/db/src/tables.rs @@ -1,6 +1,6 @@ use katana_primitives::block::{BlockHash, BlockNumber, FinalityStatus, Header}; use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; -use katana_primitives::contract::{ContractAddress, GenericContractInfo, StorageKey}; +use katana_primitives::contract::{ContractAddress, GenericContractInfo, Nonce, StorageKey}; use katana_primitives::receipt::Receipt; use katana_primitives::trace::TxExecInfo; use katana_primitives::transaction::{Tx, TxHash, TxNumber}; @@ -9,7 +9,7 @@ use crate::codecs::{Compress, Decode, Decompress, Encode}; use crate::models::block::StoredBlockBodyIndices; use crate::models::contract::{ContractClassChange, ContractInfoChangeList, ContractNonceChange}; use crate::models::list::BlockList; -use crate::models::storage::{ContractStorageEntry, ContractStorageKey, StorageEntry}; +use crate::models::storage::{ContractStorageEntry, ContractStorageKey, MessagingCheckpointId, StorageEntry}; pub trait Key: Encode + Decode + Clone + std::fmt::Debug {} pub trait Value: Compress + Decompress + std::fmt::Debug {} @@ -44,7 +44,7 @@ pub enum TableType { DupSort, } -pub const NUM_TABLES: usize = 23; +pub const NUM_TABLES: usize = 27; /// Macro to declare `libmdbx` tables. #[macro_export] @@ -167,7 +167,11 @@ define_tables_enum! {[ (NonceChangeHistory, TableType::DupSort), (ClassChangeHistory, TableType::DupSort), (StorageChangeHistory, TableType::DupSort), - (StorageChangeSet, TableType::Table) + (StorageChangeSet, TableType::Table), + (MessagingCheckpointBlock, TableType::Table), + (MessagingCheckpointNonce, TableType::Table), + (MessagingMessageNonceMapping, TableType::Table), + (MessagingCheckpointIndex, TableType::Table) ]} tables! { @@ -223,7 +227,19 @@ tables! { /// storage change set StorageChangeSet: (ContractStorageKey) => BlockList, /// Account storage change set - StorageChangeHistory: (BlockNumber, ContractStorageKey) => ContractStorageEntry + StorageChangeHistory: (BlockNumber, ContractStorageKey) => ContractStorageEntry, + + /// Stores the block number related to messaging service + MessagingCheckpointBlock: (MessagingCheckpointId) => BlockNumber, + + /// Stores the nonce related to messaging service + MessagingCheckpointNonce: (MessagingCheckpointId) => Nonce, + + /// Map a message hash to a message nonce + MessagingMessageNonceMapping: (TxHash) => Nonce, + + /// Stores the index of the messaging service + MessagingCheckpointIndex: (MessagingCheckpointId) => u64 } @@ -258,6 +274,10 @@ mod tests { assert_eq!(Tables::ALL[20].name(), ClassChangeHistory::NAME); assert_eq!(Tables::ALL[21].name(), StorageChangeHistory::NAME); assert_eq!(Tables::ALL[22].name(), StorageChangeSet::NAME); + assert_eq!(Tables::ALL[23].name(), MessagingCheckpointBlock::NAME); + assert_eq!(Tables::ALL[24].name(), MessagingCheckpointNonce::NAME); + assert_eq!(Tables::ALL[25].name(), MessagingMessageNonceMapping::NAME); + assert_eq!(Tables::ALL[26].name(), MessagingCheckpointIndex::NAME); assert_eq!(Tables::Headers.table_type(), TableType::Table); assert_eq!(Tables::BlockHashes.table_type(), TableType::Table); @@ -282,6 +302,10 @@ mod tests { assert_eq!(Tables::ClassChangeHistory.table_type(), TableType::DupSort); assert_eq!(Tables::StorageChangeHistory.table_type(), TableType::DupSort); assert_eq!(Tables::StorageChangeSet.table_type(), TableType::Table); + assert_eq!(Tables::MessagingCheckpointBlock.table_type(), TableType::Table); + assert_eq!(Tables::MessagingCheckpointNonce.table_type(), TableType::Table); + assert_eq!(Tables::MessagingMessageNonceMapping.table_type(), TableType::Table); + assert_eq!(Tables::MessagingCheckpointIndex.table_type(), TableType::Table); } use katana_primitives::block::{BlockHash, BlockNumber, FinalityStatus, Header}; @@ -301,6 +325,7 @@ mod tests { }; use crate::models::list::BlockList; use crate::models::storage::{ContractStorageEntry, ContractStorageKey, StorageEntry}; + use crate::tables::Tables::{MessagingCheckpointBlock, MessagingCheckpointNonce}; macro_rules! assert_key_encode_decode { { $( ($name:ty, $key:expr) ),* } => { diff --git a/crates/katana/storage/provider/src/lib.rs b/crates/katana/storage/provider/src/lib.rs index b7c1f7b430..5854b2d04d 100644 --- a/crates/katana/storage/provider/src/lib.rs +++ b/crates/katana/storage/provider/src/lib.rs @@ -6,16 +6,17 @@ use katana_primitives::block::{ SealedBlockWithStatus, }; use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; -use katana_primitives::contract::{ContractAddress, StorageKey, StorageValue}; +use katana_primitives::contract::{ContractAddress, MessageHash, Nonce, StorageKey, StorageValue}; use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{TxHash, TxNumber, TxWithHash}; +use katana_primitives::transaction::{L1HandlerTx, TxHash, TxNumber, TxWithHash}; use katana_primitives::Felt; use traits::block::{BlockIdReader, BlockStatusProvider, BlockWriter}; use traits::contract::{ContractClassProvider, ContractClassWriter}; use traits::env::BlockEnvProvider; +use traits::messaging::MessagingCheckpointProvider; use traits::state::{StateRootProvider, StateWriter}; use traits::transaction::{TransactionStatusProvider, TransactionTraceProvider}; @@ -380,3 +381,64 @@ where self.provider.block_env_at(id) } } + +impl MessagingCheckpointProvider for BlockchainProvider +where + Db: MessagingCheckpointProvider, +{ + fn get_outbound_block(&self) -> ProviderResult> { + self.provider.get_outbound_block() + } + + fn set_outbound_block(&self, outbound_block: BlockNumber) -> ProviderResult<()> { + self.provider.set_outbound_block(outbound_block) + } + + fn get_inbound_block(&self) -> ProviderResult> { + self.provider.get_inbound_block() + } + + fn set_inbound_block(&self, inbound_block: BlockNumber) -> ProviderResult<()> { + self.provider.set_inbound_block(inbound_block) + } + + fn get_inbound_nonce(&self) -> ProviderResult> { + self.provider.get_inbound_nonce() + } + + fn set_inbound_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + self.provider.set_inbound_nonce(nonce) + } + + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { + self.provider.get_nonce_from_message_hash(message_hash) + } + + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { + self.provider.set_nonce_from_message_hash(message_hash, nonce) + } + + fn get_outbound_index(&self) -> ProviderResult> { + self.provider.get_outbound_index() + } + + fn set_outbound_index(&self, outbound_index: u64) -> ProviderResult<()> { + self.provider.set_outbound_index(outbound_index) + } + + fn process_message_nonce(&self, l1_tx: &L1HandlerTx) -> ProviderResult<()>{ + // Get stored nonce from message hash + let message_hash_bytes = l1_tx.message_hash; + let message_hash_bytes: [u8; 32] = *message_hash_bytes; + let message_hash = Felt::from_bytes_be(&message_hash_bytes); + + // Attempt to get the nonce from the message hash + match self.get_nonce_from_message_hash(message_hash) { + Ok(Some(nonce)) => + // Set the inbound nonce if a nonce is retrieved + self.set_inbound_nonce(nonce), + Ok(None) => Ok(()), + Err(_e) => Ok(()), + } + } +} diff --git a/crates/katana/storage/provider/src/providers/db/mod.rs b/crates/katana/storage/provider/src/providers/db/mod.rs index 2954dce1c2..e3b6156d23 100644 --- a/crates/katana/storage/provider/src/providers/db/mod.rs +++ b/crates/katana/storage/provider/src/providers/db/mod.rs @@ -12,7 +12,7 @@ use katana_db::models::contract::{ ContractClassChange, ContractInfoChangeList, ContractNonceChange, }; use katana_db::models::list::BlockList; -use katana_db::models::storage::{ContractStorageEntry, ContractStorageKey, StorageEntry}; +use katana_db::models::storage::{ContractStorageEntry, ContractStorageKey, MessagingCheckpointId, StorageEntry}; use katana_db::tables::{self, DupSort, Table}; use katana_db::utils::KeyValue; use katana_primitives::block::{ @@ -21,13 +21,13 @@ use katana_primitives::block::{ }; use katana_primitives::class::{ClassHash, CompiledClassHash}; use katana_primitives::contract::{ - ContractAddress, GenericContractInfo, Nonce, StorageKey, StorageValue, + ContractAddress, GenericContractInfo, MessageHash, Nonce, StorageKey, StorageValue, }; use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{TxHash, TxNumber, TxWithHash}; +use katana_primitives::transaction::{L1HandlerTx, TxHash, TxNumber, TxWithHash}; use katana_primitives::Felt; use crate::error::ProviderError; @@ -36,6 +36,7 @@ use crate::traits::block::{ HeaderProvider, }; use crate::traits::env::BlockEnvProvider; +use crate::traits::messaging::MessagingCheckpointProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -43,7 +44,6 @@ use crate::traits::transaction::{ TransactionsProviderExt, }; use crate::ProviderResult; - /// A provider implementation that uses a persistent database as the backend. // TODO: remove the default generic type #[derive(Debug)] @@ -763,6 +763,94 @@ impl BlockWriter for DbProvider { } } +impl MessagingCheckpointProvider for DbProvider { + fn set_outbound_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(MessagingCheckpointId::SendBlock, send_from_block)?; + Ok(()) + })? + } + + fn get_outbound_block(&self) -> ProviderResult> { + let db_tx = self.0.tx()?; + let block_num = db_tx.get::(MessagingCheckpointId::SendBlock)?; + db_tx.commit()?; + Ok(block_num) + } + + fn set_inbound_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(MessagingCheckpointId::GatherBlock, gather_from_block)?; + Ok(()) + })? + } + + fn get_inbound_block(&self) -> ProviderResult> { + let db_tx = self.0.tx()?; + let block_num = db_tx.get::(MessagingCheckpointId::GatherBlock)?; + db_tx.commit()?; + Ok(block_num) + } + + fn set_inbound_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(MessagingCheckpointId::GatherNonce, nonce)?; + Ok(()) + })? + } + + fn get_inbound_nonce(&self) -> ProviderResult> { + let db_tx = self.0.tx()?; + let nonce = db_tx.get::(MessagingCheckpointId::GatherNonce)?; + db_tx.commit()?; + Ok(nonce) + } + + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(message_hash, nonce)?; + Ok(()) + })? + } + + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { + let db_tx = self.0.tx()?; + let nonce = db_tx.get::(message_hash)?; + db_tx.commit()?; + Ok(nonce) + } + + fn set_outbound_index(&self, send_from_index: u64) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(MessagingCheckpointId::SendIndex, send_from_index)?; + Ok(()) + })? + } + + fn get_outbound_index(&self) -> ProviderResult> { + let db_tx = self.0.tx()?; + let index = db_tx.get::(MessagingCheckpointId::SendIndex)?; + db_tx.commit()?; + Ok(index) + } + + fn process_message_nonce(&self, l1_tx: &L1HandlerTx) -> ProviderResult<()>{ + // Get stored nonce from message hash + let message_hash_bytes = l1_tx.message_hash; + let message_hash_bytes: [u8; 32] = *message_hash_bytes; + let message_hash = Felt::from_bytes_be(&message_hash_bytes); + + // Attempt to get the nonce from the message hash + match self.get_nonce_from_message_hash(message_hash) { + Ok(Some(nonce)) => + // Set the inbound nonce if a nonce is retrieved + self.set_inbound_nonce(nonce), + Ok(None) => Ok(()), + Err(_e) => Ok(()), + } + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/crates/katana/storage/provider/src/providers/fork/mod.rs b/crates/katana/storage/provider/src/providers/fork/mod.rs index 104b1a9328..c525506656 100644 --- a/crates/katana/storage/provider/src/providers/fork/mod.rs +++ b/crates/katana/storage/provider/src/providers/fork/mod.rs @@ -10,16 +10,16 @@ use katana_primitives::block::{ SealedBlockWithStatus, }; use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; -use katana_primitives::contract::ContractAddress; +use katana_primitives::contract::{ContractAddress, MessageHash, Nonce}; use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{Tx, TxHash, TxNumber, TxWithHash}; +use katana_primitives::transaction::{L1HandlerTx, Tx, TxHash, TxNumber, TxWithHash}; use parking_lot::RwLock; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; - +use katana_primitives::Felt; use self::backend::{Backend, BackendError, SharedStateProvider}; use self::state::ForkedStateDb; use super::in_memory::cache::{CacheDb, CacheStateDb}; @@ -30,6 +30,7 @@ use crate::traits::block::{ }; use crate::traits::contract::ContractClassWriter; use crate::traits::env::BlockEnvProvider; +use crate::traits::messaging::MessagingCheckpointProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider, StateWriter}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -581,3 +582,66 @@ impl BlockEnvProvider for ForkedProvider { })) } } + +impl MessagingCheckpointProvider for ForkedProvider { + fn set_outbound_block(&self, outbound_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.send_block = Some(outbound_block); + Ok(()) + } + + fn get_outbound_block(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.send_block) + } + + fn set_inbound_block(&self, inbound_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.gather_block = Some(inbound_block); + Ok(()) + } + + fn get_inbound_block(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.gather_block) + } + + fn set_inbound_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + self.storage.write().messaging_info.gather_nonce = Some(nonce); + Ok(()) + } + + fn get_inbound_nonce(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.gather_nonce) + } + + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { + self.storage.write().messaging_message_nonce_mapping.insert(message_hash, nonce); + Ok(()) + } + + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { + Ok(self.storage.read().messaging_message_nonce_mapping.get(&message_hash).cloned()) + } + + fn set_outbound_index(&self, outbound_index: u64) -> ProviderResult<()> { + self.storage.write().messaging_info.send_index = Some(outbound_index); + Ok(()) + } + + fn get_outbound_index(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.send_index) + } + + fn process_message_nonce(&self, l1_tx: &L1HandlerTx) -> ProviderResult<()>{ + // Get stored nonce from message hash + let message_hash_bytes = l1_tx.message_hash; + let message_hash_bytes: [u8; 32] = *message_hash_bytes; + let message_hash = Felt::from_bytes_be(&message_hash_bytes); + + // Attempt to get the nonce from the message hash + match self.get_nonce_from_message_hash(message_hash) { + Ok(Some(nonce)) => + // Set the inbound nonce if a nonce is retrieved + self.set_inbound_nonce(nonce), + Ok(None) => Ok(()), + Err(_e) => Ok(()), + } + } +} diff --git a/crates/katana/storage/provider/src/providers/in_memory/cache.rs b/crates/katana/storage/provider/src/providers/in_memory/cache.rs index d5b1df7685..20bbd4ffa2 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/cache.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/cache.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use katana_db::models::block::StoredBlockBodyIndices; use katana_primitives::block::{BlockHash, BlockNumber, FinalityStatus, Header}; +use katana_primitives::contract::Nonce; use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; use katana_primitives::contract::{ContractAddress, GenericContractInfo, StorageKey, StorageValue}; use katana_primitives::receipt::Receipt; @@ -41,6 +42,14 @@ pub struct CacheStateDb { pub(crate) compiled_class_hashes: RwLock, } +#[derive(Default, Debug)] +pub struct MessagingCheckpointId { + pub(crate) send_block: Option, + pub(crate) send_index: Option, + pub(crate) gather_block: Option, + pub(crate) gather_nonce: Option, +} + impl CacheStateDb { /// Applies the given state updates to the cache. pub fn insert_updates(&self, updates: StateUpdatesWithDeclaredClasses) { @@ -88,6 +97,8 @@ pub struct CacheDb { pub(crate) transaction_hashes: HashMap, pub(crate) transaction_numbers: HashMap, pub(crate) transaction_block: HashMap, + pub(crate) messaging_info: MessagingCheckpointId, + pub(crate) messaging_message_nonce_mapping: HashMap, } impl CacheStateDb { @@ -120,6 +131,8 @@ impl CacheDb { transactions_executions: Vec::new(), latest_block_hash: Default::default(), latest_block_number: Default::default(), + messaging_info: Default::default(), + messaging_message_nonce_mapping: Default::default(), } } } diff --git a/crates/katana/storage/provider/src/providers/in_memory/mod.rs b/crates/katana/storage/provider/src/providers/in_memory/mod.rs index 2fc8b7aac4..f8ca389c5c 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/mod.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/mod.rs @@ -10,14 +10,14 @@ use katana_primitives::block::{ SealedBlockWithStatus, }; use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; -use katana_primitives::contract::ContractAddress; +use katana_primitives::contract::{ContractAddress, MessageHash, Nonce}; use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{Tx, TxHash, TxNumber, TxWithHash}; +use katana_primitives::transaction::{L1HandlerTx, Tx, TxHash, TxNumber, TxWithHash}; use parking_lot::RwLock; - +use katana_primitives::Felt; use self::cache::CacheDb; use self::state::{HistoricalStates, InMemoryStateDb, LatestStateProvider}; use crate::traits::block::{ @@ -26,6 +26,7 @@ use crate::traits::block::{ }; use crate::traits::contract::ContractClassWriter; use crate::traits::env::BlockEnvProvider; +use crate::traits::messaging::MessagingCheckpointProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider, StateWriter}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -575,3 +576,66 @@ impl BlockEnvProvider for InMemoryProvider { })) } } + +impl MessagingCheckpointProvider for InMemoryProvider { + fn set_outbound_block(&self, outbound_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.send_block = Some(outbound_block); + Ok(()) + } + + fn get_outbound_block(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.send_block) + } + + fn set_inbound_block(&self, inbound_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.gather_block = Some(inbound_block); + Ok(()) + } + + fn get_inbound_block(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.gather_block) + } + + fn set_inbound_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + self.storage.write().messaging_info.gather_nonce = Some(nonce); + Ok(()) + } + + fn get_inbound_nonce(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.gather_nonce) + } + + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { + self.storage.write().messaging_message_nonce_mapping.insert(message_hash, nonce); + Ok(()) + } + + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { + Ok(self.storage.read().messaging_message_nonce_mapping.get(&message_hash).cloned()) + } + + fn set_outbound_index(&self, outbound_index: u64) -> ProviderResult<()> { + self.storage.write().messaging_info.send_index = Some(outbound_index); + Ok(()) + } + + fn get_outbound_index(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.send_index) + } + + fn process_message_nonce(&self, l1_tx: &L1HandlerTx) -> ProviderResult<()>{ + // Get stored nonce from message hash + let message_hash_bytes = l1_tx.message_hash; + let message_hash_bytes: [u8; 32] = *message_hash_bytes; + let message_hash = Felt::from_bytes_be(&message_hash_bytes); + + // Attempt to get the nonce from the message hash + match self.get_nonce_from_message_hash(message_hash) { + Ok(Some(nonce)) => + // Set the inbound nonce if a nonce is retrieved + self.set_inbound_nonce(nonce), + Ok(None) => Ok(()), + Err(_e) => Ok(()), + } + } +} diff --git a/crates/katana/storage/provider/src/traits/messaging.rs b/crates/katana/storage/provider/src/traits/messaging.rs new file mode 100644 index 0000000000..94db0fff9e --- /dev/null +++ b/crates/katana/storage/provider/src/traits/messaging.rs @@ -0,0 +1,31 @@ +use katana_primitives::block::BlockNumber; +use katana_primitives::contract::{Nonce, MessageHash}; +use katana_primitives::transaction::L1HandlerTx; +use crate::ProviderResult; + +#[auto_impl::auto_impl(&, Box, Arc)] +pub trait MessagingCheckpointProvider: Send + Sync { + /// Sets the outbound block. + fn set_outbound_block(&self, outbound_block: BlockNumber) -> ProviderResult<()>; + /// Returns the outbound block. + fn get_outbound_block(&self) -> ProviderResult>; + + /// Sets the inbound block. + fn set_inbound_block(&self, inbound_block: BlockNumber) -> ProviderResult<()>; + /// Returns the inbound block. + fn get_inbound_block(&self) -> ProviderResult>; + /// Sets the inbound nonce. + fn set_inbound_nonce(&self, nonce: Nonce) -> ProviderResult<()>; + /// Returns the inbound nonce. + fn get_inbound_nonce(&self) -> ProviderResult>; + /// Sets the nonce by message_hash. + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()>; + /// Returns the nonce by message_hash. + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult>; + /// Sets the outbound index. + fn set_outbound_index(&self, index: u64) -> ProviderResult<()>; + /// Returns the outbound index. + fn get_outbound_index(&self) -> ProviderResult>; + /// Processes the nonce in the provided L1HandlerTx and updates the inbound nonce within the provider. + fn process_message_nonce(&self, l1_tx: &L1HandlerTx) -> ProviderResult<()>; +} diff --git a/crates/katana/storage/provider/src/traits/mod.rs b/crates/katana/storage/provider/src/traits/mod.rs index 762de20465..a221b98e8d 100644 --- a/crates/katana/storage/provider/src/traits/mod.rs +++ b/crates/katana/storage/provider/src/traits/mod.rs @@ -1,6 +1,7 @@ pub mod block; pub mod contract; pub mod env; +pub mod messaging; pub mod state; pub mod state_update; pub mod transaction;