From 621ae0bca3c51dc7044c2bf8da23e730115b2d3f Mon Sep 17 00:00:00 2001 From: bexan Date: Tue, 20 Aug 2024 07:18:44 +0200 Subject: [PATCH] starknet messenging: gather message & update gather from in config file --- .../core/src/service/messaging/service.rs | 4 +- .../core/src/service/messaging/starknet.rs | 94 ++++++++++++++++++- 2 files changed, 92 insertions(+), 6 deletions(-) diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index a524e41f32..86333cac11 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -222,12 +222,15 @@ impl Stream for MessagingService { } } + let mut messaging_config = pin.messaging_config.write().unwrap(); // Poll the gathering future if let Some(mut gather_fut) = pin.msg_gather_fut.take() { match gather_fut.poll_unpin(cx) { Poll::Ready(Ok((last_block, msg_count))) => { info!(target: LOG_TARGET, "Gathered {} transactions up to block {}", msg_count, last_block); pin.gather_from_block = last_block + 1; + messaging_config.gather_from_block = pin.gather_from_block; + let _ = messaging_config.save(); return Poll::Ready(Some(MessagingOutcome::Gather { lastest_block: last_block, msg_count, @@ -248,7 +251,6 @@ impl Stream for MessagingService { info!(target: LOG_TARGET, "Sent {} messages from block {}", msg_count, block_num); pin.send_from_block = block_num + 1; // update the config with the latest block number sent. - let mut messaging_config = pin.messaging_config.write().unwrap(); messaging_config.send_from_block = pin.send_from_block; let _ = messaging_config.save(); return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count })); diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 5473f3bf47..4854722728 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use crate::hooker::KatanaHooker; use anyhow::Result; use async_trait::async_trait; @@ -69,6 +70,52 @@ impl StarknetMessaging { }) } + /// Fetches events for the given blocks range. + pub async fn fetch_events( + &self, + from_block: BlockId, + to_block: BlockId, + ) -> Result>> { + trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs."); + + let mut block_to_events: HashMap> = HashMap::new(); + + let filter = EventFilter { + from_block: Some(from_block), + to_block: Some(to_block), + address: Some(self.messaging_contract_address), + // TODO: this might come from the configuration actually. + keys: None, + }; + + // TODO: this chunk_size may also come from configuration? + let chunk_size = 200; + let mut continuation_token: Option = None; + + loop { + let event_page = + self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?; + + event_page.events.into_iter().for_each(|event| { + // We ignore events without the block number + if let Some(block_number) = event.block_number { + block_to_events + .entry(block_number) + .and_modify(|v| v.push(event.clone())) + .or_insert(vec![event]); + } + }); + + continuation_token = event_page.continuation_token; + + if continuation_token.is_none() { + break; + } + } + + Ok(block_to_events) + } + async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult> { let mut l1_handler_txs: Vec = vec![]; let mut continuation_token: Option = None; @@ -225,8 +272,8 @@ impl Messenger for StarknetM async fn gather_messages( &self, - _from_block: u64, - _max_blocks: u64, + from_block: u64, + max_blocks: u64, chain_id: ChainId, ) -> MessengerResult<(u64, Vec)> { debug!(target: LOG_TARGET, "Gathering messages"); @@ -245,6 +292,41 @@ impl Messenger for StarknetM } }; + if from_block > chain_latest_block { + // Nothing to fetch, we can skip waiting the next tick. + return Ok((chain_latest_block, vec![])); + } + + // +1 as the from_block counts as 1 block fetched. + let to_block = if from_block + max_blocks + 1 < chain_latest_block { + from_block + max_blocks + } else { + chain_latest_block + }; + + let mut l1_handler_txs: Vec = vec![]; + + // fetch events for the given range before fetching pending events + self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)) + .await + .map_err(|_| Error::SendError) + .unwrap() + .iter() + .for_each(|(block_number, block_events)| { + debug!( + target: LOG_TARGET, + block_number = %block_number, + events_count = %block_events.len(), + "Converting events of block into L1HandlerTx." + ); + + block_events.iter().for_each(|e| { + if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) { + l1_handler_txs.push(tx) + } + }) + }); + // Check if the block number has changed let previous_block = self.latest_block.load(Ordering::Relaxed); if previous_block != chain_latest_block { @@ -252,11 +334,13 @@ impl Messenger for StarknetM self.event_cache.write().await.clear(); self.latest_block.store(chain_latest_block, Ordering::Relaxed); } - + // Fetch pending events let pending_txs = self.fetch_pending_events(chain_id).await?; - debug!(target: LOG_TARGET, "Returning {} pending transactions", pending_txs.len()); + // Add pending events to the list + l1_handler_txs.extend(pending_txs); - Ok((chain_latest_block, pending_txs)) + debug!(target: LOG_TARGET, "Returning {} transactions", l1_handler_txs.len()); + Ok((chain_latest_block, l1_handler_txs)) } async fn send_messages(