Skip to content

Commit

Permalink
starknet messenging: gather message & update gather from in config file
Browse files Browse the repository at this point in the history
  • Loading branch information
ybensacq committed Aug 20, 2024
1 parent c3c91ca commit 621ae0b
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 6 deletions.
4 changes: 3 additions & 1 deletion crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,15 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
}
}

let mut messaging_config = pin.messaging_config.write().unwrap();
// Poll the gathering future
if let Some(mut gather_fut) = pin.msg_gather_fut.take() {
match gather_fut.poll_unpin(cx) {
Poll::Ready(Ok((last_block, msg_count))) => {
info!(target: LOG_TARGET, "Gathered {} transactions up to block {}", msg_count, last_block);
pin.gather_from_block = last_block + 1;
messaging_config.gather_from_block = pin.gather_from_block;
let _ = messaging_config.save();
return Poll::Ready(Some(MessagingOutcome::Gather {
lastest_block: last_block,
msg_count,
Expand All @@ -248,7 +251,6 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
info!(target: LOG_TARGET, "Sent {} messages from block {}", msg_count, block_num);
pin.send_from_block = block_num + 1;
// update the config with the latest block number sent.
let mut messaging_config = pin.messaging_config.write().unwrap();
messaging_config.send_from_block = pin.send_from_block;
let _ = messaging_config.save();
return Poll::Ready(Some(MessagingOutcome::Send { block_num, msg_count }));
Expand Down
94 changes: 89 additions & 5 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use crate::hooker::KatanaHooker;
use anyhow::Result;
use async_trait::async_trait;
Expand Down Expand Up @@ -69,6 +70,52 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> StarknetMessaging<EF> {
})
}

/// Fetches events for the given blocks range.
pub async fn fetch_events(
&self,
from_block: BlockId,
to_block: BlockId,
) -> Result<HashMap<u64, Vec<EmittedEvent>>> {
trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs.");

let mut block_to_events: HashMap<u64, Vec<EmittedEvent>> = HashMap::new();

let filter = EventFilter {
from_block: Some(from_block),
to_block: Some(to_block),
address: Some(self.messaging_contract_address),
// TODO: this might come from the configuration actually.
keys: None,
};

// TODO: this chunk_size may also come from configuration?
let chunk_size = 200;
let mut continuation_token: Option<String> = None;

loop {
let event_page =
self.provider.get_events(filter.clone(), continuation_token, chunk_size).await?;

event_page.events.into_iter().for_each(|event| {
// We ignore events without the block number
if let Some(block_number) = event.block_number {
block_to_events
.entry(block_number)
.and_modify(|v| v.push(event.clone()))
.or_insert(vec![event]);
}
});

continuation_token = event_page.continuation_token;

if continuation_token.is_none() {
break;
}
}

Ok(block_to_events)
}

async fn fetch_pending_events(&self, chain_id: ChainId) -> MessengerResult<Vec<L1HandlerTx>> {
let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];
let mut continuation_token: Option<String> = None;
Expand Down Expand Up @@ -225,8 +272,8 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM

async fn gather_messages(
&self,
_from_block: u64,
_max_blocks: u64,
from_block: u64,
max_blocks: u64,
chain_id: ChainId,
) -> MessengerResult<(u64, Vec<L1HandlerTx>)> {
debug!(target: LOG_TARGET, "Gathering messages");
Expand All @@ -245,18 +292,55 @@ impl<EF: katana_executor::ExecutorFactory + Send + Sync> Messenger for StarknetM
}
};

if from_block > chain_latest_block {
// Nothing to fetch, we can skip waiting the next tick.
return Ok((chain_latest_block, vec![]));
}

// +1 as the from_block counts as 1 block fetched.
let to_block = if from_block + max_blocks + 1 < chain_latest_block {
from_block + max_blocks
} else {
chain_latest_block
};

let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];

// fetch events for the given range before fetching pending events
self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
.await
.map_err(|_| Error::SendError)
.unwrap()
.iter()
.for_each(|(block_number, block_events)| {
debug!(
target: LOG_TARGET,
block_number = %block_number,
events_count = %block_events.len(),
"Converting events of block into L1HandlerTx."
);

block_events.iter().for_each(|e| {
if let Ok(tx) = l1_handler_tx_from_event(e, chain_id) {
l1_handler_txs.push(tx)
}
})
});

// Check if the block number has changed
let previous_block = self.latest_block.load(Ordering::Relaxed);
if previous_block != chain_latest_block {
debug!(target: LOG_TARGET, "Block number changed from {} to {}, clearing cache", previous_block, chain_latest_block);
self.event_cache.write().await.clear();
self.latest_block.store(chain_latest_block, Ordering::Relaxed);
}

// Fetch pending events
let pending_txs = self.fetch_pending_events(chain_id).await?;
debug!(target: LOG_TARGET, "Returning {} pending transactions", pending_txs.len());
// Add pending events to the list
l1_handler_txs.extend(pending_txs);

Ok((chain_latest_block, pending_txs))
debug!(target: LOG_TARGET, "Returning {} transactions", l1_handler_txs.len());
Ok((chain_latest_block, l1_handler_txs))
}

async fn send_messages(
Expand Down

0 comments on commit 621ae0b

Please sign in to comment.