Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(katana): prevent from resending messages to L1 #2354

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/katana/core/src/backend/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use katana_provider::providers::db::DbProvider;
use katana_provider::traits::block::{BlockProvider, BlockWriter};
use katana_provider::traits::contract::ContractClassWriter;
use katana_provider::traits::env::BlockEnvProvider;
use katana_provider::traits::messaging::MessagingCheckpointProvider;
use katana_provider::traits::state::{StateFactoryProvider, StateRootProvider, StateWriter};
use katana_provider::traits::state_update::StateUpdateProvider;
use katana_provider::traits::transaction::{
Expand All @@ -29,6 +30,7 @@ pub trait Database:
+ ContractClassWriter
+ StateFactoryProvider
+ BlockEnvProvider
+ MessagingCheckpointProvider
+ 'static
+ Send
+ Sync
Expand All @@ -50,6 +52,7 @@ impl<T> Database for T where
+ ContractClassWriter
+ StateFactoryProvider
+ BlockEnvProvider
+ MessagingCheckpointProvider
+ 'static
+ Send
+ Sync
Expand Down
49 changes: 37 additions & 12 deletions crates/katana/core/src/service/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::backend::Backend;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::stream::{Stream, StreamExt};
use futures::FutureExt;
Expand All @@ -13,20 +14,20 @@ use katana_pool::validation::stateful::TxValidator;
use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader};
use katana_primitives::receipt::Receipt;
use katana_primitives::trace::TxExecInfo;
use katana_primitives::transaction::{ExecutableTxWithHash, TxHash, TxWithHash};
use katana_primitives::transaction::{ExecutableTxWithHash, Tx, TxHash, TxWithHash};
use katana_primitives::version::CURRENT_STARKNET_VERSION;
use katana_primitives::Felt;
use katana_provider::error::ProviderError;
use katana_provider::traits::block::{BlockHashProvider, BlockNumberProvider};
use katana_provider::traits::env::BlockEnvProvider;
use katana_provider::traits::messaging::MessagingCheckpointProvider;
use katana_provider::traits::state::StateFactoryProvider;
use katana_tasks::{BlockingTaskPool, BlockingTaskResult};
use parking_lot::lock_api::RawMutex;
use parking_lot::{Mutex, RwLock};
use tokio::time::{interval_at, Instant, Interval};
use tracing::{error, info, trace, warn};

use crate::backend::Backend;

pub(crate) const LOG_TARGET: &str = "miner";

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -296,9 +297,11 @@ impl<EF: ExecutorFactory> IntervalBlockProducer<EF> {
fn execute_transactions(
executor: PendingExecutor,
transactions: Vec<ExecutableTxWithHash>,
backend: Arc<Backend<EF>>,
) -> TxExecutionResult {
let executor = &mut executor.write();
let provider = backend.blockchain.provider();

let executor = &mut executor.write();
let new_txs_count = transactions.len();
executor.execute_transactions(transactions)?;

Expand All @@ -309,13 +312,34 @@ impl<EF: ExecutorFactory> IntervalBlockProducer<EF> {
let results = txs
.iter()
.skip(total_txs - new_txs_count)
.filter_map(|(tx, res)| match res {
ExecutionResult::Failed { .. } => None,
ExecutionResult::Success { receipt, trace, .. } => Some(TxWithOutcome {
tx: tx.clone(),
receipt: receipt.clone(),
exec_info: trace.clone(),
}),
.filter_map(|(tx, res)| {
let tx_ref: &Tx = &tx.transaction;

trace!(target: LOG_TARGET, "Executed transaction: {:?}", tx);
let _ = match tx_ref {
Tx::L1Handler(l1_tx) => {
// get stored nonce from message hash
let message_hash_bytes = l1_tx.message_hash;
let message_hash_bytes: [u8; 32] = *message_hash_bytes;

let message_hash = Felt::from_bytes_be(&message_hash_bytes);
match provider.get_nonce_from_message_hash(message_hash) {
Ok(Some(nonce)) => provider.set_gather_message_nonce(nonce),
Ok(None) => Ok(()),
Err(_e) => Ok(()),
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move that into a function taking the l1_tx as input. Maybe into the provider directly. Since we're calling provider several time, internalizing the process and having only one function here would make it simpler and easier to test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@glihm ok :-)

}
_ => Ok(()),
};

match res {
ExecutionResult::Failed { .. } => None,
ExecutionResult::Success { receipt, trace, .. } => Some(TxWithOutcome {
tx: tx.clone(),
receipt: receipt.clone(),
exec_info: trace.clone(),
}),
}
})
.collect::<Vec<TxWithOutcome>>();

Expand Down Expand Up @@ -399,10 +423,11 @@ impl<EF: ExecutorFactory> Stream for IntervalBlockProducer<EF> {

let transactions: Vec<ExecutableTxWithHash> =
std::mem::take(&mut pin.queued).into_iter().flatten().collect();
let backend = pin.backend.clone();

let fut = pin
.blocking_task_spawner
.spawn(|| Self::execute_transactions(executor, transactions));
.spawn(|| Self::execute_transactions(executor, transactions, backend));

pin.ongoing_execution = Some(Box::pin(fut));
}
Expand Down
4 changes: 4 additions & 0 deletions crates/katana/core/src/service/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ pub struct MessagingConfig {
pub interval: u64,
/// The block on settlement chain from where Katana will start fetching messages.
pub from_block: u64,
/// The maximum number of blocks in gather messages
pub max_block: u64,
/// The size of events returned by get_events call
pub chunk_size: u64,
}

impl MessagingConfig {
Expand Down
59 changes: 49 additions & 10 deletions crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use katana_primitives::block::BlockHashOrNumber;
use katana_primitives::receipt::MessageToL1;
use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash};
use katana_provider::traits::block::BlockNumberProvider;
use katana_provider::traits::messaging::MessagingProvider;
use katana_provider::traits::transaction::ReceiptProvider;
use tokio::time::{interval_at, Instant, Interval};
use tracing::{error, info};
Expand Down Expand Up @@ -38,6 +39,10 @@ pub struct MessagingService<EF: ExecutorFactory> {
send_from_block: u64,
/// The message sending future.
msg_send_fut: Option<MessageSettlingFuture>,
/// The maximum block number to gather messages from.
max_block: u64,
/// The chunk size of messages to gather.
chunk_size: u64,
}

impl<EF: ExecutorFactory> MessagingService<EF> {
Expand All @@ -48,14 +53,38 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
pool: TxPool,
backend: Arc<Backend<EF>>,
) -> anyhow::Result<Self> {
let gather_from_block = config.from_block;
let provider = backend.blockchain.provider();
let gather_from_block = match provider.get_gather_from_block() {
Ok(Some(block)) => block,
Ok(None) => 0,
Err(_) => {
anyhow::bail!(
"Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n"
)
}
};
let send_from_block = match provider.get_send_from_block() {
Ok(Some(block)) => block,
Ok(None) => 0,
Err(_) => {
anyhow::bail!(
"Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n"
)
}
};

let max_block = config.max_block;
let chunk_size = config.chunk_size;

let interval = interval_from_seconds(config.interval);
let messenger = match MessengerMode::from_config(config).await {
Ok(m) => Arc::new(m),
Err(_) => {
panic!(
anyhow::bail!(
"Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n",
(anvil or other katana) is running.\n"
)
}
};
Expand All @@ -66,9 +95,11 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
interval,
messenger,
gather_from_block,
send_from_block: 0,
send_from_block,
msg_gather_fut: None,
msg_send_fut: None,
max_block,
chunk_size,
})
}

Expand All @@ -77,11 +108,9 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
pool: TxPool,
backend: Arc<Backend<EF>>,
from_block: u64,
max_block: u64,
chunk_size: u64,
) -> MessengerResult<(u64, usize)> {
// 200 avoids any possible rejection from RPC with possibly lot's of messages.
// TODO: May this be configurable?
let max_block = 200;

match messenger.as_ref() {
MessengerMode::Ethereum(inner) => {
let (block_num, txs) =
Expand All @@ -102,8 +131,9 @@ impl<EF: ExecutorFactory> MessagingService<EF> {

#[cfg(feature = "starknet-messaging")]
MessengerMode::Starknet(inner) => {
let (block_num, txs) =
inner.gather_messages(from_block, max_block, backend.chain_id).await?;
let (block_num, txs) = inner
.gather_messages(from_block, max_block, chunk_size, backend.chain_id)
.await?;
let txs_count = txs.len();

txs.into_iter().for_each(|tx| {
Expand Down Expand Up @@ -188,6 +218,8 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
pin.pool.clone(),
pin.backend.clone(),
pin.gather_from_block,
pin.max_block,
pin.chunk_size,
)));
}

Expand All @@ -209,6 +241,11 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
match gather_fut.poll_unpin(cx) {
Poll::Ready(Ok((last_block, msg_count))) => {
pin.gather_from_block = last_block + 1;
let _ = pin
.backend
.blockchain
.provider()
.set_gather_from_block(pin.gather_from_block);
return Poll::Ready(Some(MessagingOutcome::Gather {
lastest_block: last_block,
msg_count,
Expand All @@ -234,6 +271,8 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
// +1 to move to the next local block to check messages to be
// sent on the settlement chain.
pin.send_from_block += 1;
let _ =
pin.backend.blockchain.provider().set_send_from_block(pin.send_from_block);
return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count }));
}
Poll::Ready(Err(e)) => {
Expand Down
30 changes: 21 additions & 9 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl StarknetMessaging {
&self,
from_block: BlockId,
to_block: BlockId,
) -> Result<Vec<EmittedEvent>> {
chunk_size: u64,
) -> Result<HashMap<u64, Vec<EmittedEvent>>> {
trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs.");

let mut events = vec![];
Expand All @@ -77,8 +78,6 @@ impl StarknetMessaging {
keys: None,
};

// TODO: this chunk_size may also come from configuration?
let chunk_size = 200;
let mut continuation_token: Option<String> = None;

loop {
Expand Down Expand Up @@ -165,6 +164,7 @@ impl Messenger for StarknetMessaging {
&self,
from_block: u64,
max_blocks: u64,
chunk_size: u64,
chain_id: ChainId,
) -> MessengerResult<(u64, Vec<Self::MessageTransaction>)> {
let chain_latest_block: u64 = match self.provider.block_number().await {
Expand Down Expand Up @@ -193,7 +193,7 @@ impl Messenger for StarknetMessaging {

let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];

self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block), chunk_size)
.await
.map_err(|_| Error::SendError)
.unwrap()
Expand All @@ -204,10 +204,15 @@ impl Messenger for StarknetMessaging {
event = ?e,
"Converting event into L1HandlerTx."
);

if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
l1_handler_txs.push(tx)
}
block_events.iter().for_each(|e| {
if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
let last_processed_nonce =
self.provider.get_gather_message_nonce().unwrap_or(0.into());
if tx.nonce > last_processed_nonce {
l1_handler_txs.push(tx)
}
}
})
});

Ok((to_block, l1_handler_txs))
Expand Down Expand Up @@ -235,7 +240,14 @@ impl Messenger for StarknetMessaging {
};
}

self.send_hashes(hashes.clone()).await?;
for (index, hash) in hashes.iter().enumerate() {
let stored_index = self.provider.get_send_from_index();
self.send_hashes(std::slice::from_ref(hash)).await?;
self.provider.set_send_from_index((index as u64) + 1).await?;
}

// reset the index
self.provider.set_send_from_index(0).await?;

Ok(hashes)
}
Expand Down
3 changes: 3 additions & 0 deletions crates/katana/primitives/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub type StorageValue = Felt;
/// Represents the type for a contract nonce.
pub type Nonce = Felt;

/// Represents the type for a message hash.
pub type MessageHash = Felt;

/// Represents a contract address.
#[derive(Default, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash, Debug, Deref)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
Expand Down
4 changes: 4 additions & 0 deletions crates/katana/primitives/src/genesis/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ pub struct GenesisJson {
pub accounts: HashMap<ContractAddress, GenesisAccountJson>,
#[serde(default)]
pub contracts: HashMap<ContractAddress, GenesisContractJson>,
pub settlement_block_number: BlockNumber,
}

impl GenesisJson {
Expand Down Expand Up @@ -611,6 +612,7 @@ impl TryFrom<GenesisJson> for Genesis {
gas_prices: value.gas_prices,
state_root: value.state_root,
parent_hash: value.parent_hash,
settlement_block_number: value.settlement_block_number,
})
}
}
Expand Down Expand Up @@ -1039,6 +1041,7 @@ mod tests {
let expected_genesis = Genesis {
classes: expected_classes,
number: 0,
gather_from_block: 0,
fee_token: expected_fee_token,
allocations: expected_allocations,
timestamp: 5123512314u64,
Expand Down Expand Up @@ -1179,6 +1182,7 @@ mod tests {
classes,
allocations,
number: 0,
gather_from_block: 0,
timestamp: 5123512314u64,
state_root: felt!("0x99"),
parent_hash: felt!("0x999"),
Expand Down
Loading
Loading