From b2dc09e179b6fb51b9c57af29c2af5069cf250d4 Mon Sep 17 00:00:00 2001 From: bexan Date: Wed, 28 Aug 2024 16:23:10 +0200 Subject: [PATCH 1/8] feat(mesaging): prevent from resending messages to L1 --- crates/katana/core/src/backend/storage.rs | 3 ++ .../core/src/service/messaging/service.rs | 32 +++++++++++++++-- crates/katana/primitives/src/genesis/json.rs | 2 ++ crates/katana/primitives/src/genesis/mod.rs | 3 ++ crates/katana/storage/db/src/tables.rs | 11 ++++-- crates/katana/storage/provider/src/lib.rs | 22 ++++++++++++ .../storage/provider/src/providers/db/mod.rs | 34 ++++++++++++++++++- .../provider/src/providers/fork/mod.rs | 21 ++++++++++++ .../provider/src/providers/in_memory/cache.rs | 2 ++ .../provider/src/providers/in_memory/mod.rs | 22 ++++++++++++ .../storage/provider/src/traits/messaging.rs | 18 ++++++++++ .../katana/storage/provider/src/traits/mod.rs | 1 + 12 files changed, 165 insertions(+), 6 deletions(-) create mode 100644 crates/katana/storage/provider/src/traits/messaging.rs diff --git a/crates/katana/core/src/backend/storage.rs b/crates/katana/core/src/backend/storage.rs index 9264cf4043..f66521802c 100644 --- a/crates/katana/core/src/backend/storage.rs +++ b/crates/katana/core/src/backend/storage.rs @@ -7,6 +7,7 @@ use katana_provider::providers::db::DbProvider; use katana_provider::traits::block::{BlockProvider, BlockWriter}; use katana_provider::traits::contract::ContractClassWriter; use katana_provider::traits::env::BlockEnvProvider; +use katana_provider::traits::messaging::MessagingProvider; use katana_provider::traits::state::{StateFactoryProvider, StateRootProvider, StateWriter}; use katana_provider::traits::state_update::StateUpdateProvider; use katana_provider::traits::transaction::{ @@ -29,6 +30,7 @@ pub trait Database: + ContractClassWriter + StateFactoryProvider + BlockEnvProvider + + MessagingProvider + 'static + Send + Sync @@ -50,6 +52,7 @@ impl Database for T where + ContractClassWriter + StateFactoryProvider + BlockEnvProvider + + MessagingProvider + 'static + Send + Sync diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index df7bc6f8a7..e699bbb073 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -11,6 +11,7 @@ use katana_primitives::receipt::MessageToL1; use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash}; use katana_provider::traits::block::BlockNumberProvider; use katana_provider::traits::transaction::ReceiptProvider; +use katana_provider::traits::messaging::MessagingProvider; use tokio::time::{interval_at, Instant, Interval}; use tracing::{error, info}; @@ -48,7 +49,32 @@ impl MessagingService { pool: TxPool, backend: Arc>, ) -> anyhow::Result { - let gather_from_block = config.from_block; + let provider = backend.blockchain.provider(); + let gather_from_block = match provider.get_gather_from_block() { + Ok(Some(block)) => block, + Ok(None) => { + 0 + } + Err(_) => { + panic!( + "Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n", + ) + } + }; + let send_from_block = match provider.get_send_from_block() { + Ok(Some(block)) => block, + Ok(None) => { + 0 + } + Err(_) => { + panic!( + "Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n", + ) + } + }; + let interval = interval_from_seconds(config.interval); let messenger = match MessengerMode::from_config(config).await { Ok(m) => Arc::new(m), @@ -66,7 +92,7 @@ impl MessagingService { interval, messenger, gather_from_block, - send_from_block: 0, + send_from_block, msg_gather_fut: None, msg_send_fut: None, }) @@ -209,6 +235,7 @@ impl Stream for MessagingService { match gather_fut.poll_unpin(cx) { Poll::Ready(Ok((last_block, msg_count))) => { pin.gather_from_block = last_block + 1; + let _ = pin.backend.blockchain.provider().set_gather_from_block(pin.gather_from_block); return Poll::Ready(Some(MessagingOutcome::Gather { lastest_block: last_block, msg_count, @@ -234,6 +261,7 @@ impl Stream for MessagingService { // +1 to move to the next local block to check messages to be // sent on the settlement chain. pin.send_from_block += 1; + let _ = pin.backend.blockchain.provider().set_send_from_block(pin.send_from_block); return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count })); } Poll::Ready(Err(e)) => { diff --git a/crates/katana/primitives/src/genesis/json.rs b/crates/katana/primitives/src/genesis/json.rs index 74f43f4a9c..38141480ff 100644 --- a/crates/katana/primitives/src/genesis/json.rs +++ b/crates/katana/primitives/src/genesis/json.rs @@ -270,6 +270,7 @@ pub struct GenesisJson { pub accounts: HashMap, #[serde(default)] pub contracts: HashMap, + pub gather_from_block: BlockNumber, } impl GenesisJson { @@ -611,6 +612,7 @@ impl TryFrom for Genesis { gas_prices: value.gas_prices, state_root: value.state_root, parent_hash: value.parent_hash, + gather_from_block: value.gather_from_block }) } } diff --git a/crates/katana/primitives/src/genesis/mod.rs b/crates/katana/primitives/src/genesis/mod.rs index a2548b67ca..1e931ebb12 100644 --- a/crates/katana/primitives/src/genesis/mod.rs +++ b/crates/katana/primitives/src/genesis/mod.rs @@ -105,6 +105,8 @@ pub struct Genesis { pub universal_deployer: Option, /// The genesis contract allocations. pub allocations: BTreeMap, + /// The block on settlement chain from where Katana will start fetching messages. + pub gather_from_block: BlockNumber, } impl Genesis { @@ -309,6 +311,7 @@ impl Default for Genesis { allocations: BTreeMap::new(), fee_token, universal_deployer: Some(universal_deployer), + gather_from_block: 0, } } } diff --git a/crates/katana/storage/db/src/tables.rs b/crates/katana/storage/db/src/tables.rs index d7af3e1333..39ddcdb4fb 100644 --- a/crates/katana/storage/db/src/tables.rs +++ b/crates/katana/storage/db/src/tables.rs @@ -44,7 +44,7 @@ pub enum TableType { DupSort, } -pub const NUM_TABLES: usize = 23; +pub const NUM_TABLES: usize = 24; /// Macro to declare `libmdbx` tables. #[macro_export] @@ -167,7 +167,8 @@ define_tables_enum! {[ (NonceChangeHistory, TableType::DupSort), (ClassChangeHistory, TableType::DupSort), (StorageChangeHistory, TableType::DupSort), - (StorageChangeSet, TableType::Table) + (StorageChangeSet, TableType::Table), + (MessagingInfo, TableType::Table) ]} tables! { @@ -223,8 +224,10 @@ tables! { /// storage change set StorageChangeSet: (ContractStorageKey) => BlockList, /// Account storage change set - StorageChangeHistory: (BlockNumber, ContractStorageKey) => ContractStorageEntry + StorageChangeHistory: (BlockNumber, ContractStorageKey) => ContractStorageEntry, + /// Stores the block number related to messaging service + MessagingInfo: (u64) => BlockNumber } #[cfg(test)] @@ -258,6 +261,7 @@ mod tests { assert_eq!(Tables::ALL[20].name(), ClassChangeHistory::NAME); assert_eq!(Tables::ALL[21].name(), StorageChangeHistory::NAME); assert_eq!(Tables::ALL[22].name(), StorageChangeSet::NAME); + assert_eq!(Tables::ALL[23].name(), MessagingInfo::NAME); assert_eq!(Tables::Headers.table_type(), TableType::Table); assert_eq!(Tables::BlockHashes.table_type(), TableType::Table); @@ -282,6 +286,7 @@ mod tests { assert_eq!(Tables::ClassChangeHistory.table_type(), TableType::DupSort); assert_eq!(Tables::StorageChangeHistory.table_type(), TableType::DupSort); assert_eq!(Tables::StorageChangeSet.table_type(), TableType::Table); + assert_eq!(Tables::MessagingInfo.table_type(), TableType::Table); } use katana_primitives::block::{BlockHash, BlockNumber, FinalityStatus, Header}; diff --git a/crates/katana/storage/provider/src/lib.rs b/crates/katana/storage/provider/src/lib.rs index b7c1f7b430..fc10622a70 100644 --- a/crates/katana/storage/provider/src/lib.rs +++ b/crates/katana/storage/provider/src/lib.rs @@ -16,6 +16,7 @@ use katana_primitives::Felt; use traits::block::{BlockIdReader, BlockStatusProvider, BlockWriter}; use traits::contract::{ContractClassProvider, ContractClassWriter}; use traits::env::BlockEnvProvider; +use traits::messaging::MessagingProvider; use traits::state::{StateRootProvider, StateWriter}; use traits::transaction::{TransactionStatusProvider, TransactionTraceProvider}; @@ -380,3 +381,24 @@ where self.provider.block_env_at(id) } } + +impl MessagingProvider for BlockchainProvider +where + Db: MessagingProvider, +{ + fn get_send_from_block(&self) -> ProviderResult> { + self.provider.get_send_from_block() + } + + fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { + self.provider.set_send_from_block(send_from_block) + } + + fn get_gather_from_block(&self) -> ProviderResult> { + self.provider.get_gather_from_block() + } + + fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { + self.provider.set_gather_from_block(gather_from_block) + } +} diff --git a/crates/katana/storage/provider/src/providers/db/mod.rs b/crates/katana/storage/provider/src/providers/db/mod.rs index 2954dce1c2..4583f0c33d 100644 --- a/crates/katana/storage/provider/src/providers/db/mod.rs +++ b/crates/katana/storage/provider/src/providers/db/mod.rs @@ -36,14 +36,15 @@ use crate::traits::block::{ HeaderProvider, }; use crate::traits::env::BlockEnvProvider; +use crate::traits::messaging::MessagingProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ ReceiptProvider, TransactionProvider, TransactionStatusProvider, TransactionTraceProvider, TransactionsProviderExt, }; +use crate::traits::messaging::{SEND_FROM_BLOCK_KEY, GATHER_FROM_BLOCK_KEY}; use crate::ProviderResult; - /// A provider implementation that uses a persistent database as the backend. // TODO: remove the default generic type #[derive(Debug)] @@ -763,6 +764,37 @@ impl BlockWriter for DbProvider { } } +impl MessagingProvider for DbProvider { + + fn get_send_from_block(&self) -> ProviderResult> { + let db_tx = self.0.tx()?; + let block_num = db_tx.get::(SEND_FROM_BLOCK_KEY)?; + db_tx.commit()?; + Ok(block_num) + } + + fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(SEND_FROM_BLOCK_KEY, send_from_block)?; + Ok(()) + })? + } + + fn get_gather_from_block(&self) -> ProviderResult> { + let db_tx = self.0.tx()?; + let block_num = db_tx.get::(GATHER_FROM_BLOCK_KEY)?; + db_tx.commit()?; + Ok(block_num) + } + + fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(GATHER_FROM_BLOCK_KEY, gather_from_block)?; + Ok(()) + })? + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/crates/katana/storage/provider/src/providers/fork/mod.rs b/crates/katana/storage/provider/src/providers/fork/mod.rs index 104b1a9328..cbcfc4a05d 100644 --- a/crates/katana/storage/provider/src/providers/fork/mod.rs +++ b/crates/katana/storage/provider/src/providers/fork/mod.rs @@ -30,6 +30,7 @@ use crate::traits::block::{ }; use crate::traits::contract::ContractClassWriter; use crate::traits::env::BlockEnvProvider; +use crate::traits::messaging::MessagingProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider, StateWriter}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -459,6 +460,7 @@ impl StateFactoryProvider for ForkedProvider { } } + impl BlockWriter for ForkedProvider { fn insert_block_with_states_and_receipts( &self, @@ -581,3 +583,22 @@ impl BlockEnvProvider for ForkedProvider { })) } } + +impl MessagingProvider for ForkedProvider { + + fn get_send_from_block(&self) -> ProviderResult> { + Ok(None) + } + + fn set_send_from_block(&self, _send_from_block: BlockNumber) -> ProviderResult<()> { + Ok(()) + } + + fn get_gather_from_block(&self) -> ProviderResult> { + Ok(None) + } + + fn set_gather_from_block(&self, _gather_from_block: BlockNumber) -> ProviderResult<()> { + Ok(()) + } +} diff --git a/crates/katana/storage/provider/src/providers/in_memory/cache.rs b/crates/katana/storage/provider/src/providers/in_memory/cache.rs index d5b1df7685..a2acfb9b59 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/cache.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/cache.rs @@ -88,6 +88,7 @@ pub struct CacheDb { pub(crate) transaction_hashes: HashMap, pub(crate) transaction_numbers: HashMap, pub(crate) transaction_block: HashMap, + pub(crate) messaging_info: HashMap, } impl CacheStateDb { @@ -120,6 +121,7 @@ impl CacheDb { transactions_executions: Vec::new(), latest_block_hash: Default::default(), latest_block_number: Default::default(), + messaging_info: Default::default(), } } } diff --git a/crates/katana/storage/provider/src/providers/in_memory/mod.rs b/crates/katana/storage/provider/src/providers/in_memory/mod.rs index 2fc8b7aac4..80e9b1042d 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/mod.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/mod.rs @@ -26,12 +26,14 @@ use crate::traits::block::{ }; use crate::traits::contract::ContractClassWriter; use crate::traits::env::BlockEnvProvider; +use crate::traits::messaging::MessagingProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider, StateWriter}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ ReceiptProvider, TransactionProvider, TransactionStatusProvider, TransactionTraceProvider, TransactionsProviderExt, }; +use crate::traits::messaging::{SEND_FROM_BLOCK_KEY, GATHER_FROM_BLOCK_KEY}; use crate::ProviderResult; #[derive(Debug)] @@ -575,3 +577,23 @@ impl BlockEnvProvider for InMemoryProvider { })) } } + +impl MessagingProvider for InMemoryProvider { + fn get_send_from_block(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.get(&SEND_FROM_BLOCK_KEY).cloned()) + } + + fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.insert(SEND_FROM_BLOCK_KEY, send_from_block); + Ok(()) + } + + fn get_gather_from_block(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.get(&GATHER_FROM_BLOCK_KEY).cloned()) + } + + fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.insert(GATHER_FROM_BLOCK_KEY, gather_from_block); + Ok(()) + } +} diff --git a/crates/katana/storage/provider/src/traits/messaging.rs b/crates/katana/storage/provider/src/traits/messaging.rs new file mode 100644 index 0000000000..3873ba515b --- /dev/null +++ b/crates/katana/storage/provider/src/traits/messaging.rs @@ -0,0 +1,18 @@ +use crate::ProviderResult; +use katana_primitives::block::BlockNumber; + +pub const SEND_FROM_BLOCK_KEY: u64 = 1; +pub const GATHER_FROM_BLOCK_KEY: u64 = 2; + +#[auto_impl::auto_impl(&, Box, Arc)] +pub trait MessagingProvider: Send + Sync { + + /// Sets the send from block. + fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()>; + /// Returns the send from block. + fn get_send_from_block(&self) -> ProviderResult>; + /// Sets the gather from block. + fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()>; + /// Returns the gather from block. + fn get_gather_from_block(&self) -> ProviderResult>; +} diff --git a/crates/katana/storage/provider/src/traits/mod.rs b/crates/katana/storage/provider/src/traits/mod.rs index 762de20465..a221b98e8d 100644 --- a/crates/katana/storage/provider/src/traits/mod.rs +++ b/crates/katana/storage/provider/src/traits/mod.rs @@ -1,6 +1,7 @@ pub mod block; pub mod contract; pub mod env; +pub mod messaging; pub mod state; pub mod state_update; pub mod transaction; From 093ef215a072e11f193655a86328ccbc7dfcbdc6 Mon Sep 17 00:00:00 2001 From: bexan Date: Wed, 28 Aug 2024 16:37:02 +0200 Subject: [PATCH 2/8] cargo fmt --- .../core/src/service/messaging/service.rs | 19 +++++----- crates/katana/primitives/src/genesis/json.rs | 2 +- crates/katana/storage/provider/src/lib.rs | 8 ++--- .../storage/provider/src/providers/db/mod.rs | 36 +++++++++---------- .../provider/src/providers/fork/mod.rs | 2 -- .../provider/src/providers/in_memory/mod.rs | 3 +- .../storage/provider/src/traits/messaging.rs | 4 +-- 7 files changed, 35 insertions(+), 39 deletions(-) diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index e699bbb073..198fe9b2dc 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -10,8 +10,8 @@ use katana_primitives::block::BlockHashOrNumber; use katana_primitives::receipt::MessageToL1; use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash}; use katana_provider::traits::block::BlockNumberProvider; -use katana_provider::traits::transaction::ReceiptProvider; use katana_provider::traits::messaging::MessagingProvider; +use katana_provider::traits::transaction::ReceiptProvider; use tokio::time::{interval_at, Instant, Interval}; use tracing::{error, info}; @@ -52,9 +52,7 @@ impl MessagingService { let provider = backend.blockchain.provider(); let gather_from_block = match provider.get_gather_from_block() { Ok(Some(block)) => block, - Ok(None) => { - 0 - } + Ok(None) => 0, Err(_) => { panic!( "Messaging could not be initialized.\nVerify that the messaging target node \ @@ -64,9 +62,7 @@ impl MessagingService { }; let send_from_block = match provider.get_send_from_block() { Ok(Some(block)) => block, - Ok(None) => { - 0 - } + Ok(None) => 0, Err(_) => { panic!( "Messaging could not be initialized.\nVerify that the messaging target node \ @@ -235,7 +231,11 @@ impl Stream for MessagingService { match gather_fut.poll_unpin(cx) { Poll::Ready(Ok((last_block, msg_count))) => { pin.gather_from_block = last_block + 1; - let _ = pin.backend.blockchain.provider().set_gather_from_block(pin.gather_from_block); + let _ = pin + .backend + .blockchain + .provider() + .set_gather_from_block(pin.gather_from_block); return Poll::Ready(Some(MessagingOutcome::Gather { lastest_block: last_block, msg_count, @@ -261,7 +261,8 @@ impl Stream for MessagingService { // +1 to move to the next local block to check messages to be // sent on the settlement chain. pin.send_from_block += 1; - let _ = pin.backend.blockchain.provider().set_send_from_block(pin.send_from_block); + let _ = + pin.backend.blockchain.provider().set_send_from_block(pin.send_from_block); return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count })); } Poll::Ready(Err(e)) => { diff --git a/crates/katana/primitives/src/genesis/json.rs b/crates/katana/primitives/src/genesis/json.rs index 38141480ff..925b5228b7 100644 --- a/crates/katana/primitives/src/genesis/json.rs +++ b/crates/katana/primitives/src/genesis/json.rs @@ -612,7 +612,7 @@ impl TryFrom for Genesis { gas_prices: value.gas_prices, state_root: value.state_root, parent_hash: value.parent_hash, - gather_from_block: value.gather_from_block + gather_from_block: value.gather_from_block, }) } } diff --git a/crates/katana/storage/provider/src/lib.rs b/crates/katana/storage/provider/src/lib.rs index fc10622a70..07a5a3e295 100644 --- a/crates/katana/storage/provider/src/lib.rs +++ b/crates/katana/storage/provider/src/lib.rs @@ -387,18 +387,18 @@ where Db: MessagingProvider, { fn get_send_from_block(&self) -> ProviderResult> { - self.provider.get_send_from_block() + self.provider.get_send_from_block() } fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { - self.provider.set_send_from_block(send_from_block) + self.provider.set_send_from_block(send_from_block) } fn get_gather_from_block(&self) -> ProviderResult> { - self.provider.get_gather_from_block() + self.provider.get_gather_from_block() } fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { - self.provider.set_gather_from_block(gather_from_block) + self.provider.set_gather_from_block(gather_from_block) } } diff --git a/crates/katana/storage/provider/src/providers/db/mod.rs b/crates/katana/storage/provider/src/providers/db/mod.rs index 4583f0c33d..b4ad13f190 100644 --- a/crates/katana/storage/provider/src/providers/db/mod.rs +++ b/crates/katana/storage/provider/src/providers/db/mod.rs @@ -36,14 +36,13 @@ use crate::traits::block::{ HeaderProvider, }; use crate::traits::env::BlockEnvProvider; -use crate::traits::messaging::MessagingProvider; +use crate::traits::messaging::{MessagingProvider, GATHER_FROM_BLOCK_KEY, SEND_FROM_BLOCK_KEY}; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ ReceiptProvider, TransactionProvider, TransactionStatusProvider, TransactionTraceProvider, TransactionsProviderExt, }; -use crate::traits::messaging::{SEND_FROM_BLOCK_KEY, GATHER_FROM_BLOCK_KEY}; use crate::ProviderResult; /// A provider implementation that uses a persistent database as the backend. // TODO: remove the default generic type @@ -765,33 +764,32 @@ impl BlockWriter for DbProvider { } impl MessagingProvider for DbProvider { - fn get_send_from_block(&self) -> ProviderResult> { - let db_tx = self.0.tx()?; - let block_num = db_tx.get::(SEND_FROM_BLOCK_KEY)?; - db_tx.commit()?; - Ok(block_num) + let db_tx = self.0.tx()?; + let block_num = db_tx.get::(SEND_FROM_BLOCK_KEY)?; + db_tx.commit()?; + Ok(block_num) } fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { - self.0.update(|db_tx| { - db_tx.put::(SEND_FROM_BLOCK_KEY, send_from_block)?; - Ok(()) - })? + self.0.update(|db_tx| { + db_tx.put::(SEND_FROM_BLOCK_KEY, send_from_block)?; + Ok(()) + })? } fn get_gather_from_block(&self) -> ProviderResult> { - let db_tx = self.0.tx()?; - let block_num = db_tx.get::(GATHER_FROM_BLOCK_KEY)?; - db_tx.commit()?; - Ok(block_num) + let db_tx = self.0.tx()?; + let block_num = db_tx.get::(GATHER_FROM_BLOCK_KEY)?; + db_tx.commit()?; + Ok(block_num) } fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { - self.0.update(|db_tx| { - db_tx.put::(GATHER_FROM_BLOCK_KEY, gather_from_block)?; - Ok(()) - })? + self.0.update(|db_tx| { + db_tx.put::(GATHER_FROM_BLOCK_KEY, gather_from_block)?; + Ok(()) + })? } } diff --git a/crates/katana/storage/provider/src/providers/fork/mod.rs b/crates/katana/storage/provider/src/providers/fork/mod.rs index cbcfc4a05d..2e81ea821b 100644 --- a/crates/katana/storage/provider/src/providers/fork/mod.rs +++ b/crates/katana/storage/provider/src/providers/fork/mod.rs @@ -460,7 +460,6 @@ impl StateFactoryProvider for ForkedProvider { } } - impl BlockWriter for ForkedProvider { fn insert_block_with_states_and_receipts( &self, @@ -585,7 +584,6 @@ impl BlockEnvProvider for ForkedProvider { } impl MessagingProvider for ForkedProvider { - fn get_send_from_block(&self) -> ProviderResult> { Ok(None) } diff --git a/crates/katana/storage/provider/src/providers/in_memory/mod.rs b/crates/katana/storage/provider/src/providers/in_memory/mod.rs index 80e9b1042d..55058400e7 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/mod.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/mod.rs @@ -26,14 +26,13 @@ use crate::traits::block::{ }; use crate::traits::contract::ContractClassWriter; use crate::traits::env::BlockEnvProvider; -use crate::traits::messaging::MessagingProvider; +use crate::traits::messaging::{MessagingProvider, GATHER_FROM_BLOCK_KEY, SEND_FROM_BLOCK_KEY}; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider, StateWriter}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ ReceiptProvider, TransactionProvider, TransactionStatusProvider, TransactionTraceProvider, TransactionsProviderExt, }; -use crate::traits::messaging::{SEND_FROM_BLOCK_KEY, GATHER_FROM_BLOCK_KEY}; use crate::ProviderResult; #[derive(Debug)] diff --git a/crates/katana/storage/provider/src/traits/messaging.rs b/crates/katana/storage/provider/src/traits/messaging.rs index 3873ba515b..42d6dd22f6 100644 --- a/crates/katana/storage/provider/src/traits/messaging.rs +++ b/crates/katana/storage/provider/src/traits/messaging.rs @@ -1,12 +1,12 @@ -use crate::ProviderResult; use katana_primitives::block::BlockNumber; +use crate::ProviderResult; + pub const SEND_FROM_BLOCK_KEY: u64 = 1; pub const GATHER_FROM_BLOCK_KEY: u64 = 2; #[auto_impl::auto_impl(&, Box, Arc)] pub trait MessagingProvider: Send + Sync { - /// Sets the send from block. fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()>; /// Returns the send from block. From 23d8c3e247aa448ac14f90615291596e1378e2ce Mon Sep 17 00:00:00 2001 From: bexan Date: Wed, 28 Aug 2024 16:38:57 +0200 Subject: [PATCH 3/8] genesis: add missing gather_from_block --- crates/katana/primitives/src/genesis/json.rs | 2 ++ crates/katana/primitives/src/genesis/mod.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/crates/katana/primitives/src/genesis/json.rs b/crates/katana/primitives/src/genesis/json.rs index 925b5228b7..a8be28aa6d 100644 --- a/crates/katana/primitives/src/genesis/json.rs +++ b/crates/katana/primitives/src/genesis/json.rs @@ -1041,6 +1041,7 @@ mod tests { let expected_genesis = Genesis { classes: expected_classes, number: 0, + gather_from_block: 0, fee_token: expected_fee_token, allocations: expected_allocations, timestamp: 5123512314u64, @@ -1181,6 +1182,7 @@ mod tests { classes, allocations, number: 0, + gather_from_block: 0, timestamp: 5123512314u64, state_root: felt!("0x99"), parent_hash: felt!("0x999"), diff --git a/crates/katana/primitives/src/genesis/mod.rs b/crates/katana/primitives/src/genesis/mod.rs index 1e931ebb12..53284b43e6 100644 --- a/crates/katana/primitives/src/genesis/mod.rs +++ b/crates/katana/primitives/src/genesis/mod.rs @@ -428,6 +428,7 @@ mod tests { fee_token: fee_token.clone(), allocations: BTreeMap::from(allocations.clone()), number: 0, + gather_from_block: 0, timestamp: 5123512314u64, state_root: felt!("0x99"), parent_hash: felt!("0x999"), From 90078eccc5f736cadbabe6a167bbbac9b9e670a8 Mon Sep 17 00:00:00 2001 From: bexan Date: Tue, 3 Sep 2024 17:10:05 +0200 Subject: [PATCH 4/8] service: add max_block et chunk_size in config file --- .../katana/core/src/service/messaging/mod.rs | 4 +++ .../core/src/service/messaging/service.rs | 35 ++++++++++--------- .../core/src/service/messaging/starknet.rs | 8 ++--- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/crates/katana/core/src/service/messaging/mod.rs b/crates/katana/core/src/service/messaging/mod.rs index cd064f44be..dac9901e6b 100644 --- a/crates/katana/core/src/service/messaging/mod.rs +++ b/crates/katana/core/src/service/messaging/mod.rs @@ -112,6 +112,10 @@ pub struct MessagingConfig { pub interval: u64, /// The block on settlement chain from where Katana will start fetching messages. pub from_block: u64, + /// The maximum number of blocks in gather messages + pub max_block: u64, + /// The size of events returned by get_events call + pub chunk_size: u64, } impl MessagingConfig { diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 198fe9b2dc..5d664227ca 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -39,6 +39,10 @@ pub struct MessagingService { send_from_block: u64, /// The message sending future. msg_send_fut: Option, + /// The maximum block number to gather messages from. + max_block: u64, + /// The chunk size of messages to gather. + chunk_size: u64, } impl MessagingService { @@ -54,31 +58,28 @@ impl MessagingService { Ok(Some(block)) => block, Ok(None) => 0, Err(_) => { - panic!( - "Messaging could not be initialized.\nVerify that the messaging target node \ - (anvil or other katana) is running.\n", - ) + anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n") } }; let send_from_block = match provider.get_send_from_block() { Ok(Some(block)) => block, Ok(None) => 0, Err(_) => { - panic!( - "Messaging could not be initialized.\nVerify that the messaging target node \ - (anvil or other katana) is running.\n", - ) + anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n") } }; + let max_block = config.max_block; + let chunk_size = config.chunk_size; + let interval = interval_from_seconds(config.interval); let messenger = match MessengerMode::from_config(config).await { Ok(m) => Arc::new(m), Err(_) => { - panic!( - "Messaging could not be initialized.\nVerify that the messaging target node \ - (anvil or other katana) is running.\n", - ) + anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n") } }; @@ -91,6 +92,8 @@ impl MessagingService { send_from_block, msg_gather_fut: None, msg_send_fut: None, + max_block, + chunk_size, }) } @@ -99,10 +102,9 @@ impl MessagingService { pool: TxPool, backend: Arc>, from_block: u64, + max_block: u64, + chunk_size: u64, ) -> MessengerResult<(u64, usize)> { - // 200 avoids any possible rejection from RPC with possibly lot's of messages. - // TODO: May this be configurable? - let max_block = 200; match messenger.as_ref() { MessengerMode::Ethereum(inner) => { @@ -125,7 +127,7 @@ impl MessagingService { #[cfg(feature = "starknet-messaging")] MessengerMode::Starknet(inner) => { let (block_num, txs) = - inner.gather_messages(from_block, max_block, backend.chain_id).await?; + inner.gather_messages(from_block, max_block, chunk_size, backend.chain_id).await?; let txs_count = txs.len(); txs.into_iter().for_each(|tx| { @@ -210,6 +212,7 @@ impl Stream for MessagingService { pin.pool.clone(), pin.backend.clone(), pin.gather_from_block, + pin.max_block, ))); } diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 7c3f2bb3db..91a3a8b334 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -64,7 +64,8 @@ impl StarknetMessaging { &self, from_block: BlockId, to_block: BlockId, - ) -> Result> { + chunk_size: u64, + ) -> Result>> { trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs."); let mut events = vec![]; @@ -77,8 +78,6 @@ impl StarknetMessaging { keys: None, }; - // TODO: this chunk_size may also come from configuration? - let chunk_size = 200; let mut continuation_token: Option = None; loop { @@ -165,6 +164,7 @@ impl Messenger for StarknetMessaging { &self, from_block: u64, max_blocks: u64, + chunk_size: u64, chain_id: ChainId, ) -> MessengerResult<(u64, Vec)> { let chain_latest_block: u64 = match self.provider.block_number().await { @@ -193,7 +193,7 @@ impl Messenger for StarknetMessaging { let mut l1_handler_txs: Vec = vec![]; - self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)) + self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block), chunk_size) .await .map_err(|_| Error::SendError) .unwrap() From ca9277b566613b9aeb0a6379d2a25cb31cbc26a1 Mon Sep 17 00:00:00 2001 From: bexan Date: Wed, 18 Sep 2024 17:30:21 +0200 Subject: [PATCH 5/8] store nonce & message hash --- Cargo.lock | 1 + .../katana/core/src/service/block_producer.rs | 55 +++++++++++++++---- .../core/src/service/messaging/service.rs | 1 + .../core/src/service/messaging/starknet.rs | 16 ++++-- crates/katana/primitives/src/contract.rs | 3 + crates/katana/storage/db/Cargo.toml | 2 + crates/katana/storage/db/src/tables.rs | 27 +++++++-- crates/katana/storage/provider/src/lib.rs | 26 ++++++++- .../storage/provider/src/providers/db/mod.rs | 46 +++++++++++++++- .../provider/src/providers/fork/mod.rs | 27 ++++++++- .../provider/src/providers/in_memory/cache.rs | 7 +++ .../provider/src/providers/in_memory/mod.rs | 31 ++++++++++- .../storage/provider/src/traits/messaging.rs | 15 +++++ 13 files changed, 233 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb7394efc4..c0babbb6f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7920,6 +7920,7 @@ dependencies = [ name = "katana-db" version = "1.0.0-alpha.12" dependencies = [ + "alloy-primitives", "anyhow", "criterion", "dojo-metrics", diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 618d0f2c03..02d386f14e 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -13,9 +13,11 @@ use katana_pool::validation::stateful::TxValidator; use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader}; use katana_primitives::receipt::Receipt; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{ExecutableTxWithHash, TxHash, TxWithHash}; +use katana_primitives::transaction::{ExecutableTxWithHash, TxHash, TxWithHash, Tx}; use katana_primitives::version::CURRENT_STARKNET_VERSION; +use katana_primitives::FieldElement; use katana_provider::error::ProviderError; +use katana_provider::traits::messaging::MessagingProvider; use katana_provider::traits::block::{BlockHashProvider, BlockNumberProvider}; use katana_provider::traits::env::BlockEnvProvider; use katana_provider::traits::state::StateFactoryProvider; @@ -296,9 +298,11 @@ impl IntervalBlockProducer { fn execute_transactions( executor: PendingExecutor, transactions: Vec, + backend: Arc>, ) -> TxExecutionResult { - let executor = &mut executor.write(); + let provider = backend.blockchain.provider(); + let executor = &mut executor.write(); let new_txs_count = transactions.len(); executor.execute_transactions(transactions)?; @@ -309,13 +313,43 @@ impl IntervalBlockProducer { let results = txs .iter() .skip(total_txs - new_txs_count) - .filter_map(|(tx, res)| match res { - ExecutionResult::Failed { .. } => None, - ExecutionResult::Success { receipt, trace, .. } => Some(TxWithOutcome { - tx: tx.clone(), - receipt: receipt.clone(), - exec_info: trace.clone(), - }), + .filter_map(|(tx, res)| { + let tx_ref: &Tx = &tx.transaction; + + trace!(target: LOG_TARGET, "Executed transaction: {:?}", tx); + let _ = match tx_ref { + Tx::L1Handler(l1_tx) => { + // get stored nonce from message hash + let message_hash_bytes = l1_tx.message_hash; + let message_hash_bytes: [u8; 32] = *message_hash_bytes; + match FieldElement::from_bytes_be(&message_hash_bytes) { + Ok(message_hash) => { + match provider.get_nonce_from_message_hash(message_hash) { + Ok(Some(nonce)) => provider.set_gather_message_nonce(nonce), + Ok(None) => { + Ok(()) + }, + Err(_e) => { + Ok(()) + } + } + }, + Err(_e) => { + Ok(()) + } + } + }, + _ => Ok({}) + }; + + match res { + ExecutionResult::Failed { .. } => None, + ExecutionResult::Success { receipt, trace, .. } => Some(TxWithOutcome { + tx: tx.clone(), + receipt: receipt.clone(), + exec_info: trace.clone(), + }), + } }) .collect::>(); @@ -399,10 +433,11 @@ impl Stream for IntervalBlockProducer { let transactions: Vec = std::mem::take(&mut pin.queued).into_iter().flatten().collect(); + let backend = pin.backend.clone(); let fut = pin .blocking_task_spawner - .spawn(|| Self::execute_transactions(executor, transactions)); + .spawn(|| Self::execute_transactions(executor, transactions, backend)); pin.ongoing_execution = Some(Box::pin(fut)); } diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 5d664227ca..86471a2d9d 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -213,6 +213,7 @@ impl Stream for MessagingService { pin.backend.clone(), pin.gather_from_block, pin.max_block, + pin.chunk_size, ))); } diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 91a3a8b334..8ad7bc8926 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -204,10 +204,14 @@ impl Messenger for StarknetMessaging { event = ?e, "Converting event into L1HandlerTx." ); - - if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { - l1_handler_txs.push(tx) - } + block_events.iter().for_each(|e| { + if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { + let last_processed_nonce = self.provider.get_gather_message_nonce().unwrap_or(0.into()); + if tx.nonce > last_processed_nonce { + l1_handler_txs.push(tx) + } + } + }) }); Ok((to_block, l1_handler_txs)) @@ -236,6 +240,10 @@ impl Messenger for StarknetMessaging { } self.send_hashes(hashes.clone()).await?; + for (index, hash) in hashes.iter().enumerate() { + self.send_hashes(std::slice::from_ref(hash)).await?; + self.provider.set_send_from_index(*hash, index as u64).await?; + } Ok(hashes) } diff --git a/crates/katana/primitives/src/contract.rs b/crates/katana/primitives/src/contract.rs index 4473d4169b..cfc589cf13 100644 --- a/crates/katana/primitives/src/contract.rs +++ b/crates/katana/primitives/src/contract.rs @@ -14,6 +14,9 @@ pub type StorageValue = Felt; /// Represents the type for a contract nonce. pub type Nonce = Felt; +/// Represents the type for a message hash. +pub type MessageHash = FieldElement; + /// Represents a contract address. #[derive(Default, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash, Debug, Deref)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] diff --git a/crates/katana/storage/db/Cargo.toml b/crates/katana/storage/db/Cargo.toml index ce6aaf883b..081afb1547 100644 --- a/crates/katana/storage/db/Cargo.toml +++ b/crates/katana/storage/db/Cargo.toml @@ -21,6 +21,8 @@ tempfile = { workspace = true, optional = true } thiserror.workspace = true tracing.workspace = true +alloy-primitives.workspace = true + # codecs [dependencies.postcard] default-features = false diff --git a/crates/katana/storage/db/src/tables.rs b/crates/katana/storage/db/src/tables.rs index 39ddcdb4fb..d99ee2ee59 100644 --- a/crates/katana/storage/db/src/tables.rs +++ b/crates/katana/storage/db/src/tables.rs @@ -1,6 +1,6 @@ use katana_primitives::block::{BlockHash, BlockNumber, FinalityStatus, Header}; use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; -use katana_primitives::contract::{ContractAddress, GenericContractInfo, StorageKey}; +use katana_primitives::contract::{ContractAddress, GenericContractInfo, Nonce, StorageKey}; use katana_primitives::receipt::Receipt; use katana_primitives::trace::TxExecInfo; use katana_primitives::transaction::{Tx, TxHash, TxNumber}; @@ -44,7 +44,7 @@ pub enum TableType { DupSort, } -pub const NUM_TABLES: usize = 24; +pub const NUM_TABLES: usize = 27; /// Macro to declare `libmdbx` tables. #[macro_export] @@ -168,7 +168,10 @@ define_tables_enum! {[ (ClassChangeHistory, TableType::DupSort), (StorageChangeHistory, TableType::DupSort), (StorageChangeSet, TableType::Table), - (MessagingInfo, TableType::Table) + (MessagingInfo, TableType::Table), + (MessagingNonceInfo, TableType::Table), + (MessagingMessageNonceMapping, TableType::Table), + (MessagingIndexInfo, TableType::Table), ]} tables! { @@ -227,7 +230,17 @@ tables! { StorageChangeHistory: (BlockNumber, ContractStorageKey) => ContractStorageEntry, /// Stores the block number related to messaging service - MessagingInfo: (u64) => BlockNumber + MessagingInfo: (u64) => BlockNumber, + + /// Stores the nonce related to messaging service + MessagingNonceInfo: (u64) => Nonce, + + /// Map a message hash to a message nonce + MessagingMessageNonceMapping: (TxHash) => Nonce, + + /// Stores the index of the messaging service + MessagingIndexInfo: (u64) => u64, + } #[cfg(test)] @@ -262,6 +275,9 @@ mod tests { assert_eq!(Tables::ALL[21].name(), StorageChangeHistory::NAME); assert_eq!(Tables::ALL[22].name(), StorageChangeSet::NAME); assert_eq!(Tables::ALL[23].name(), MessagingInfo::NAME); + assert_eq!(Tables::ALL[24].name(), MessagingNonceInfo::NAME); + assert_eq!(Tables::ALL[25].name(), MessagingMessageNonceMapping::NAME); + assert_eq!(Tables::ALL[26].name(), MessagingMessageNonceMapping::NAME); assert_eq!(Tables::Headers.table_type(), TableType::Table); assert_eq!(Tables::BlockHashes.table_type(), TableType::Table); @@ -287,6 +303,9 @@ mod tests { assert_eq!(Tables::StorageChangeHistory.table_type(), TableType::DupSort); assert_eq!(Tables::StorageChangeSet.table_type(), TableType::Table); assert_eq!(Tables::MessagingInfo.table_type(), TableType::Table); + assert_eq!(Tables::MessagingNonceInfo.table_type(), TableType::Table); + assert_eq!(Tables::MessagingMessageNonMapping.table_type(), TableType::Table); + assert_eq!(Tables::MessagingIndexInfo.table_type(), TableType::Table); } use katana_primitives::block::{BlockHash, BlockNumber, FinalityStatus, Header}; diff --git a/crates/katana/storage/provider/src/lib.rs b/crates/katana/storage/provider/src/lib.rs index 07a5a3e295..a4273ba6d7 100644 --- a/crates/katana/storage/provider/src/lib.rs +++ b/crates/katana/storage/provider/src/lib.rs @@ -6,7 +6,7 @@ use katana_primitives::block::{ SealedBlockWithStatus, }; use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; -use katana_primitives::contract::{ContractAddress, StorageKey, StorageValue}; +use katana_primitives::contract::{ContractAddress, MessageHash, Nonce, StorageKey, StorageValue}; use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; @@ -401,4 +401,28 @@ where fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { self.provider.set_gather_from_block(gather_from_block) } + + fn get_gather_message_nonce(&self) -> ProviderResult> { + self.provider.get_gather_message_nonce() + } + + fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + self.provider.set_gather_message_nonce(nonce) + } + + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { + self.provider.get_nonce_from_message_hash(message_hash) + } + + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { + self.provider.set_nonce_from_message_hash(message_hash, nonce) + } + + fn get_send_from_index(&self) -> ProviderResult> { + self.provider.get_send_from_index() + } + + fn set_send_from_index(&self, _send_from_index: u64) -> ProviderResult<()> { + self.provider.get_send_from_index(send_from_index) + } } diff --git a/crates/katana/storage/provider/src/providers/db/mod.rs b/crates/katana/storage/provider/src/providers/db/mod.rs index b4ad13f190..bc3706995d 100644 --- a/crates/katana/storage/provider/src/providers/db/mod.rs +++ b/crates/katana/storage/provider/src/providers/db/mod.rs @@ -21,7 +21,7 @@ use katana_primitives::block::{ }; use katana_primitives::class::{ClassHash, CompiledClassHash}; use katana_primitives::contract::{ - ContractAddress, GenericContractInfo, Nonce, StorageKey, StorageValue, + ContractAddress, GenericContractInfo, MessageHash, Nonce, StorageKey, StorageValue, }; use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; @@ -36,7 +36,7 @@ use crate::traits::block::{ HeaderProvider, }; use crate::traits::env::BlockEnvProvider; -use crate::traits::messaging::{MessagingProvider, GATHER_FROM_BLOCK_KEY, SEND_FROM_BLOCK_KEY}; +use crate::traits::messaging::{MessagingProvider, GATHER_FROM_BLOCK_KEY, SEND_FROM_BLOCK_KEY, GATHER_FROM_NONCE_KEY, SEND_FROM_INDEX_KEY}; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -791,6 +791,48 @@ impl MessagingProvider for DbProvider { Ok(()) })? } + + fn get_gather_message_nonce(&self) -> ProviderResult> { + let db_tx = self.0.tx()?; + let nonce = db_tx.get::(GATHER_FROM_NONCE_KEY)?; + db_tx.commit()?; + Ok(nonce) + } + + fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(GATHER_FROM_NONCE_KEY, nonce)?; + Ok(()) + })? + } + + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { + let db_tx = self.0.tx()?; + let nonce = db_tx.get::(message_hash)?; + db_tx.commit()?; + Ok(nonce) + } + + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(message_hash, nonce)?; + Ok(()) + })? + } + + fn get_send_from_index(&self) -> ProviderResult> { + let db_tx = self.0.tx()?; + let index = db_tx.get::(SEND_FROM_INDEX_KEY)?; + db_tx.commit()?; + Ok(index) + } + + fn set_send_from_index(&self, send_from_index: u64) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(SEND_FROM_INDEX_KEY, send_from_index)?; + Ok(()) + })? + } } #[cfg(test)] diff --git a/crates/katana/storage/provider/src/providers/fork/mod.rs b/crates/katana/storage/provider/src/providers/fork/mod.rs index 2e81ea821b..62cb9700ab 100644 --- a/crates/katana/storage/provider/src/providers/fork/mod.rs +++ b/crates/katana/storage/provider/src/providers/fork/mod.rs @@ -10,7 +10,7 @@ use katana_primitives::block::{ SealedBlockWithStatus, }; use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; -use katana_primitives::contract::ContractAddress; +use katana_primitives::contract::{ContractAddress, MessageHash, Nonce}; use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; @@ -599,4 +599,29 @@ impl MessagingProvider for ForkedProvider { fn set_gather_from_block(&self, _gather_from_block: BlockNumber) -> ProviderResult<()> { Ok(()) } + + fn get_gather_message_nonce(&self) -> ProviderResult> { + Ok(None) + } + + fn set_gather_message_nonce(&self, _nonce: Nonce) -> ProviderResult<()> { + Ok(()) + } + + fn get_nonce_from_message_hash(&self, _message_hash: MessageHash) -> ProviderResult> { + Ok(None) + } + + fn set_nonce_from_message_hash(&self, _message_hash: MessageHash, _nonce: Nonce) -> ProviderResult<()> { + Ok(()) + } + + fn get_send_from_index(&self) -> ProviderResult> { + Ok(None) + } + + fn set_send_from_index(&self, _send_from_index: u64) -> ProviderResult<()> { + Ok(()) + } + } diff --git a/crates/katana/storage/provider/src/providers/in_memory/cache.rs b/crates/katana/storage/provider/src/providers/in_memory/cache.rs index a2acfb9b59..c4728ceaaf 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/cache.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/cache.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use katana_db::models::block::StoredBlockBodyIndices; use katana_primitives::block::{BlockHash, BlockNumber, FinalityStatus, Header}; +use katana_primitives::contract::Nonce; use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; use katana_primitives::contract::{ContractAddress, GenericContractInfo, StorageKey, StorageValue}; use katana_primitives::receipt::Receipt; @@ -89,6 +90,9 @@ pub struct CacheDb { pub(crate) transaction_numbers: HashMap, pub(crate) transaction_block: HashMap, pub(crate) messaging_info: HashMap, + pub(crate) messaging_nonce_info: HashMap, + pub(crate) messaging_message_nonce_mapping: HashMap, + pub(crate) messaging_index_info: HashMap, } impl CacheStateDb { @@ -122,6 +126,9 @@ impl CacheDb { latest_block_hash: Default::default(), latest_block_number: Default::default(), messaging_info: Default::default(), + messaging_nonce_info: Default::default(), + messaging_message_nonce_mapping: Default::default(), + messaging_index_info: Default::default(), } } } diff --git a/crates/katana/storage/provider/src/providers/in_memory/mod.rs b/crates/katana/storage/provider/src/providers/in_memory/mod.rs index 55058400e7..4c912f60eb 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/mod.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/mod.rs @@ -10,7 +10,7 @@ use katana_primitives::block::{ SealedBlockWithStatus, }; use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; -use katana_primitives::contract::ContractAddress; +use katana_primitives::contract::{ContractAddress, MessageHash, Nonce}; use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; @@ -26,7 +26,7 @@ use crate::traits::block::{ }; use crate::traits::contract::ContractClassWriter; use crate::traits::env::BlockEnvProvider; -use crate::traits::messaging::{MessagingProvider, GATHER_FROM_BLOCK_KEY, SEND_FROM_BLOCK_KEY}; +use crate::traits::messaging::{MessagingProvider, GATHER_FROM_BLOCK_KEY, SEND_FROM_BLOCK_KEY, GATHER_FROM_NONCE_KEY, SEND_FROM_INDEX_KEY}; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider, StateWriter}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -595,4 +595,31 @@ impl MessagingProvider for InMemoryProvider { self.storage.write().messaging_info.insert(GATHER_FROM_BLOCK_KEY, gather_from_block); Ok(()) } + + fn get_gather_message_nonce(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_nonce_info.get(&GATHER_FROM_NONCE_KEY).cloned()) + } + + fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + self.storage.write().messaging_nonce_info.insert(GATHER_FROM_NONCE_KEY, nonce); + Ok(()) + } + + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { + Ok(self.storage.read().messaging_message_nonce_mapping.get(&message_hash).cloned()) + } + + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { + self.storage.write().messaging_message_nonce_mapping.insert(message_hash, nonce); + Ok(()) + } + + fn get_send_from_index(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_index_info.get(&SEND_FROM_INDEX_KEY).cloned()) + } + + fn set_send_from_index(&self, index: u64) -> ProviderResult<()> { + self.storage.write().messaging_index_info.insert(SEND_FROM_INDEX_KEY, index); + Ok(()) + } } diff --git a/crates/katana/storage/provider/src/traits/messaging.rs b/crates/katana/storage/provider/src/traits/messaging.rs index 42d6dd22f6..a556a4b8f4 100644 --- a/crates/katana/storage/provider/src/traits/messaging.rs +++ b/crates/katana/storage/provider/src/traits/messaging.rs @@ -1,9 +1,12 @@ use katana_primitives::block::BlockNumber; +use katana_primitives::contract::{Nonce, MessageHash}; use crate::ProviderResult; pub const SEND_FROM_BLOCK_KEY: u64 = 1; pub const GATHER_FROM_BLOCK_KEY: u64 = 2; +pub const GATHER_FROM_NONCE_KEY: u64 = 3; +pub const SEND_FROM_INDEX_KEY: u64 = 4; #[auto_impl::auto_impl(&, Box, Arc)] pub trait MessagingProvider: Send + Sync { @@ -15,4 +18,16 @@ pub trait MessagingProvider: Send + Sync { fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()>; /// Returns the gather from block. fn get_gather_from_block(&self) -> ProviderResult>; + /// Sets the gather from nonce. + fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()>; + /// Returns the gather from nonce. + fn get_gather_message_nonce(&self) -> ProviderResult>; + /// Sets the nonce by message_hash. + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()>; + /// Returns the nonce by message_hash. + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult>; + /// Sets the send from index. + fn set_send_from_index(&self, index: u64) -> ProviderResult<()>; + /// Returns the send from index. + fn get_send_from_index(&self) -> ProviderResult>; } From fe2c75b56d921462538a637a7aa638468cb13506 Mon Sep 17 00:00:00 2001 From: Yannick Bensacq Date: Thu, 19 Sep 2024 18:30:22 +0200 Subject: [PATCH 6/8] feat(katana): implement struct for in memory provider --- crates/katana/core/src/backend/storage.rs | 6 +-- .../katana/core/src/service/block_producer.rs | 34 +++++--------- .../core/src/service/messaging/service.rs | 24 ++++++---- .../core/src/service/messaging/starknet.rs | 10 +++-- crates/katana/primitives/src/contract.rs | 2 +- crates/katana/primitives/src/genesis/json.rs | 4 +- crates/katana/primitives/src/genesis/mod.rs | 4 +- .../katana/storage/db/src/models/storage.rs | 37 ++++++++++++++++ crates/katana/storage/db/src/tables.rs | 29 ++++++------ crates/katana/storage/provider/src/lib.rs | 10 ++--- .../storage/provider/src/providers/db/mod.rs | 44 +++++++++---------- .../provider/src/providers/fork/mod.rs | 37 +++++++++------- .../provider/src/providers/in_memory/cache.rs | 14 +++--- .../provider/src/providers/in_memory/mod.rs | 36 +++++++-------- .../storage/provider/src/traits/messaging.rs | 7 +-- 15 files changed, 170 insertions(+), 128 deletions(-) diff --git a/crates/katana/core/src/backend/storage.rs b/crates/katana/core/src/backend/storage.rs index f66521802c..444604c39c 100644 --- a/crates/katana/core/src/backend/storage.rs +++ b/crates/katana/core/src/backend/storage.rs @@ -7,7 +7,7 @@ use katana_provider::providers::db::DbProvider; use katana_provider::traits::block::{BlockProvider, BlockWriter}; use katana_provider::traits::contract::ContractClassWriter; use katana_provider::traits::env::BlockEnvProvider; -use katana_provider::traits::messaging::MessagingProvider; +use katana_provider::traits::messaging::MessagingCheckpointProvider; use katana_provider::traits::state::{StateFactoryProvider, StateRootProvider, StateWriter}; use katana_provider::traits::state_update::StateUpdateProvider; use katana_provider::traits::transaction::{ @@ -30,7 +30,7 @@ pub trait Database: + ContractClassWriter + StateFactoryProvider + BlockEnvProvider - + MessagingProvider + + MessagingCheckpointProvider + 'static + Send + Sync @@ -52,7 +52,7 @@ impl Database for T where + ContractClassWriter + StateFactoryProvider + BlockEnvProvider - + MessagingProvider + + MessagingCheckpointProvider + 'static + Send + Sync diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 02d386f14e..c4c7349b0e 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; +use crate::backend::Backend; use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; @@ -13,13 +14,13 @@ use katana_pool::validation::stateful::TxValidator; use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader}; use katana_primitives::receipt::Receipt; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{ExecutableTxWithHash, TxHash, TxWithHash, Tx}; +use katana_primitives::transaction::{ExecutableTxWithHash, Tx, TxHash, TxWithHash}; use katana_primitives::version::CURRENT_STARKNET_VERSION; -use katana_primitives::FieldElement; +use katana_primitives::Felt; use katana_provider::error::ProviderError; -use katana_provider::traits::messaging::MessagingProvider; use katana_provider::traits::block::{BlockHashProvider, BlockNumberProvider}; use katana_provider::traits::env::BlockEnvProvider; +use katana_provider::traits::messaging::MessagingCheckpointProvider; use katana_provider::traits::state::StateFactoryProvider; use katana_tasks::{BlockingTaskPool, BlockingTaskResult}; use parking_lot::lock_api::RawMutex; @@ -27,8 +28,6 @@ use parking_lot::{Mutex, RwLock}; use tokio::time::{interval_at, Instant, Interval}; use tracing::{error, info, trace, warn}; -use crate::backend::Backend; - pub(crate) const LOG_TARGET: &str = "miner"; #[derive(Debug, thiserror::Error)] @@ -322,24 +321,15 @@ impl IntervalBlockProducer { // get stored nonce from message hash let message_hash_bytes = l1_tx.message_hash; let message_hash_bytes: [u8; 32] = *message_hash_bytes; - match FieldElement::from_bytes_be(&message_hash_bytes) { - Ok(message_hash) => { - match provider.get_nonce_from_message_hash(message_hash) { - Ok(Some(nonce)) => provider.set_gather_message_nonce(nonce), - Ok(None) => { - Ok(()) - }, - Err(_e) => { - Ok(()) - } - } - }, - Err(_e) => { - Ok(()) - } + + let message_hash = Felt::from_bytes_be(&message_hash_bytes); + match provider.get_nonce_from_message_hash(message_hash) { + Ok(Some(nonce)) => provider.set_gather_message_nonce(nonce), + Ok(None) => Ok(()), + Err(_e) => Ok(()), } - }, - _ => Ok({}) + } + _ => Ok(()), }; match res { diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 86471a2d9d..bd304e7231 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -58,16 +58,20 @@ impl MessagingService { Ok(Some(block)) => block, Ok(None) => 0, Err(_) => { - anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \ - (anvil or other katana) is running.\n") + anyhow::bail!( + "Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n" + ) } }; let send_from_block = match provider.get_send_from_block() { Ok(Some(block)) => block, Ok(None) => 0, Err(_) => { - anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \ - (anvil or other katana) is running.\n") + anyhow::bail!( + "Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n" + ) } }; @@ -78,8 +82,10 @@ impl MessagingService { let messenger = match MessengerMode::from_config(config).await { Ok(m) => Arc::new(m), Err(_) => { - anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \ - (anvil or other katana) is running.\n") + anyhow::bail!( + "Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n" + ) } }; @@ -105,7 +111,6 @@ impl MessagingService { max_block: u64, chunk_size: u64, ) -> MessengerResult<(u64, usize)> { - match messenger.as_ref() { MessengerMode::Ethereum(inner) => { let (block_num, txs) = @@ -126,8 +131,9 @@ impl MessagingService { #[cfg(feature = "starknet-messaging")] MessengerMode::Starknet(inner) => { - let (block_num, txs) = - inner.gather_messages(from_block, max_block, chunk_size, backend.chain_id).await?; + let (block_num, txs) = inner + .gather_messages(from_block, max_block, chunk_size, backend.chain_id) + .await?; let txs_count = txs.len(); txs.into_iter().for_each(|tx| { diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 8ad7bc8926..2d725f8ee1 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -206,7 +206,8 @@ impl Messenger for StarknetMessaging { ); block_events.iter().for_each(|e| { if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { - let last_processed_nonce = self.provider.get_gather_message_nonce().unwrap_or(0.into()); + let last_processed_nonce = + self.provider.get_gather_message_nonce().unwrap_or(0.into()); if tx.nonce > last_processed_nonce { l1_handler_txs.push(tx) } @@ -239,12 +240,15 @@ impl Messenger for StarknetMessaging { }; } - self.send_hashes(hashes.clone()).await?; for (index, hash) in hashes.iter().enumerate() { + let stored_index = self.provider.get_send_from_index(); self.send_hashes(std::slice::from_ref(hash)).await?; - self.provider.set_send_from_index(*hash, index as u64).await?; + self.provider.set_send_from_index((index as u64) + 1).await?; } + // reset the index + self.provider.set_send_from_index(0).await?; + Ok(hashes) } } diff --git a/crates/katana/primitives/src/contract.rs b/crates/katana/primitives/src/contract.rs index cfc589cf13..72ebaf180b 100644 --- a/crates/katana/primitives/src/contract.rs +++ b/crates/katana/primitives/src/contract.rs @@ -15,7 +15,7 @@ pub type StorageValue = Felt; pub type Nonce = Felt; /// Represents the type for a message hash. -pub type MessageHash = FieldElement; +pub type MessageHash = Felt; /// Represents a contract address. #[derive(Default, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash, Debug, Deref)] diff --git a/crates/katana/primitives/src/genesis/json.rs b/crates/katana/primitives/src/genesis/json.rs index a8be28aa6d..1edfddfc6e 100644 --- a/crates/katana/primitives/src/genesis/json.rs +++ b/crates/katana/primitives/src/genesis/json.rs @@ -270,7 +270,7 @@ pub struct GenesisJson { pub accounts: HashMap, #[serde(default)] pub contracts: HashMap, - pub gather_from_block: BlockNumber, + pub settlement_block_number: BlockNumber, } impl GenesisJson { @@ -612,7 +612,7 @@ impl TryFrom for Genesis { gas_prices: value.gas_prices, state_root: value.state_root, parent_hash: value.parent_hash, - gather_from_block: value.gather_from_block, + settlement_block_number: value.settlement_block_number, }) } } diff --git a/crates/katana/primitives/src/genesis/mod.rs b/crates/katana/primitives/src/genesis/mod.rs index 53284b43e6..50f70beff5 100644 --- a/crates/katana/primitives/src/genesis/mod.rs +++ b/crates/katana/primitives/src/genesis/mod.rs @@ -106,7 +106,7 @@ pub struct Genesis { /// The genesis contract allocations. pub allocations: BTreeMap, /// The block on settlement chain from where Katana will start fetching messages. - pub gather_from_block: BlockNumber, + pub settlement_block_number: BlockNumber, } impl Genesis { @@ -311,7 +311,7 @@ impl Default for Genesis { allocations: BTreeMap::new(), fee_token, universal_deployer: Some(universal_deployer), - gather_from_block: 0, + settlement_block_number: 0, } } } diff --git a/crates/katana/storage/db/src/models/storage.rs b/crates/katana/storage/db/src/models/storage.rs index b6264a481f..0317116bcb 100644 --- a/crates/katana/storage/db/src/models/storage.rs +++ b/crates/katana/storage/db/src/models/storage.rs @@ -83,6 +83,43 @@ impl Decompress for ContractStorageEntry { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MessagingCheckpointId { + SendBlock, + SendIndex, + GatherBlock, + GatherNonce, +} + + +impl Encode for MessagingCheckpointId { + type Encoded = [u8; 1]; + fn encode(self) -> Self::Encoded { + let mut buf = [0u8; 1]; + buf[0] = match self { + MessagingCheckpointId::SendBlock => 1, + MessagingCheckpointId::SendIndex => 2, + MessagingCheckpointId::GatherBlock => 3, + MessagingCheckpointId::GatherNonce => 4, + }; + buf + } +} + +impl Decode for MessagingCheckpointId { + fn decode>(bytes: B) -> Result { + let bytes = bytes.as_ref(); + match bytes[0] { + 1 => Ok(MessagingCheckpointId::SendBlock), + 2 => Ok(MessagingCheckpointId::SendIndex), + 3 => Ok(MessagingCheckpointId::GatherBlock), + 4 => Ok(MessagingCheckpointId::GatherNonce), + _ => Err(CodecError::Decode("Invalid MessagingCheckpointId".into())), + } + } +} + + #[cfg(test)] mod tests { use starknet::macros::felt; diff --git a/crates/katana/storage/db/src/tables.rs b/crates/katana/storage/db/src/tables.rs index d99ee2ee59..b1a5d868ba 100644 --- a/crates/katana/storage/db/src/tables.rs +++ b/crates/katana/storage/db/src/tables.rs @@ -9,7 +9,7 @@ use crate::codecs::{Compress, Decode, Decompress, Encode}; use crate::models::block::StoredBlockBodyIndices; use crate::models::contract::{ContractClassChange, ContractInfoChangeList, ContractNonceChange}; use crate::models::list::BlockList; -use crate::models::storage::{ContractStorageEntry, ContractStorageKey, StorageEntry}; +use crate::models::storage::{ContractStorageEntry, ContractStorageKey, MessagingCheckpointId, StorageEntry}; pub trait Key: Encode + Decode + Clone + std::fmt::Debug {} pub trait Value: Compress + Decompress + std::fmt::Debug {} @@ -168,10 +168,10 @@ define_tables_enum! {[ (ClassChangeHistory, TableType::DupSort), (StorageChangeHistory, TableType::DupSort), (StorageChangeSet, TableType::Table), - (MessagingInfo, TableType::Table), - (MessagingNonceInfo, TableType::Table), + (MessagingCheckpointBlock, TableType::Table), + (MessagingCheckpointNonce, TableType::Table), (MessagingMessageNonceMapping, TableType::Table), - (MessagingIndexInfo, TableType::Table), + (MessagingCheckpointIndex, TableType::Table) ]} tables! { @@ -230,16 +230,16 @@ tables! { StorageChangeHistory: (BlockNumber, ContractStorageKey) => ContractStorageEntry, /// Stores the block number related to messaging service - MessagingInfo: (u64) => BlockNumber, + MessagingCheckpointBlock: (MessagingCheckpointId) => BlockNumber, /// Stores the nonce related to messaging service - MessagingNonceInfo: (u64) => Nonce, + MessagingCheckpointNonce: (MessagingCheckpointId) => Nonce, /// Map a message hash to a message nonce MessagingMessageNonceMapping: (TxHash) => Nonce, /// Stores the index of the messaging service - MessagingIndexInfo: (u64) => u64, + MessagingCheckpointIndex: (MessagingCheckpointId) => u64 } @@ -274,10 +274,10 @@ mod tests { assert_eq!(Tables::ALL[20].name(), ClassChangeHistory::NAME); assert_eq!(Tables::ALL[21].name(), StorageChangeHistory::NAME); assert_eq!(Tables::ALL[22].name(), StorageChangeSet::NAME); - assert_eq!(Tables::ALL[23].name(), MessagingInfo::NAME); - assert_eq!(Tables::ALL[24].name(), MessagingNonceInfo::NAME); + assert_eq!(Tables::ALL[23].name(), MessagingCheckpointBlock::NAME); + assert_eq!(Tables::ALL[24].name(), MessagingCheckpointNonce::NAME); assert_eq!(Tables::ALL[25].name(), MessagingMessageNonceMapping::NAME); - assert_eq!(Tables::ALL[26].name(), MessagingMessageNonceMapping::NAME); + assert_eq!(Tables::ALL[26].name(), MessagingCheckpointIndex::NAME); assert_eq!(Tables::Headers.table_type(), TableType::Table); assert_eq!(Tables::BlockHashes.table_type(), TableType::Table); @@ -302,10 +302,10 @@ mod tests { assert_eq!(Tables::ClassChangeHistory.table_type(), TableType::DupSort); assert_eq!(Tables::StorageChangeHistory.table_type(), TableType::DupSort); assert_eq!(Tables::StorageChangeSet.table_type(), TableType::Table); - assert_eq!(Tables::MessagingInfo.table_type(), TableType::Table); - assert_eq!(Tables::MessagingNonceInfo.table_type(), TableType::Table); - assert_eq!(Tables::MessagingMessageNonMapping.table_type(), TableType::Table); - assert_eq!(Tables::MessagingIndexInfo.table_type(), TableType::Table); + assert_eq!(Tables::MessagingCheckpointBlock.table_type(), TableType::Table); + assert_eq!(Tables::MessagingCheckpointNonce.table_type(), TableType::Table); + assert_eq!(Tables::MessagingMessageNonceMapping.table_type(), TableType::Table); + assert_eq!(Tables::MessagingCheckpointIndex.table_type(), TableType::Table); } use katana_primitives::block::{BlockHash, BlockNumber, FinalityStatus, Header}; @@ -325,6 +325,7 @@ mod tests { }; use crate::models::list::BlockList; use crate::models::storage::{ContractStorageEntry, ContractStorageKey, StorageEntry}; + use crate::tables::Tables::{MessagingCheckpointBlock, MessagingCheckpointNonce}; macro_rules! assert_key_encode_decode { { $( ($name:ty, $key:expr) ),* } => { diff --git a/crates/katana/storage/provider/src/lib.rs b/crates/katana/storage/provider/src/lib.rs index a4273ba6d7..37e6166de1 100644 --- a/crates/katana/storage/provider/src/lib.rs +++ b/crates/katana/storage/provider/src/lib.rs @@ -16,7 +16,7 @@ use katana_primitives::Felt; use traits::block::{BlockIdReader, BlockStatusProvider, BlockWriter}; use traits::contract::{ContractClassProvider, ContractClassWriter}; use traits::env::BlockEnvProvider; -use traits::messaging::MessagingProvider; +use traits::messaging::MessagingCheckpointProvider; use traits::state::{StateRootProvider, StateWriter}; use traits::transaction::{TransactionStatusProvider, TransactionTraceProvider}; @@ -382,9 +382,9 @@ where } } -impl MessagingProvider for BlockchainProvider +impl MessagingCheckpointProvider for BlockchainProvider where - Db: MessagingProvider, + Db: MessagingCheckpointProvider, { fn get_send_from_block(&self) -> ProviderResult> { self.provider.get_send_from_block() @@ -422,7 +422,7 @@ where self.provider.get_send_from_index() } - fn set_send_from_index(&self, _send_from_index: u64) -> ProviderResult<()> { - self.provider.get_send_from_index(send_from_index) + fn set_send_from_index(&self, send_from_index: u64) -> ProviderResult<()> { + self.provider.set_send_from_index(send_from_index) } } diff --git a/crates/katana/storage/provider/src/providers/db/mod.rs b/crates/katana/storage/provider/src/providers/db/mod.rs index bc3706995d..5ba57b4ed1 100644 --- a/crates/katana/storage/provider/src/providers/db/mod.rs +++ b/crates/katana/storage/provider/src/providers/db/mod.rs @@ -12,7 +12,7 @@ use katana_db::models::contract::{ ContractClassChange, ContractInfoChangeList, ContractNonceChange, }; use katana_db::models::list::BlockList; -use katana_db::models::storage::{ContractStorageEntry, ContractStorageKey, StorageEntry}; +use katana_db::models::storage::{ContractStorageEntry, ContractStorageKey, MessagingCheckpointId, StorageEntry}; use katana_db::tables::{self, DupSort, Table}; use katana_db::utils::KeyValue; use katana_primitives::block::{ @@ -36,7 +36,7 @@ use crate::traits::block::{ HeaderProvider, }; use crate::traits::env::BlockEnvProvider; -use crate::traits::messaging::{MessagingProvider, GATHER_FROM_BLOCK_KEY, SEND_FROM_BLOCK_KEY, GATHER_FROM_NONCE_KEY, SEND_FROM_INDEX_KEY}; +use crate::traits::messaging::MessagingCheckpointProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -763,45 +763,52 @@ impl BlockWriter for DbProvider { } } -impl MessagingProvider for DbProvider { +impl MessagingCheckpointProvider for DbProvider { + fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { + self.0.update(|db_tx| { + db_tx.put::(MessagingCheckpointId::SendBlock, send_from_block)?; + Ok(()) + })? + } + fn get_send_from_block(&self) -> ProviderResult> { let db_tx = self.0.tx()?; - let block_num = db_tx.get::(SEND_FROM_BLOCK_KEY)?; + let block_num = db_tx.get::(MessagingCheckpointId::SendBlock)?; db_tx.commit()?; Ok(block_num) } - fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { + fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { self.0.update(|db_tx| { - db_tx.put::(SEND_FROM_BLOCK_KEY, send_from_block)?; + db_tx.put::(MessagingCheckpointId::GatherBlock, gather_from_block)?; Ok(()) })? } fn get_gather_from_block(&self) -> ProviderResult> { let db_tx = self.0.tx()?; - let block_num = db_tx.get::(GATHER_FROM_BLOCK_KEY)?; + let block_num = db_tx.get::(MessagingCheckpointId::GatherBlock)?; db_tx.commit()?; Ok(block_num) } - fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { + fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { self.0.update(|db_tx| { - db_tx.put::(GATHER_FROM_BLOCK_KEY, gather_from_block)?; + db_tx.put::(MessagingCheckpointId::GatherNonce, nonce)?; Ok(()) })? } fn get_gather_message_nonce(&self) -> ProviderResult> { let db_tx = self.0.tx()?; - let nonce = db_tx.get::(GATHER_FROM_NONCE_KEY)?; + let nonce = db_tx.get::(MessagingCheckpointId::GatherNonce)?; db_tx.commit()?; Ok(nonce) } - fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { self.0.update(|db_tx| { - db_tx.put::(GATHER_FROM_NONCE_KEY, nonce)?; + db_tx.put::(message_hash, nonce)?; Ok(()) })? } @@ -813,26 +820,19 @@ impl MessagingProvider for DbProvider { Ok(nonce) } - fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { + fn set_send_from_index(&self, send_from_index: u64) -> ProviderResult<()> { self.0.update(|db_tx| { - db_tx.put::(message_hash, nonce)?; + db_tx.put::(MessagingCheckpointId::SendIndex, send_from_index)?; Ok(()) })? } fn get_send_from_index(&self) -> ProviderResult> { let db_tx = self.0.tx()?; - let index = db_tx.get::(SEND_FROM_INDEX_KEY)?; + let index = db_tx.get::(MessagingCheckpointId::SendIndex)?; db_tx.commit()?; Ok(index) } - - fn set_send_from_index(&self, send_from_index: u64) -> ProviderResult<()> { - self.0.update(|db_tx| { - db_tx.put::(SEND_FROM_INDEX_KEY, send_from_index)?; - Ok(()) - })? - } } #[cfg(test)] diff --git a/crates/katana/storage/provider/src/providers/fork/mod.rs b/crates/katana/storage/provider/src/providers/fork/mod.rs index 62cb9700ab..9c5ba640ab 100644 --- a/crates/katana/storage/provider/src/providers/fork/mod.rs +++ b/crates/katana/storage/provider/src/providers/fork/mod.rs @@ -30,7 +30,7 @@ use crate::traits::block::{ }; use crate::traits::contract::ContractClassWriter; use crate::traits::env::BlockEnvProvider; -use crate::traits::messaging::MessagingProvider; +use crate::traits::messaging::MessagingCheckpointProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider, StateWriter}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -583,45 +583,50 @@ impl BlockEnvProvider for ForkedProvider { } } -impl MessagingProvider for ForkedProvider { +impl MessagingCheckpointProvider for ForkedProvider { + fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.send_block = Some(send_from_block); + Ok(()) + } + fn get_send_from_block(&self) -> ProviderResult> { - Ok(None) + Ok(self.storage.read().messaging_info.send_block) } - fn set_send_from_block(&self, _send_from_block: BlockNumber) -> ProviderResult<()> { + fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.send_block = Some(gather_from_block); Ok(()) } fn get_gather_from_block(&self) -> ProviderResult> { - Ok(None) + Ok(self.storage.read().messaging_info.gather_block) } - fn set_gather_from_block(&self, _gather_from_block: BlockNumber) -> ProviderResult<()> { + fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + self.storage.write().messaging_info.gather_nonce = Some(nonce); Ok(()) } fn get_gather_message_nonce(&self) -> ProviderResult> { - Ok(None) + Ok(self.storage.read().messaging_info.gather_nonce) } - fn set_gather_message_nonce(&self, _nonce: Nonce) -> ProviderResult<()> { + fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { + self.storage.write().messaging_message_nonce_mapping.insert(message_hash, nonce); Ok(()) } - fn get_nonce_from_message_hash(&self, _message_hash: MessageHash) -> ProviderResult> { - Ok(None) + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { + Ok(self.storage.read().messaging_message_nonce_mapping.get(&message_hash).cloned()) } - fn set_nonce_from_message_hash(&self, _message_hash: MessageHash, _nonce: Nonce) -> ProviderResult<()> { + fn set_send_from_index(&self, index: u64) -> ProviderResult<()> { + self.storage.write().messaging_info.send_index = Some(index); Ok(()) } fn get_send_from_index(&self) -> ProviderResult> { - Ok(None) - } - - fn set_send_from_index(&self, _send_from_index: u64) -> ProviderResult<()> { - Ok(()) + Ok(self.storage.read().messaging_info.send_index) } } diff --git a/crates/katana/storage/provider/src/providers/in_memory/cache.rs b/crates/katana/storage/provider/src/providers/in_memory/cache.rs index c4728ceaaf..20bbd4ffa2 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/cache.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/cache.rs @@ -42,6 +42,14 @@ pub struct CacheStateDb { pub(crate) compiled_class_hashes: RwLock, } +#[derive(Default, Debug)] +pub struct MessagingCheckpointId { + pub(crate) send_block: Option, + pub(crate) send_index: Option, + pub(crate) gather_block: Option, + pub(crate) gather_nonce: Option, +} + impl CacheStateDb { /// Applies the given state updates to the cache. pub fn insert_updates(&self, updates: StateUpdatesWithDeclaredClasses) { @@ -89,10 +97,8 @@ pub struct CacheDb { pub(crate) transaction_hashes: HashMap, pub(crate) transaction_numbers: HashMap, pub(crate) transaction_block: HashMap, - pub(crate) messaging_info: HashMap, - pub(crate) messaging_nonce_info: HashMap, + pub(crate) messaging_info: MessagingCheckpointId, pub(crate) messaging_message_nonce_mapping: HashMap, - pub(crate) messaging_index_info: HashMap, } impl CacheStateDb { @@ -126,9 +132,7 @@ impl CacheDb { latest_block_hash: Default::default(), latest_block_number: Default::default(), messaging_info: Default::default(), - messaging_nonce_info: Default::default(), messaging_message_nonce_mapping: Default::default(), - messaging_index_info: Default::default(), } } } diff --git a/crates/katana/storage/provider/src/providers/in_memory/mod.rs b/crates/katana/storage/provider/src/providers/in_memory/mod.rs index 4c912f60eb..bdf6e0789e 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/mod.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/mod.rs @@ -26,7 +26,7 @@ use crate::traits::block::{ }; use crate::traits::contract::ContractClassWriter; use crate::traits::env::BlockEnvProvider; -use crate::traits::messaging::{MessagingProvider, GATHER_FROM_BLOCK_KEY, SEND_FROM_BLOCK_KEY, GATHER_FROM_NONCE_KEY, SEND_FROM_INDEX_KEY}; +use crate::traits::messaging::MessagingCheckpointProvider; use crate::traits::state::{StateFactoryProvider, StateProvider, StateRootProvider, StateWriter}; use crate::traits::state_update::StateUpdateProvider; use crate::traits::transaction::{ @@ -577,36 +577,32 @@ impl BlockEnvProvider for InMemoryProvider { } } -impl MessagingProvider for InMemoryProvider { - fn get_send_from_block(&self) -> ProviderResult> { - Ok(self.storage.read().messaging_info.get(&SEND_FROM_BLOCK_KEY).cloned()) - } - +impl MessagingCheckpointProvider for InMemoryProvider { fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { - self.storage.write().messaging_info.insert(SEND_FROM_BLOCK_KEY, send_from_block); + self.storage.write().messaging_info.send_block = Some(send_from_block); Ok(()) } - fn get_gather_from_block(&self) -> ProviderResult> { - Ok(self.storage.read().messaging_info.get(&GATHER_FROM_BLOCK_KEY).cloned()) + fn get_send_from_block(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.send_block) } fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { - self.storage.write().messaging_info.insert(GATHER_FROM_BLOCK_KEY, gather_from_block); + self.storage.write().messaging_info.send_block = Some(gather_from_block); Ok(()) } - fn get_gather_message_nonce(&self) -> ProviderResult> { - Ok(self.storage.read().messaging_nonce_info.get(&GATHER_FROM_NONCE_KEY).cloned()) + fn get_gather_from_block(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.gather_block) } fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { - self.storage.write().messaging_nonce_info.insert(GATHER_FROM_NONCE_KEY, nonce); + self.storage.write().messaging_info.gather_nonce = Some(nonce); Ok(()) } - fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { - Ok(self.storage.read().messaging_message_nonce_mapping.get(&message_hash).cloned()) + fn get_gather_message_nonce(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.gather_nonce) } fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()> { @@ -614,12 +610,16 @@ impl MessagingProvider for InMemoryProvider { Ok(()) } - fn get_send_from_index(&self) -> ProviderResult> { - Ok(self.storage.read().messaging_index_info.get(&SEND_FROM_INDEX_KEY).cloned()) + fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { + Ok(self.storage.read().messaging_message_nonce_mapping.get(&message_hash).cloned()) } fn set_send_from_index(&self, index: u64) -> ProviderResult<()> { - self.storage.write().messaging_index_info.insert(SEND_FROM_INDEX_KEY, index); + self.storage.write().messaging_info.send_index = Some(index); Ok(()) } + + fn get_send_from_index(&self) -> ProviderResult> { + Ok(self.storage.read().messaging_info.send_index) + } } diff --git a/crates/katana/storage/provider/src/traits/messaging.rs b/crates/katana/storage/provider/src/traits/messaging.rs index a556a4b8f4..d78dfa9d1b 100644 --- a/crates/katana/storage/provider/src/traits/messaging.rs +++ b/crates/katana/storage/provider/src/traits/messaging.rs @@ -3,13 +3,8 @@ use katana_primitives::contract::{Nonce, MessageHash}; use crate::ProviderResult; -pub const SEND_FROM_BLOCK_KEY: u64 = 1; -pub const GATHER_FROM_BLOCK_KEY: u64 = 2; -pub const GATHER_FROM_NONCE_KEY: u64 = 3; -pub const SEND_FROM_INDEX_KEY: u64 = 4; - #[auto_impl::auto_impl(&, Box, Arc)] -pub trait MessagingProvider: Send + Sync { +pub trait MessagingCheckpointProvider: Send + Sync { /// Sets the send from block. fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()>; /// Returns the send from block. From 7049d4137f3c52c8b21969348e92235b24da115b Mon Sep 17 00:00:00 2001 From: Yannick Bensacq Date: Tue, 29 Oct 2024 11:10:20 +0100 Subject: [PATCH 7/8] rename methods --- .../katana/core/src/service/block_producer.rs | 2 +- .../core/src/service/messaging/service.rs | 10 +++--- crates/katana/primitives/src/genesis/mod.rs | 2 +- crates/katana/storage/provider/src/lib.rs | 32 +++++++++--------- .../storage/provider/src/providers/db/mod.rs | 16 ++++----- .../provider/src/providers/fork/mod.rs | 23 +++++++------ .../provider/src/providers/in_memory/mod.rs | 22 ++++++------- .../storage/provider/src/traits/messaging.rs | 33 ++++++++++--------- 8 files changed, 70 insertions(+), 70 deletions(-) diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index c4c7349b0e..1e09e52cc7 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -324,7 +324,7 @@ impl IntervalBlockProducer { let message_hash = Felt::from_bytes_be(&message_hash_bytes); match provider.get_nonce_from_message_hash(message_hash) { - Ok(Some(nonce)) => provider.set_gather_message_nonce(nonce), + Ok(Some(nonce)) => provider.set_inbound_nonce(nonce), Ok(None) => Ok(()), Err(_e) => Ok(()), } diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index bd304e7231..830c837782 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -10,7 +10,7 @@ use katana_primitives::block::BlockHashOrNumber; use katana_primitives::receipt::MessageToL1; use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash}; use katana_provider::traits::block::BlockNumberProvider; -use katana_provider::traits::messaging::MessagingProvider; +use katana_provider::traits::messaging::{MessagingCheckpointProvider}; use katana_provider::traits::transaction::ReceiptProvider; use tokio::time::{interval_at, Instant, Interval}; use tracing::{error, info}; @@ -54,7 +54,7 @@ impl MessagingService { backend: Arc>, ) -> anyhow::Result { let provider = backend.blockchain.provider(); - let gather_from_block = match provider.get_gather_from_block() { + let gather_from_block = match provider.get_inbound_block() { Ok(Some(block)) => block, Ok(None) => 0, Err(_) => { @@ -64,7 +64,7 @@ impl MessagingService { ) } }; - let send_from_block = match provider.get_send_from_block() { + let send_from_block = match provider.get_outbound_block() { Ok(Some(block)) => block, Ok(None) => 0, Err(_) => { @@ -245,7 +245,7 @@ impl Stream for MessagingService { .backend .blockchain .provider() - .set_gather_from_block(pin.gather_from_block); + .set_inbound_block(pin.gather_from_block); return Poll::Ready(Some(MessagingOutcome::Gather { lastest_block: last_block, msg_count, @@ -272,7 +272,7 @@ impl Stream for MessagingService { // sent on the settlement chain. pin.send_from_block += 1; let _ = - pin.backend.blockchain.provider().set_send_from_block(pin.send_from_block); + pin.backend.blockchain.provider().set_outbound_block(pin.send_from_block); return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count })); } Poll::Ready(Err(e)) => { diff --git a/crates/katana/primitives/src/genesis/mod.rs b/crates/katana/primitives/src/genesis/mod.rs index 50f70beff5..b0af9577d4 100644 --- a/crates/katana/primitives/src/genesis/mod.rs +++ b/crates/katana/primitives/src/genesis/mod.rs @@ -428,7 +428,7 @@ mod tests { fee_token: fee_token.clone(), allocations: BTreeMap::from(allocations.clone()), number: 0, - gather_from_block: 0, + settlement_block_number: 0, timestamp: 5123512314u64, state_root: felt!("0x99"), parent_hash: felt!("0x999"), diff --git a/crates/katana/storage/provider/src/lib.rs b/crates/katana/storage/provider/src/lib.rs index 37e6166de1..fc29a0ca96 100644 --- a/crates/katana/storage/provider/src/lib.rs +++ b/crates/katana/storage/provider/src/lib.rs @@ -386,28 +386,28 @@ impl MessagingCheckpointProvider for BlockchainProvider where Db: MessagingCheckpointProvider, { - fn get_send_from_block(&self) -> ProviderResult> { - self.provider.get_send_from_block() + fn get_outbound_block(&self) -> ProviderResult> { + self.provider.get_outbound_block() } - fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { - self.provider.set_send_from_block(send_from_block) + fn set_outbound_block(&self, outbound_block: BlockNumber) -> ProviderResult<()> { + self.provider.set_outbound_block(outbound_block) } - fn get_gather_from_block(&self) -> ProviderResult> { - self.provider.get_gather_from_block() + fn get_inbound_block(&self) -> ProviderResult> { + self.provider.get_inbound_block() } - fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { - self.provider.set_gather_from_block(gather_from_block) + fn set_inbound_block(&self, inbound_block: BlockNumber) -> ProviderResult<()> { + self.provider.set_inbound_block(inbound_block) } - fn get_gather_message_nonce(&self) -> ProviderResult> { - self.provider.get_gather_message_nonce() + fn get_inbound_nonce(&self) -> ProviderResult> { + self.provider.get_inbound_nonce() } - fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { - self.provider.set_gather_message_nonce(nonce) + fn set_inbound_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + self.provider.set_inbound_nonce(nonce) } fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult> { @@ -418,11 +418,11 @@ where self.provider.set_nonce_from_message_hash(message_hash, nonce) } - fn get_send_from_index(&self) -> ProviderResult> { - self.provider.get_send_from_index() + fn get_outbound_index(&self) -> ProviderResult> { + self.provider.get_outbound_index() } - fn set_send_from_index(&self, send_from_index: u64) -> ProviderResult<()> { - self.provider.set_send_from_index(send_from_index) + fn set_outbound_index(&self, outbound_index: u64) -> ProviderResult<()> { + self.provider.set_outbound_index(outbound_index) } } diff --git a/crates/katana/storage/provider/src/providers/db/mod.rs b/crates/katana/storage/provider/src/providers/db/mod.rs index 5ba57b4ed1..111cbe288b 100644 --- a/crates/katana/storage/provider/src/providers/db/mod.rs +++ b/crates/katana/storage/provider/src/providers/db/mod.rs @@ -764,42 +764,42 @@ impl BlockWriter for DbProvider { } impl MessagingCheckpointProvider for DbProvider { - fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { + fn set_outbound_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { self.0.update(|db_tx| { db_tx.put::(MessagingCheckpointId::SendBlock, send_from_block)?; Ok(()) })? } - fn get_send_from_block(&self) -> ProviderResult> { + fn get_outbound_block(&self) -> ProviderResult> { let db_tx = self.0.tx()?; let block_num = db_tx.get::(MessagingCheckpointId::SendBlock)?; db_tx.commit()?; Ok(block_num) } - fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { + fn set_inbound_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { self.0.update(|db_tx| { db_tx.put::(MessagingCheckpointId::GatherBlock, gather_from_block)?; Ok(()) })? } - fn get_gather_from_block(&self) -> ProviderResult> { + fn get_inbound_block(&self) -> ProviderResult> { let db_tx = self.0.tx()?; let block_num = db_tx.get::(MessagingCheckpointId::GatherBlock)?; db_tx.commit()?; Ok(block_num) } - fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + fn set_inbound_nonce(&self, nonce: Nonce) -> ProviderResult<()> { self.0.update(|db_tx| { db_tx.put::(MessagingCheckpointId::GatherNonce, nonce)?; Ok(()) })? } - fn get_gather_message_nonce(&self) -> ProviderResult> { + fn get_inbound_nonce(&self) -> ProviderResult> { let db_tx = self.0.tx()?; let nonce = db_tx.get::(MessagingCheckpointId::GatherNonce)?; db_tx.commit()?; @@ -820,14 +820,14 @@ impl MessagingCheckpointProvider for DbProvider { Ok(nonce) } - fn set_send_from_index(&self, send_from_index: u64) -> ProviderResult<()> { + fn set_outbound_index(&self, send_from_index: u64) -> ProviderResult<()> { self.0.update(|db_tx| { db_tx.put::(MessagingCheckpointId::SendIndex, send_from_index)?; Ok(()) })? } - fn get_send_from_index(&self) -> ProviderResult> { + fn get_outbound_index(&self) -> ProviderResult> { let db_tx = self.0.tx()?; let index = db_tx.get::(MessagingCheckpointId::SendIndex)?; db_tx.commit()?; diff --git a/crates/katana/storage/provider/src/providers/fork/mod.rs b/crates/katana/storage/provider/src/providers/fork/mod.rs index 9c5ba640ab..0e4fb97f62 100644 --- a/crates/katana/storage/provider/src/providers/fork/mod.rs +++ b/crates/katana/storage/provider/src/providers/fork/mod.rs @@ -584,30 +584,30 @@ impl BlockEnvProvider for ForkedProvider { } impl MessagingCheckpointProvider for ForkedProvider { - fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { - self.storage.write().messaging_info.send_block = Some(send_from_block); + fn set_outbound_block(&self, outbound_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.send_block = Some(outbound_block); Ok(()) } - fn get_send_from_block(&self) -> ProviderResult> { + fn get_outbound_block(&self) -> ProviderResult> { Ok(self.storage.read().messaging_info.send_block) } - fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { - self.storage.write().messaging_info.send_block = Some(gather_from_block); + fn set_inbound_block(&self, inbound_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.gather_block = Some(inbound_block); Ok(()) } - fn get_gather_from_block(&self) -> ProviderResult> { + fn get_inbound_block(&self) -> ProviderResult> { Ok(self.storage.read().messaging_info.gather_block) } - fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + fn set_inbound_nonce(&self, nonce: Nonce) -> ProviderResult<()> { self.storage.write().messaging_info.gather_nonce = Some(nonce); Ok(()) } - fn get_gather_message_nonce(&self) -> ProviderResult> { + fn get_inbound_nonce(&self) -> ProviderResult> { Ok(self.storage.read().messaging_info.gather_nonce) } @@ -620,13 +620,12 @@ impl MessagingCheckpointProvider for ForkedProvider { Ok(self.storage.read().messaging_message_nonce_mapping.get(&message_hash).cloned()) } - fn set_send_from_index(&self, index: u64) -> ProviderResult<()> { - self.storage.write().messaging_info.send_index = Some(index); + fn set_outbound_index(&self, outbound_index: u64) -> ProviderResult<()> { + self.storage.write().messaging_info.send_index = Some(outbound_index); Ok(()) } - fn get_send_from_index(&self) -> ProviderResult> { + fn get_outbound_index(&self) -> ProviderResult> { Ok(self.storage.read().messaging_info.send_index) } - } diff --git a/crates/katana/storage/provider/src/providers/in_memory/mod.rs b/crates/katana/storage/provider/src/providers/in_memory/mod.rs index bdf6e0789e..39deffd7ee 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/mod.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/mod.rs @@ -578,30 +578,30 @@ impl BlockEnvProvider for InMemoryProvider { } impl MessagingCheckpointProvider for InMemoryProvider { - fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()> { - self.storage.write().messaging_info.send_block = Some(send_from_block); + fn set_outbound_block(&self, outbound_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.send_block = Some(outbound_block); Ok(()) } - fn get_send_from_block(&self) -> ProviderResult> { + fn get_outbound_block(&self) -> ProviderResult> { Ok(self.storage.read().messaging_info.send_block) } - fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()> { - self.storage.write().messaging_info.send_block = Some(gather_from_block); + fn set_inbound_block(&self, inbound_block: BlockNumber) -> ProviderResult<()> { + self.storage.write().messaging_info.gather_block = Some(inbound_block); Ok(()) } - fn get_gather_from_block(&self) -> ProviderResult> { + fn get_inbound_block(&self) -> ProviderResult> { Ok(self.storage.read().messaging_info.gather_block) } - fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()> { + fn set_inbound_nonce(&self, nonce: Nonce) -> ProviderResult<()> { self.storage.write().messaging_info.gather_nonce = Some(nonce); Ok(()) } - fn get_gather_message_nonce(&self) -> ProviderResult> { + fn get_inbound_nonce(&self) -> ProviderResult> { Ok(self.storage.read().messaging_info.gather_nonce) } @@ -614,12 +614,12 @@ impl MessagingCheckpointProvider for InMemoryProvider { Ok(self.storage.read().messaging_message_nonce_mapping.get(&message_hash).cloned()) } - fn set_send_from_index(&self, index: u64) -> ProviderResult<()> { - self.storage.write().messaging_info.send_index = Some(index); + fn set_outbound_index(&self, outbound_index: u64) -> ProviderResult<()> { + self.storage.write().messaging_info.send_index = Some(outbound_index); Ok(()) } - fn get_send_from_index(&self) -> ProviderResult> { + fn get_outbound_index(&self) -> ProviderResult> { Ok(self.storage.read().messaging_info.send_index) } } diff --git a/crates/katana/storage/provider/src/traits/messaging.rs b/crates/katana/storage/provider/src/traits/messaging.rs index d78dfa9d1b..055ce8234b 100644 --- a/crates/katana/storage/provider/src/traits/messaging.rs +++ b/crates/katana/storage/provider/src/traits/messaging.rs @@ -5,24 +5,25 @@ use crate::ProviderResult; #[auto_impl::auto_impl(&, Box, Arc)] pub trait MessagingCheckpointProvider: Send + Sync { - /// Sets the send from block. - fn set_send_from_block(&self, send_from_block: BlockNumber) -> ProviderResult<()>; - /// Returns the send from block. - fn get_send_from_block(&self) -> ProviderResult>; - /// Sets the gather from block. - fn set_gather_from_block(&self, gather_from_block: BlockNumber) -> ProviderResult<()>; - /// Returns the gather from block. - fn get_gather_from_block(&self) -> ProviderResult>; - /// Sets the gather from nonce. - fn set_gather_message_nonce(&self, nonce: Nonce) -> ProviderResult<()>; - /// Returns the gather from nonce. - fn get_gather_message_nonce(&self) -> ProviderResult>; + /// Sets the outbound block. + fn set_outbound_block(&self, outbound_block: BlockNumber) -> ProviderResult<()>; + /// Returns the outbound block. + fn get_outbound_block(&self) -> ProviderResult>; + + /// Sets the inbound block. + fn set_inbound_block(&self, inbound_block: BlockNumber) -> ProviderResult<()>; + /// Returns the inbound block. + fn get_inbound_block(&self) -> ProviderResult>; + /// Sets the inbound nonce. + fn set_inbound_nonce(&self, nonce: Nonce) -> ProviderResult<()>; + /// Returns the inbound nonce. + fn get_inbound_nonce(&self) -> ProviderResult>; /// Sets the nonce by message_hash. fn set_nonce_from_message_hash(&self, message_hash: MessageHash, nonce: Nonce) -> ProviderResult<()>; /// Returns the nonce by message_hash. fn get_nonce_from_message_hash(&self, message_hash: MessageHash) -> ProviderResult>; - /// Sets the send from index. - fn set_send_from_index(&self, index: u64) -> ProviderResult<()>; - /// Returns the send from index. - fn get_send_from_index(&self) -> ProviderResult>; + /// Sets the outbound index. + fn set_outbound_index(&self, index: u64) -> ProviderResult<()>; + /// Returns the outbound index. + fn get_outbound_index(&self) -> ProviderResult>; } From ec90445c8b36171597e168396961bc2246807fa3 Mon Sep 17 00:00:00 2001 From: Yannick Bensacq Date: Tue, 29 Oct 2024 11:31:10 +0100 Subject: [PATCH 8/8] provider: add process_message_nonce --- .../katana/core/src/service/block_producer.rs | 12 +---------- crates/katana/storage/provider/src/lib.rs | 18 ++++++++++++++++- .../storage/provider/src/providers/db/mod.rs | 18 ++++++++++++++++- .../provider/src/providers/fork/mod.rs | 20 +++++++++++++++++-- .../provider/src/providers/in_memory/mod.rs | 20 +++++++++++++++++-- .../storage/provider/src/traits/messaging.rs | 4 +++- 6 files changed, 74 insertions(+), 18 deletions(-) diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 1e09e52cc7..d4ce7ce96f 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -16,7 +16,6 @@ use katana_primitives::receipt::Receipt; use katana_primitives::trace::TxExecInfo; use katana_primitives::transaction::{ExecutableTxWithHash, Tx, TxHash, TxWithHash}; use katana_primitives::version::CURRENT_STARKNET_VERSION; -use katana_primitives::Felt; use katana_provider::error::ProviderError; use katana_provider::traits::block::{BlockHashProvider, BlockNumberProvider}; use katana_provider::traits::env::BlockEnvProvider; @@ -318,16 +317,7 @@ impl IntervalBlockProducer { trace!(target: LOG_TARGET, "Executed transaction: {:?}", tx); let _ = match tx_ref { Tx::L1Handler(l1_tx) => { - // get stored nonce from message hash - let message_hash_bytes = l1_tx.message_hash; - let message_hash_bytes: [u8; 32] = *message_hash_bytes; - - let message_hash = Felt::from_bytes_be(&message_hash_bytes); - match provider.get_nonce_from_message_hash(message_hash) { - Ok(Some(nonce)) => provider.set_inbound_nonce(nonce), - Ok(None) => Ok(()), - Err(_e) => Ok(()), - } + provider.process_message_nonce(l1_tx) } _ => Ok(()), }; diff --git a/crates/katana/storage/provider/src/lib.rs b/crates/katana/storage/provider/src/lib.rs index fc29a0ca96..5854b2d04d 100644 --- a/crates/katana/storage/provider/src/lib.rs +++ b/crates/katana/storage/provider/src/lib.rs @@ -11,7 +11,7 @@ use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{TxHash, TxNumber, TxWithHash}; +use katana_primitives::transaction::{L1HandlerTx, TxHash, TxNumber, TxWithHash}; use katana_primitives::Felt; use traits::block::{BlockIdReader, BlockStatusProvider, BlockWriter}; use traits::contract::{ContractClassProvider, ContractClassWriter}; @@ -425,4 +425,20 @@ where fn set_outbound_index(&self, outbound_index: u64) -> ProviderResult<()> { self.provider.set_outbound_index(outbound_index) } + + fn process_message_nonce(&self, l1_tx: &L1HandlerTx) -> ProviderResult<()>{ + // Get stored nonce from message hash + let message_hash_bytes = l1_tx.message_hash; + let message_hash_bytes: [u8; 32] = *message_hash_bytes; + let message_hash = Felt::from_bytes_be(&message_hash_bytes); + + // Attempt to get the nonce from the message hash + match self.get_nonce_from_message_hash(message_hash) { + Ok(Some(nonce)) => + // Set the inbound nonce if a nonce is retrieved + self.set_inbound_nonce(nonce), + Ok(None) => Ok(()), + Err(_e) => Ok(()), + } + } } diff --git a/crates/katana/storage/provider/src/providers/db/mod.rs b/crates/katana/storage/provider/src/providers/db/mod.rs index 111cbe288b..e3b6156d23 100644 --- a/crates/katana/storage/provider/src/providers/db/mod.rs +++ b/crates/katana/storage/provider/src/providers/db/mod.rs @@ -27,7 +27,7 @@ use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{TxHash, TxNumber, TxWithHash}; +use katana_primitives::transaction::{L1HandlerTx, TxHash, TxNumber, TxWithHash}; use katana_primitives::Felt; use crate::error::ProviderError; @@ -833,6 +833,22 @@ impl MessagingCheckpointProvider for DbProvider { db_tx.commit()?; Ok(index) } + + fn process_message_nonce(&self, l1_tx: &L1HandlerTx) -> ProviderResult<()>{ + // Get stored nonce from message hash + let message_hash_bytes = l1_tx.message_hash; + let message_hash_bytes: [u8; 32] = *message_hash_bytes; + let message_hash = Felt::from_bytes_be(&message_hash_bytes); + + // Attempt to get the nonce from the message hash + match self.get_nonce_from_message_hash(message_hash) { + Ok(Some(nonce)) => + // Set the inbound nonce if a nonce is retrieved + self.set_inbound_nonce(nonce), + Ok(None) => Ok(()), + Err(_e) => Ok(()), + } + } } #[cfg(test)] diff --git a/crates/katana/storage/provider/src/providers/fork/mod.rs b/crates/katana/storage/provider/src/providers/fork/mod.rs index 0e4fb97f62..c525506656 100644 --- a/crates/katana/storage/provider/src/providers/fork/mod.rs +++ b/crates/katana/storage/provider/src/providers/fork/mod.rs @@ -15,11 +15,11 @@ use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{Tx, TxHash, TxNumber, TxWithHash}; +use katana_primitives::transaction::{L1HandlerTx, Tx, TxHash, TxNumber, TxWithHash}; use parking_lot::RwLock; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; - +use katana_primitives::Felt; use self::backend::{Backend, BackendError, SharedStateProvider}; use self::state::ForkedStateDb; use super::in_memory::cache::{CacheDb, CacheStateDb}; @@ -628,4 +628,20 @@ impl MessagingCheckpointProvider for ForkedProvider { fn get_outbound_index(&self) -> ProviderResult> { Ok(self.storage.read().messaging_info.send_index) } + + fn process_message_nonce(&self, l1_tx: &L1HandlerTx) -> ProviderResult<()>{ + // Get stored nonce from message hash + let message_hash_bytes = l1_tx.message_hash; + let message_hash_bytes: [u8; 32] = *message_hash_bytes; + let message_hash = Felt::from_bytes_be(&message_hash_bytes); + + // Attempt to get the nonce from the message hash + match self.get_nonce_from_message_hash(message_hash) { + Ok(Some(nonce)) => + // Set the inbound nonce if a nonce is retrieved + self.set_inbound_nonce(nonce), + Ok(None) => Ok(()), + Err(_e) => Ok(()), + } + } } diff --git a/crates/katana/storage/provider/src/providers/in_memory/mod.rs b/crates/katana/storage/provider/src/providers/in_memory/mod.rs index 39deffd7ee..f8ca389c5c 100644 --- a/crates/katana/storage/provider/src/providers/in_memory/mod.rs +++ b/crates/katana/storage/provider/src/providers/in_memory/mod.rs @@ -15,9 +15,9 @@ use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; use katana_primitives::trace::TxExecInfo; -use katana_primitives::transaction::{Tx, TxHash, TxNumber, TxWithHash}; +use katana_primitives::transaction::{L1HandlerTx, Tx, TxHash, TxNumber, TxWithHash}; use parking_lot::RwLock; - +use katana_primitives::Felt; use self::cache::CacheDb; use self::state::{HistoricalStates, InMemoryStateDb, LatestStateProvider}; use crate::traits::block::{ @@ -622,4 +622,20 @@ impl MessagingCheckpointProvider for InMemoryProvider { fn get_outbound_index(&self) -> ProviderResult> { Ok(self.storage.read().messaging_info.send_index) } + + fn process_message_nonce(&self, l1_tx: &L1HandlerTx) -> ProviderResult<()>{ + // Get stored nonce from message hash + let message_hash_bytes = l1_tx.message_hash; + let message_hash_bytes: [u8; 32] = *message_hash_bytes; + let message_hash = Felt::from_bytes_be(&message_hash_bytes); + + // Attempt to get the nonce from the message hash + match self.get_nonce_from_message_hash(message_hash) { + Ok(Some(nonce)) => + // Set the inbound nonce if a nonce is retrieved + self.set_inbound_nonce(nonce), + Ok(None) => Ok(()), + Err(_e) => Ok(()), + } + } } diff --git a/crates/katana/storage/provider/src/traits/messaging.rs b/crates/katana/storage/provider/src/traits/messaging.rs index 055ce8234b..94db0fff9e 100644 --- a/crates/katana/storage/provider/src/traits/messaging.rs +++ b/crates/katana/storage/provider/src/traits/messaging.rs @@ -1,6 +1,6 @@ use katana_primitives::block::BlockNumber; use katana_primitives::contract::{Nonce, MessageHash}; - +use katana_primitives::transaction::L1HandlerTx; use crate::ProviderResult; #[auto_impl::auto_impl(&, Box, Arc)] @@ -26,4 +26,6 @@ pub trait MessagingCheckpointProvider: Send + Sync { fn set_outbound_index(&self, index: u64) -> ProviderResult<()>; /// Returns the outbound index. fn get_outbound_index(&self) -> ProviderResult>; + /// Processes the nonce in the provided L1HandlerTx and updates the inbound nonce within the provider. + fn process_message_nonce(&self, l1_tx: &L1HandlerTx) -> ProviderResult<()>; }