diff --git a/offchain/Cargo.lock b/offchain/Cargo.lock index 65f3decd8..9f990e30f 100644 --- a/offchain/Cargo.lock +++ b/offchain/Cargo.lock @@ -4451,6 +4451,7 @@ dependencies = [ "prometheus-client", "redacted", "redis", + "regex", "serde", "serde_json", "snafu 0.8.2", diff --git a/offchain/advance-runner/src/broker.rs b/offchain/advance-runner/src/broker.rs index 1993b6e19..f550edc5d 100644 --- a/offchain/advance-runner/src/broker.rs +++ b/offchain/advance-runner/src/broker.rs @@ -55,7 +55,7 @@ impl BrokerFacade { let client = Broker::new(config).await.context(BrokerInternalSnafu)?; let inputs_stream = RollupsInputsStream::new(&dapp_metadata); let outputs_stream = RollupsOutputsStream::new(&dapp_metadata); - let claims_stream = RollupsClaimsStream::new(dapp_metadata.chain_id); + let claims_stream = RollupsClaimsStream::new(&dapp_metadata); Ok(Self { client, inputs_stream, diff --git a/offchain/authority-claimer/src/claimer.rs b/offchain/authority-claimer/src/claimer.rs index 43c475d20..69be76059 100644 --- a/offchain/authority-claimer/src/claimer.rs +++ b/offchain/authority-claimer/src/claimer.rs @@ -3,8 +3,10 @@ use async_trait::async_trait; use snafu::ResultExt; -use std::fmt::Debug; -use tracing::{info, trace}; +use std::{collections::HashMap, fmt::Debug}; +use tracing::{debug, info}; + +use rollups_events::Address; use crate::{ checker::DuplicateChecker, listener::BrokerListener, @@ -31,6 +33,9 @@ pub enum ClaimerError< D: DuplicateChecker, T: TransactionSender, > { + #[snafu(display("invalid app address {:?}", app_address))] + InvalidAppAddress { app_address: Address }, + #[snafu(display("broker listener error"))] BrokerListenerError { source: B::Error }, @@ -84,14 +89,14 @@ where type Error = ClaimerError; async fn start(mut self) -> Result<(), Self::Error> { - trace!("Starting the authority claimer loop"); + debug!("Starting the authority claimer loop"); loop { let rollups_claim = self .broker_listener .listen() .await .context(BrokerListenerSnafu)?; - trace!("Got a claim from the broker: {:?}", rollups_claim); + debug!("Got a claim from the broker: {:?}", rollups_claim); let is_duplicated_rollups_claim = self .duplicate_checker @@ -99,10 +104,98 @@ where .await .context(DuplicatedClaimSnafu)?; if is_duplicated_rollups_claim { - trace!("It was a duplicated claim"); + debug!("It was a duplicated claim"); + continue; + } + + info!("Sending a new rollups claim"); + self.transaction_sender = self + .transaction_sender + .send_rollups_claim_transaction(rollups_claim) + .await + .context(TransactionSenderSnafu)? + } + } +} + +// ------------------------------------------------------------------------------------------------ +// MultidappClaimer +// ------------------------------------------------------------------------------------------------ + +/// The `MultidappClaimer` must be injected with a `BrokerListener`, a map of `Address` to +/// `DuplicateChecker`, and a `TransactionSender`. +#[derive(Debug)] +pub struct MultidappClaimer< + B: BrokerListener, + D: DuplicateChecker, + T: TransactionSender, +> { + broker_listener: B, + duplicate_checkers: HashMap, + transaction_sender: T, +} + +impl + MultidappClaimer +{ + pub fn new( + broker_listener: B, + duplicate_checkers: HashMap, + transaction_sender: T, + ) -> Self { + Self { + broker_listener, + duplicate_checkers, + transaction_sender, + } + } +} + +#[async_trait] +impl Claimer for MultidappClaimer +where + B: BrokerListener + Send + Sync + 'static, + D: DuplicateChecker + Send + Sync + 'static, + T: TransactionSender + Send + 'static, +{ + type Error = ClaimerError; + + async fn start(mut self) -> Result<(), Self::Error> { + debug!("Starting the multidapp authority claimer loop"); + loop { + // Listens for claims from multiple dapps. + let rollups_claim = self + .broker_listener + .listen() + .await + .context(BrokerListenerSnafu)?; + let dapp_address = rollups_claim.dapp_address.clone(); + debug!( + "Got a claim from the broker for {:?}: {:?}", + dapp_address, rollups_claim + ); + + // Gets the duplicate checker for the dapp. + let duplicate_checker = self + .duplicate_checkers + .get_mut(&dapp_address) + .ok_or(ClaimerError::InvalidAppAddress { + app_address: dapp_address.clone(), + })?; + + // Checks for duplicates. + let is_duplicated_rollups_claim = duplicate_checker + .is_duplicated_rollups_claim(&rollups_claim) + .await + .context(DuplicatedClaimSnafu)?; + + // If it is a duplicate, the loop continues. + if is_duplicated_rollups_claim { + debug!("It was a duplicated claim"); continue; } + // Sends the claim. info!("Sending a new rollups claim"); self.transaction_sender = self .transaction_sender diff --git a/offchain/authority-claimer/src/lib.rs b/offchain/authority-claimer/src/lib.rs index 944efb4b4..8b31a21c9 100644 --- a/offchain/authority-claimer/src/lib.rs +++ b/offchain/authority-claimer/src/lib.rs @@ -9,7 +9,10 @@ pub mod metrics; pub mod sender; pub mod signer; -use config::Config; +use claimer::MultidappClaimer; +use config::{AuthorityClaimerConfig, Config}; +use listener::MultidappBrokerListener; +use rollups_events::Address; use snafu::Error; use tracing::trace; @@ -28,13 +31,43 @@ pub async fn run(config: Config) -> Result<(), Box> { http_server::start(config.http_server_config, metrics.clone().into()); let config = config.authority_claimer_config; - let chain_id = config.tx_manager_config.chain_id; + let claimer = create_default_claimer(metrics, config).await?; + + let claimer_handle = claimer.start(); + + // Starting the HTTP server and the claimer loop. + tokio::select! { + ret = http_server_handle => { ret? } + ret = claimer_handle => { ret? } + }; + + unreachable!() +} + +async fn create_default_claimer( + metrics: AuthorityClaimerMetrics, + config: AuthorityClaimerConfig, +) -> Result< + DefaultClaimer< + DefaultBrokerListener, + DefaultDuplicateChecker, + DefaultTransactionSender, + >, + Box, +> { // Creating the broker listener. trace!("Creating the broker listener"); - let broker_listener = - DefaultBrokerListener::new(config.broker_config.clone(), chain_id) - .await?; + + let chain_id = config.tx_manager_config.chain_id; + let dapp_address = Address::default(); // TODO + + let broker_listener = DefaultBrokerListener::new( + config.broker_config.clone(), + chain_id, + dapp_address, + ) + .await?; // Creating the duplicate checker. trace!("Creating the duplicate checker"); @@ -52,19 +85,26 @@ pub async fn run(config: Config) -> Result<(), Box> { DefaultTransactionSender::new(config.clone(), chain_id, metrics) .await?; - // Creating the claimer loop. + // Creating the claimer. let claimer = DefaultClaimer::new( broker_listener, duplicate_checker, transaction_sender, ); - let claimer_handle = claimer.start(); - // Starting the HTTP server and the claimer loop. - tokio::select! { - ret = http_server_handle => { ret? } - ret = claimer_handle => { ret? } - }; + Ok(claimer) +} - unreachable!() +async fn create_multidapp_claimer( + _metrics: AuthorityClaimerMetrics, + _config: AuthorityClaimerConfig, +) -> Result< + MultidappClaimer< + MultidappBrokerListener, + DefaultDuplicateChecker, + DefaultTransactionSender, + >, + Box, +> { + todo!() } diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 3f50a1cb1..93b677ac9 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -3,21 +3,26 @@ use async_trait::async_trait; use rollups_events::{ - Broker, BrokerConfig, BrokerError, RollupsClaim, RollupsClaimsStream, - INITIAL_ID, + Address, Broker, BrokerConfig, BrokerError, DAppMetadata, RollupsClaim, + RollupsClaimsStream, INITIAL_ID, }; use snafu::ResultExt; -use std::fmt::Debug; +use std::{collections::HashMap, fmt::Debug}; -/// The `BrokerListener` listens for new claims from the broker +/// The `BrokerListener` listens for new claims from the broker. #[async_trait] pub trait BrokerListener: Debug { type Error: snafu::Error + 'static; - /// Listen to claims async fn listen(&mut self) -> Result; } +#[derive(Debug, snafu::Snafu)] +pub enum BrokerListenerError { + #[snafu(display("broker error"))] + BrokerError { source: BrokerError }, +} + // ------------------------------------------------------------------------------------------------ // DefaultBrokerListener // ------------------------------------------------------------------------------------------------ @@ -29,20 +34,19 @@ pub struct DefaultBrokerListener { last_claim_id: String, } -#[derive(Debug, snafu::Snafu)] -pub enum BrokerListenerError { - #[snafu(display("broker error"))] - BrokerError { source: BrokerError }, -} - impl DefaultBrokerListener { pub async fn new( broker_config: BrokerConfig, chain_id: u64, + dapp_address: Address, ) -> Result { tracing::trace!("Connecting to the broker ({:?})", broker_config); let broker = Broker::new(broker_config).await?; - let stream = RollupsClaimsStream::new(chain_id); + let dapp_metadata = DAppMetadata { + chain_id, + dapp_address, + }; + let stream = RollupsClaimsStream::new(&dapp_metadata); let last_claim_id = INITIAL_ID.to_string(); Ok(Self { broker, @@ -70,31 +74,109 @@ impl BrokerListener for DefaultBrokerListener { } } +// ------------------------------------------------------------------------------------------------ +// MultidappBrokerListener +// ------------------------------------------------------------------------------------------------ + +#[derive(Debug)] +pub struct MultidappBrokerListener { + broker: Broker, + streams: HashMap, // stream => last-claim-id +} + +impl MultidappBrokerListener { + pub async fn new( + broker_config: BrokerConfig, + dapps: Vec<(u64, Address)>, // Vec<(chain_id, app_address)> + ) -> Result { + tracing::trace!("Connecting to the broker ({:?})", broker_config); + let broker = Broker::new(broker_config).await?; + + // Converts to the stream to last-consumed-id map. + let streams: Vec<_> = dapps + .into_iter() + .map(|(chain_id, dapp_address)| { + let dapp_metadata = &DAppMetadata { + chain_id, + dapp_address, + }; + let stream = RollupsClaimsStream::new(dapp_metadata); + let initial_id = INITIAL_ID.to_string(); + (stream, initial_id) + }) + .collect(); + let streams = HashMap::from_iter(streams); + + Ok(Self { broker, streams }) + } +} + +#[async_trait] +impl BrokerListener for MultidappBrokerListener { + type Error = BrokerListenerError; + + async fn listen(&mut self) -> Result { + tracing::trace!("Waiting for claim"); + + let (stream, event) = self + .broker + .consume_blocking_from_multiple_streams(self.streams.clone()) + .await + .context(BrokerSnafu)?; + + // Updates the last-consumed-id from the stream. + let replaced = self.streams.insert(stream.clone(), event.id); + assert!(replaced.is_some()); + + Ok(event.payload) + } +} + +// ------------------------------------------------------------------------------------------------ +// Tests +// ------------------------------------------------------------------------------------------------ + #[cfg(test)] mod tests { use std::time::Duration; use testcontainers::clients::Cli; - use test_fixtures::BrokerFixture; - - use crate::listener::{BrokerListener, DefaultBrokerListener}; + use test_fixtures::{broker::ClaimerMultidappBrokerFixture, BrokerFixture}; use backoff::ExponentialBackoffBuilder; use rollups_events::{ - BrokerConfig, BrokerEndpoint, BrokerError, RedactedUrl, RollupsClaim, - Url, + Address, BrokerConfig, BrokerEndpoint, BrokerError, RedactedUrl, + RollupsClaim, Url, }; - // ------------------------------------------------------------------------------------------------ + use crate::listener::BrokerListener; + + use super::{DefaultBrokerListener, MultidappBrokerListener}; + + // -------------------------------------------------------------------------------------------- // Broker Mock - // ------------------------------------------------------------------------------------------------ + // -------------------------------------------------------------------------------------------- + + fn config(redis_endpoint: BrokerEndpoint) -> BrokerConfig { + BrokerConfig { + redis_endpoint, + consume_timeout: 300000, + backoff: ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(1000)) + .with_max_elapsed_time(Some(Duration::from_millis(3000))) + .build(), + } + } + + // -------------------------------------------------------------------------------------------- + // DefaultListener Tests + // -------------------------------------------------------------------------------------------- - pub async fn setup_broker( + async fn setup_default_broker_listener( docker: &Cli, should_fail: bool, ) -> Result<(BrokerFixture, DefaultBrokerListener), BrokerError> { let fixture = BrokerFixture::setup(docker).await; - let redis_endpoint = if should_fail { BrokerEndpoint::Single(RedactedUrl::new( Url::parse("https://invalid.com").unwrap(), @@ -102,21 +184,16 @@ mod tests { } else { fixture.redis_endpoint().clone() }; - - let config = BrokerConfig { - redis_endpoint, - consume_timeout: 300000, - backoff: ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(1000)) - .with_max_elapsed_time(Some(Duration::from_millis(3000))) - .build(), - }; - let broker = - DefaultBrokerListener::new(config, fixture.chain_id()).await?; + let broker = DefaultBrokerListener::new( + config(redis_endpoint), + fixture.chain_id(), + fixture.dapp_address().clone(), + ) + .await?; Ok((fixture, broker)) } - pub async fn produce_rollups_claims( + async fn default_produce_claims( fixture: &BrokerFixture<'_>, n: usize, epoch_index_start: usize, @@ -132,59 +209,57 @@ mod tests { } /// The last claim should trigger an `EndError` error. - pub async fn produce_last_claim( + async fn default_produce_last_claim( fixture: &BrokerFixture<'_>, epoch_index: usize, ) -> Vec { - produce_rollups_claims(fixture, 1, epoch_index).await + default_produce_claims(fixture, 1, epoch_index).await } - // ------------------------------------------------------------------------------------------------ - // Listener Unit Tests - // ------------------------------------------------------------------------------------------------ + // -------------------------------------------------------------------------------------------- #[tokio::test] - async fn instantiate_new_broker_listener_ok() { + async fn instantiate_new_default_broker_listener_ok() { let docker = Cli::default(); - let _ = setup_broker(&docker, false).await; + let _ = setup_default_broker_listener(&docker, false).await; } #[tokio::test] - async fn instantiate_new_broker_listener_error() { + async fn instantiate_new_default_broker_listener_error() { let docker = Cli::default(); - let result = setup_broker(&docker, true).await; - assert!(result.is_err(), "setup_broker didn't fail as it should"); + let result = setup_default_broker_listener(&docker, true).await; + assert!(result.is_err(), "setup didn't fail as it should"); let error = result.err().unwrap().to_string(); assert_eq!(error, "error connecting to Redis"); } #[tokio::test] - async fn start_broker_listener_with_one_claim_enqueued() { + async fn start_default_broker_listener_with_one_claim_enqueued() { let docker = Cli::default(); let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); + setup_default_broker_listener(&docker, false).await.unwrap(); let n = 5; - produce_rollups_claims(&fixture, n, 0).await; - produce_last_claim(&fixture, n).await; + default_produce_claims(&fixture, n, 0).await; + default_produce_last_claim(&fixture, n).await; let result = broker_listener.listen().await; assert!(result.is_ok()); } #[tokio::test] - async fn start_broker_listener_with_claims_enqueued() { + async fn start_default_broker_listener_with_claims_enqueued() { let docker = Cli::default(); let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); - produce_last_claim(&fixture, 0).await; + setup_default_broker_listener(&docker, false).await.unwrap(); + default_produce_last_claim(&fixture, 0).await; let claim = broker_listener.listen().await; assert!(claim.is_ok()); } #[tokio::test] - async fn start_broker_listener_listener_with_no_claims_enqueued() { + async fn start_default_broker_listener_listener_with_no_claims_enqueued() { let docker = Cli::default(); let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); + setup_default_broker_listener(&docker, false).await.unwrap(); let n = 7; let broker_listener_thread = tokio::spawn(async move { @@ -198,17 +273,208 @@ mod tests { let x = 2; println!("Creating {} claims.", x); - produce_rollups_claims(&fixture, x, 0).await; + default_produce_claims(&fixture, x, 0).await; println!("Going to sleep for 2 seconds."); tokio::time::sleep(Duration::from_secs(2)).await; let y = 5; println!("Creating {} claims.", y); - produce_rollups_claims(&fixture, y, x).await; + default_produce_claims(&fixture, y, x).await; assert_eq!(x + y, n); - produce_last_claim(&fixture, n).await; + default_produce_last_claim(&fixture, n).await; + + broker_listener_thread.await.unwrap(); + } + + // -------------------------------------------------------------------------------------------- + // MultidappListener Tests + // -------------------------------------------------------------------------------------------- + + async fn setup_multidapp_listener( + docker: &Cli, + should_fail: bool, + ) -> Result< + ( + ClaimerMultidappBrokerFixture, + MultidappBrokerListener, + Vec
, + ), + BrokerError, + > { + let chain_id: u64 = 0; + let dapp_addresses: Vec
= vec![ + [3; 20].into(), // + [5; 20].into(), // + [7; 20].into(), // + ]; + let dapps: Vec<_> = dapp_addresses + .clone() + .into_iter() + .map(|dapp_address| (chain_id, dapp_address)) + .collect(); + + let fixture = + ClaimerMultidappBrokerFixture::setup(docker, dapps.clone()).await; + let redis_endpoint = if should_fail { + BrokerEndpoint::Single(RedactedUrl::new( + Url::parse("https://invalid.com").unwrap(), + )) + } else { + fixture.redis_endpoint().clone() + }; + + let broker = + MultidappBrokerListener::new(config(redis_endpoint), dapps).await?; + Ok((fixture, broker, dapp_addresses)) + } + + // For each index in indexes, this function produces a claim + // with rollups_claim.dapp_address = dapps[index] + // and rollups_claim.epoch_index = epochs[index]. + // It then increments epochs[index]. + async fn multidapp_produce_claims( + fixture: &ClaimerMultidappBrokerFixture<'_>, + epochs: &mut Vec, + dapps: &Vec
, + indexes: &Vec, + ) { + for &index in indexes { + let epoch = *epochs.get(index).unwrap(); + + let mut rollups_claim = RollupsClaim::default(); + rollups_claim.dapp_address = dapps.get(index).unwrap().clone(); + rollups_claim.epoch_index = epoch; + fixture.produce_rollups_claim(rollups_claim.clone()).await; + + epochs[index] = epoch + 1; + } + } + + // Asserts that listener.listen() will return indexes.len() claims, + // and that for each index in indexes + // there is an unique claim for which claim.dapp_address = dapps[index]. + async fn assert_listen( + listener: &mut MultidappBrokerListener, + dapps: &Vec
, + indexes: &Vec, + ) { + let mut dapps: Vec<_> = indexes + .iter() + .map(|&index| dapps.get(index).unwrap().clone()) + .collect(); + for _ in indexes.clone() { + println!("Listening..."); + let result = listener.listen().await; + assert!(result.is_ok()); + let dapp = result.unwrap().dapp_address; + + let index = dapps.iter().position(|expected| *expected == dapp); + assert!(index.is_some()); + dapps.remove(index.unwrap()); + } + assert!(dapps.is_empty()); + } + + // -------------------------------------------------------------------------------------------- + + #[tokio::test] + async fn instantiate_multidapp_broker_listener_ok() { + let docker = Cli::default(); + let _ = setup_multidapp_listener(&docker, false).await; + } + + #[tokio::test] + async fn instantiate_multidapp_broker_listener_error() { + let docker = Cli::default(); + let result = setup_multidapp_listener(&docker, true).await; + assert!(result.is_err(), "setup didn't fail as it should"); + let error = result.err().unwrap().to_string(); + assert_eq!(error, "error connecting to Redis"); + } + + #[tokio::test] + async fn multidapp_listen_with_one_claim_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let index = 0; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &vec![index]) + .await; + + let result = listener.listen().await; + assert!(result.is_ok()); + + let expected_dapp = dapps.get(index).unwrap().clone(); + let actual_dapp = result.unwrap().dapp_address; + assert_eq!(expected_dapp, actual_dapp); + } + + #[tokio::test] + async fn multidapp_listen_with_multiple_claims_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 1, 1, 2]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + assert_listen(&mut listener, &dapps, &indexes).await; + } + + #[tokio::test] + async fn multidapp_listen_with_one_claim_for_each_dapp_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 1, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + assert_listen(&mut listener, &dapps, &indexes).await; + } + + #[tokio::test] + async fn multidapp_listen_with_no_claims_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let first_batch = vec![0, 1, 2, 0]; + let second_batch = vec![2, 1, 0, 0, 2, 1]; + + let broker_listener_thread = { + let _dapps = dapps.clone(); + let _first_batch = first_batch.clone(); + let _second_batch = second_batch.clone(); + tokio::spawn(async move { + println!("Spawned the broker-listener thread."); + assert_listen(&mut listener, &_dapps, &_first_batch).await; + println!("All good with the first batch!"); + assert_listen(&mut listener, &_dapps, &_second_batch).await; + println!("All good with the second batch!"); + }) + }; + + println!("Going to sleep for 1 second."); + tokio::time::sleep(Duration::from_secs(1)).await; + + println!("Producing the first batch of claims."); + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &first_batch) + .await; + println!("Epochs: {:?}", epochs); + + println!("Going to sleep for 2 seconds."); + tokio::time::sleep(Duration::from_secs(2)).await; + + println!("Producing the second batch of claims."); + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &second_batch) + .await; + println!("Epochs: {:?}", epochs); broker_listener_thread.await.unwrap(); } diff --git a/offchain/rollups-events/Cargo.toml b/offchain/rollups-events/Cargo.toml index 7eda7413b..d784f8024 100644 --- a/offchain/rollups-events/Cargo.toml +++ b/offchain/rollups-events/Cargo.toml @@ -12,6 +12,7 @@ base64.workspace = true clap = { workspace = true, features = ["derive", "env"] } hex.workspace = true prometheus-client.workspace = true +regex.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true snafu.workspace = true diff --git a/offchain/rollups-events/src/broker/mod.rs b/offchain/rollups-events/src/broker/mod.rs index fa622a3d1..c4c709522 100644 --- a/offchain/rollups-events/src/broker/mod.rs +++ b/offchain/rollups-events/src/broker/mod.rs @@ -14,6 +14,8 @@ use redis::{ }; use serde::{de::DeserializeOwned, Serialize}; use snafu::{ResultExt, Snafu}; +use std::collections::HashMap; +use std::convert::identity; use std::fmt; use std::time::Duration; @@ -242,7 +244,6 @@ impl Broker { } } } - /// Consume the next event in stream without blocking /// This function returns None if there are no more remaining events. /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. @@ -280,6 +281,79 @@ impl Broker { Ok(None) } } + + #[tracing::instrument(level = "trace", skip_all)] + async fn _consume_blocking_from_multiple_streams( + &mut self, + streams: &Vec, + last_consumed_ids: &Vec, + ) -> Result<(S, Event), BrokerError> { + let reply = retry(self.backoff.clone(), || async { + let stream_keys: Vec = streams + .iter() + .map(|stream| stream.key().to_string()) + .collect(); + + let opts = StreamReadOptions::default() + .count(1) + .block(self.consume_timeout); + let reply: StreamReadReply = self + .connection + .clone() + .xread_options(&stream_keys, &last_consumed_ids, &opts) + .await?; + + Ok(reply) + }) + .await + .context(ConnectionSnafu)?; + + tracing::trace!("checking for timeout"); + if reply.keys.is_empty() { + return Err(BrokerError::ConsumeTimeout); + } + + tracing::trace!("checking if any events were received"); + for mut stream_key in reply.keys { + if let Some(event) = stream_key.ids.pop() { + tracing::trace!("parsing received event"); + let stream = S::from_key(stream_key.key); + let event = event.try_into()?; + return Ok((stream, event)); + } + } + return Err(BrokerError::FailedToConsume); + } + + /// Consume the next event from one of the streams. + /// + /// This function blocks until a new event is available in one of the streams, + /// and retries whenever a timeout happens instead of returning an error. + /// + /// To consume the first event for a stream, `last_consumed_id[...]` should be `INITIAL_ID`. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn consume_blocking_from_multiple_streams( + &mut self, + streams: HashMap, // streams to last-consumed-ids + ) -> Result<(S, Event), BrokerError> { + let (streams, last_consumed_ids): (Vec<_>, Vec<_>) = + streams.into_iter().map(identity).unzip(); + + loop { + let result = self + ._consume_blocking_from_multiple_streams( + &streams, + &last_consumed_ids, + ) + .await; + + if let Err(BrokerError::ConsumeTimeout) = result { + tracing::trace!("consume timed out, retrying"); + } else { + return result; + } + } + } } /// Custom implementation of Debug because ConnectionManager doesn't implement debug @@ -295,6 +369,7 @@ impl fmt::Debug for Broker { pub trait BrokerStream { type Payload: Serialize + DeserializeOwned + Clone + Eq + PartialEq; fn key(&self) -> &str; + fn from_key(key: String) -> Self; } /// Event that goes through the broker diff --git a/offchain/rollups-events/src/lib.rs b/offchain/rollups-events/src/lib.rs index 8c75b8ff8..fa05140dd 100644 --- a/offchain/rollups-events/src/lib.rs +++ b/offchain/rollups-events/src/lib.rs @@ -23,4 +23,6 @@ pub use rollups_outputs::{ RollupsOutput, RollupsOutputEnum, RollupsOutputValidityProof, RollupsOutputsStream, RollupsProof, RollupsReport, RollupsVoucher, }; -pub use rollups_stream::{DAppMetadata, DAppMetadataCLIConfig}; +pub use rollups_stream::{ + parse_stream_with_key, DAppMetadata, DAppMetadataCLIConfig, +}; diff --git a/offchain/rollups-events/src/rollups_claims.rs b/offchain/rollups-events/src/rollups_claims.rs index 69cb2a486..a08b10cac 100644 --- a/offchain/rollups-events/src/rollups_claims.rs +++ b/offchain/rollups-events/src/rollups_claims.rs @@ -3,28 +3,9 @@ use serde::{Deserialize, Serialize}; -use crate::{Address, BrokerStream, Hash}; +use crate::{rollups_stream::decl_broker_stream, Address, Hash}; -#[derive(Debug)] -pub struct RollupsClaimsStream { - key: String, -} - -impl BrokerStream for RollupsClaimsStream { - type Payload = RollupsClaim; - - fn key(&self) -> &str { - &self.key - } -} - -impl RollupsClaimsStream { - pub fn new(chain_id: u64) -> Self { - Self { - key: format!("{{chain-{}}}:rollups-claims", chain_id), - } - } -} +decl_broker_stream!(RollupsClaimsStream, RollupsClaim, "rollups-claim"); /// Event generated when the Cartesi Rollups epoch finishes #[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)] diff --git a/offchain/rollups-events/src/rollups_outputs.rs b/offchain/rollups-events/src/rollups_outputs.rs index 57c8bc0c3..a485391e7 100644 --- a/offchain/rollups-events/src/rollups_outputs.rs +++ b/offchain/rollups-events/src/rollups_outputs.rs @@ -9,7 +9,7 @@ use crate::{rollups_stream::decl_broker_stream, Address, Hash, Payload}; decl_broker_stream!(RollupsOutputsStream, RollupsOutput, "rollups-outputs"); -/// Cartesi output +/// Cartesi output #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum RollupsOutput { AdvanceResult(RollupsAdvanceResult), diff --git a/offchain/rollups-events/src/rollups_stream.rs b/offchain/rollups-events/src/rollups_stream.rs index 7dd677fd3..1b366cac1 100644 --- a/offchain/rollups-events/src/rollups_stream.rs +++ b/offchain/rollups-events/src/rollups_stream.rs @@ -63,15 +63,38 @@ impl From for DAppMetadata { } } +pub fn parse_stream_with_key(key: String, inner_key: &str) -> (u64, Address) { + let mut re = r"^\{chain-([^:]+):dapp-([^}]+)\}:".to_string(); + re.push_str(inner_key); + re.push_str("$"); + let re = regex::Regex::new(&re).unwrap(); + let caps = re.captures(&key).unwrap(); + + let chain_id = caps + .get(1) + .unwrap() + .as_str() + .to_string() + .parse::() + .unwrap(); + let address = caps.get(2).unwrap().as_str().to_string(); + let address = + serde_json::from_value(serde_json::Value::String(address)).unwrap(); + + return (chain_id, address); +} + /// Declares a struct that implements the BrokerStream interface /// The generated key has the format `{chain-:dapp-}:`. /// The curly braces define a hash tag to ensure that all of a dapp's streams /// are located in the same node when connected to a Redis cluster. macro_rules! decl_broker_stream { ($stream: ident, $payload: ty, $key: literal) => { - #[derive(Debug)] + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct $stream { key: String, + pub chain_id: u64, + pub dapp_address: Address, } impl crate::broker::BrokerStream for $stream { @@ -80,17 +103,31 @@ macro_rules! decl_broker_stream { fn key(&self) -> &str { &self.key } + + fn from_key(key: String) -> Self { + let (chain_id, dapp_address) = + crate::parse_stream_with_key(key.clone(), $key); + Self { + key: key, + chain_id: chain_id, + dapp_address: dapp_address, + } + } } impl $stream { pub fn new(metadata: &crate::rollups_stream::DAppMetadata) -> Self { + let chain_id = metadata.chain_id; + let dapp_address = metadata.dapp_address.clone(); Self { key: format!( "{{chain-{}:dapp-{}}}:{}", - metadata.chain_id, - hex::encode(metadata.dapp_address.inner()), + chain_id, + hex::encode(dapp_address.inner()), $key ), + chain_id: chain_id, + dapp_address: dapp_address, } } } @@ -102,7 +139,7 @@ pub(crate) use decl_broker_stream; #[cfg(test)] mod tests { use super::*; - use crate::ADDRESS_SIZE; + use crate::{BrokerStream, ADDRESS_SIZE}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] @@ -119,4 +156,21 @@ mod tests { let stream = MockStream::new(&metadata); assert_eq!(stream.key, "{chain-123:dapp-fafafafafafafafafafafafafafafafafafafafa}:rollups-mock"); } + + #[test] + fn it_parses_the_key() { + let metadata = DAppMetadata { + chain_id: 123, + dapp_address: Address::new([0xfe; ADDRESS_SIZE]), + }; + + let stream = MockStream::new(&metadata); + let expected = "{chain-123:dapp-fefefefefefefefefefefefefefefefefefefefe}:rollups-mock"; + let key = stream.key().to_string(); + assert_eq!(expected, &key); + + let stream = MockStream::from_key(key); + assert_eq!(metadata.chain_id, stream.chain_id); + assert_eq!(metadata.dapp_address, stream.dapp_address); + } } diff --git a/offchain/rollups-events/tests/integration.rs b/offchain/rollups-events/tests/integration.rs index 6bfaa9cc8..60c077f08 100644 --- a/offchain/rollups-events/tests/integration.rs +++ b/offchain/rollups-events/tests/integration.rs @@ -1,6 +1,9 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) +use std::collections::HashMap; +use std::time::{Duration, Instant}; + use backoff::ExponentialBackoff; use redis::aio::ConnectionManager; use redis::streams::StreamRangeReply; @@ -76,6 +79,10 @@ impl BrokerStream for MockStream { fn key(&self) -> &str { STREAM_KEY } + + fn from_key(_: String) -> Self { + unimplemented!() + } } #[test_log::test(tokio::test)] @@ -286,3 +293,139 @@ async fn test_it_does_not_block_when_consuming_empty_stream() { .expect("failed to peek"); assert!(matches!(event, None)); } + +// ------------------------------------------------------------------------------------------------ + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct AnotherMockStream { + key: String, + a: u8, + b: u8, +} + +impl AnotherMockStream { + fn new(a: u8, b: u8) -> Self { + let key = format!("{{a-{}:b-{}}}:{}", a, b, STREAM_KEY); + Self { key, a, b } + } +} + +impl BrokerStream for AnotherMockStream { + type Payload = MockPayload; + + fn key(&self) -> &str { + &self.key + } + + fn from_key(key: String) -> Self { + let re = r"^\{a-([^:]+):b-([^}]+)\}:test-stream$".to_string(); + let re = regex::Regex::new(&re).unwrap(); + let caps = re.captures(&key).unwrap(); + + let a = caps + .get(1) + .unwrap() + .as_str() + .to_string() + .parse::() + .unwrap(); + let b = caps + .get(2) + .unwrap() + .as_str() + .to_string() + .parse::() + .unwrap(); + + Self { key, a, b } + } +} + +#[test_log::test(tokio::test)] +async fn test_it_consumes_from_multiple_streams() { + let docker = Cli::default(); + let state = TestState::setup(&docker).await; + let mut broker = state.create_broker().await; + + // Creates the map of streams to last-consumed-ids. + let mut streams = HashMap::new(); + let initial_id = INITIAL_ID.to_string(); + streams.insert(AnotherMockStream::new(1, 2), initial_id.clone()); + streams.insert(AnotherMockStream::new(3, 4), initial_id.clone()); + streams.insert(AnotherMockStream::new(5, 6), initial_id.clone()); + + // Produces N events for each stream using the broker struct. + const N: usize = 3; + for stream in streams.keys() { + for i in 0..N { + let data = format!("{}{}{}", stream.a, stream.b, i); + let payload = MockPayload { data }; + let _ = broker + .produce(stream, payload) + .await + .expect("failed to produce events"); + } + } + + // Consumes all events using the broker struct. + let mut counters = HashMap::new(); + for _ in 0..(N * streams.len()) { + let (stream, event) = broker + .consume_blocking_from_multiple_streams(streams.clone()) + .await + .expect("failed to consume"); + + let i = counters + .entry(stream.clone()) + .and_modify(|n| *n += 1) + .or_insert(0) + .clone(); + + // Asserts that the payload is correct. + let expected = format!("{}{}{}", stream.a, stream.b, i); + assert_eq!(expected, event.payload.data); + + // Updates the map of streams with the last consumed id. + let replaced = streams.insert(stream, event.id); + // And asserts that the key from the map was indeed overwritten. + assert!(replaced.is_some()); + } + + // Asserts that N events were consumed from each stream. + for counter in counters.values() { + assert_eq!(N - 1, *counter); + } + + // Gets one of the streams. + let stream = streams.clone().into_keys().next().unwrap(); + let expected_stream = stream.clone(); + + // Sets a thread to produce an event in that stream after WAIT seconds. + const WAIT: u64 = 2; + let mut producer_broker = broker.clone(); + let handler = tokio::spawn(async move { + let duration = Duration::from_secs(WAIT); + tokio::time::sleep(duration).await; + let data = "final event".to_string(); + let payload = MockPayload { data }; + let _ = producer_broker + .produce(&stream, payload) + .await + .expect("failed to produce the final event"); + }); + + // Consumes the final event. + let marker = Instant::now(); + let (final_stream, _) = broker + .consume_blocking_from_multiple_streams(streams) + .await + .expect("failed to consume the final event"); + + // Asserts that the main thread blocked for at least WAIT seconds. + assert!(marker.elapsed().as_secs() >= WAIT); + + // Asserts that the event came from the correct stream. + assert_eq!(expected_stream, final_stream); + + handler.await.expect("failed to wait handler"); +} diff --git a/offchain/test-fixtures/src/broker.rs b/offchain/test-fixtures/src/broker.rs index 95b9ce19b..182bce518 100644 --- a/offchain/test-fixtures/src/broker.rs +++ b/offchain/test-fixtures/src/broker.rs @@ -1,6 +1,8 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) +use std::collections::HashMap; + use backoff::ExponentialBackoff; use rollups_events::{ Address, Broker, BrokerConfig, BrokerEndpoint, DAppMetadata, Event, @@ -16,6 +18,40 @@ use tokio::sync::Mutex; const CHAIN_ID: u64 = 0; const DAPP_ADDRESS: Address = Address::new([0xfa; ADDRESS_SIZE]); const CONSUME_TIMEOUT: usize = 10_000; // ms + // +async fn start_redis( + docker: &Cli, +) -> (Container, BrokerEndpoint, Mutex) { + tracing::trace!("starting redis docker container"); + let image = GenericImage::new("redis", "6.2").with_wait_for( + WaitFor::message_on_stdout("Ready to accept connections"), + ); + let node = docker.run(image); + let port = node.get_host_port_ipv4(6379); + let endpoint = BrokerEndpoint::Single( + Url::parse(&format!("redis://127.0.0.1:{}", port)) + .map(RedactedUrl::new) + .expect("failed to parse Redis Url"), + ); + + let backoff = ExponentialBackoff::default(); + let config = BrokerConfig { + redis_endpoint: endpoint.clone(), + consume_timeout: CONSUME_TIMEOUT, + backoff, + }; + + tracing::trace!(?endpoint, "connecting to redis with rollups_events crate"); + let client = Mutex::new( + Broker::new(config) + .await + .expect("failed to connect to broker"), + ); + + (node, endpoint, client) +} + +// ------------------------------------------------------------------------------------------------ pub struct BrokerFixture<'d> { _node: Container<'d, GenericImage>, @@ -33,51 +69,23 @@ impl BrokerFixture<'_> { pub async fn setup(docker: &Cli) -> BrokerFixture<'_> { tracing::info!("setting up redis fixture"); - tracing::trace!("starting redis docker container"); - let image = GenericImage::new("redis", "6.2").with_wait_for( - WaitFor::message_on_stdout("Ready to accept connections"), - ); - let node = docker.run(image); - let port = node.get_host_port_ipv4(6379); - let redis_endpoint = BrokerEndpoint::Single( - Url::parse(&format!("redis://127.0.0.1:{}", port)) - .map(RedactedUrl::new) - .expect("failed to parse Redis Url"), - ); - let chain_id = CHAIN_ID; - let dapp_address = DAPP_ADDRESS; - let backoff = ExponentialBackoff::default(); + let (redis_node, redis_endpoint, redis_client) = + start_redis(&docker).await; + let metadata = DAppMetadata { - chain_id, - dapp_address: dapp_address.clone(), - }; - let inputs_stream = RollupsInputsStream::new(&metadata); - let claims_stream = RollupsClaimsStream::new(metadata.chain_id); - let outputs_stream = RollupsOutputsStream::new(&metadata); - let config = BrokerConfig { - redis_endpoint: redis_endpoint.clone(), - consume_timeout: CONSUME_TIMEOUT, - backoff, + chain_id: CHAIN_ID, + dapp_address: DAPP_ADDRESS.clone(), }; - tracing::trace!( - ?redis_endpoint, - "connecting to redis with rollups_events crate" - ); - let client = Mutex::new( - Broker::new(config) - .await - .expect("failed to connect to broker"), - ); BrokerFixture { - _node: node, - client, - inputs_stream, - claims_stream, - outputs_stream, + _node: redis_node, + client: redis_client, + inputs_stream: RollupsInputsStream::new(&metadata), + claims_stream: RollupsClaimsStream::new(&metadata), + outputs_stream: RollupsOutputsStream::new(&metadata), redis_endpoint, - chain_id, - dapp_address, + chain_id: CHAIN_ID, + dapp_address: DAPP_ADDRESS, } } @@ -246,3 +254,86 @@ impl BrokerFixture<'_> { .expect("failed to produce output"); } } + +// ------------------------------------------------------------------------------------------------ + +pub struct ClaimerMultidappBrokerFixture<'d> { + _node: Container<'d, GenericImage>, + client: Mutex, + redis_endpoint: BrokerEndpoint, + claims_streams: HashMap, +} + +impl ClaimerMultidappBrokerFixture<'_> { + #[tracing::instrument(level = "trace", skip_all)] + pub async fn setup( + docker: &Cli, + dapps: Vec<(u64, Address)>, + ) -> ClaimerMultidappBrokerFixture<'_> { + let (redis_node, redis_endpoint, redis_client) = + start_redis(&docker).await; + + let claims_streams = dapps + .into_iter() + .map(|(chain_id, dapp_address)| { + let dapp_metadata = DAppMetadata { + chain_id, + dapp_address: dapp_address.clone(), + }; + let stream = RollupsClaimsStream::new(&dapp_metadata); + (dapp_address, stream) + }) + .collect::>(); + let claims_streams = HashMap::from_iter(claims_streams); + + ClaimerMultidappBrokerFixture { + _node: redis_node, + client: redis_client, + redis_endpoint, + claims_streams, + } + } + + pub fn redis_endpoint(&self) -> &BrokerEndpoint { + &self.redis_endpoint + } + + // Different from the default function, + // this one requires `rollups_claim.dapp_address` to be set, + // and to match one of the addresses from the streams. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn produce_rollups_claim(&self, rollups_claim: RollupsClaim) { + tracing::trace!(?rollups_claim.epoch_hash, "producing rollups-claim event"); + + let stream = self + .claims_streams + .get(&rollups_claim.dapp_address) + .unwrap() + .clone(); + + { + let last_claim = self + .client + .lock() + .await + .peek_latest(&stream) + .await + .expect("failed to get latest claim"); + let epoch_index = match last_claim { + Some(event) => event.payload.epoch_index + 1, + None => 0, + }; + assert_eq!( + rollups_claim.epoch_index, epoch_index, + "invalid epoch index", + ); + } + + self.client + .lock() + .await + .produce(&stream, rollups_claim) + .await + .expect("failed to produce claim"); + } +}