diff --git a/offchain/advance-runner/src/broker.rs b/offchain/advance-runner/src/broker.rs index 12284a986..d871e1179 100644 --- a/offchain/advance-runner/src/broker.rs +++ b/offchain/advance-runner/src/broker.rs @@ -105,7 +105,8 @@ impl BrokerFacade { tracing::trace!(rollups_claim.epoch_index, ?rollups_claim.epoch_hash, - "producing rollups claim" + "producing rollups claim for stream {:?}", + self.claims_stream, ); let last_claim_event = self diff --git a/offchain/advance-runner/src/runner.rs b/offchain/advance-runner/src/runner.rs index 81736fcc3..69b7e89c5 100644 --- a/offchain/advance-runner/src/runner.rs +++ b/offchain/advance-runner/src/runner.rs @@ -127,11 +127,15 @@ impl Runner { .context(ProduceOutputsSnafu)?; tracing::trace!("produced outputs in broker stream"); + let dapp_address = rollups_claim.dapp_address.clone(); self.broker .produce_rollups_claim(rollups_claim) .await .context(ProduceClaimSnafu)?; - tracing::info!("produced epoch claim in broker stream"); + tracing::info!( + "produced epoch claim in broker stream for dapp address {:?}", + dapp_address + ); } Err(source) => { if let ServerManagerError::EmptyEpochError { .. } = source { diff --git a/offchain/authority-claimer/README.md b/offchain/authority-claimer/README.md index 34582a06e..068689fac 100644 --- a/offchain/authority-claimer/README.md +++ b/offchain/authority-claimer/README.md @@ -20,8 +20,10 @@ Instead of using evironment variables, This key holds a Redis Set value. You must use commands such as SADD and SREM to manipulate the list of addresses. Addresses are encoded as hex strings without the leading `"0x"`. -Example address value: `"0202020202020202020202020202020202020202"`. +Redis values are case sensitive, so addresses must be in lowercase format. +Example address value: `"0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a"`. -You must set `experimental-dapp-addresses-config` **before** starting the `authority-claimer`. -The `authority-claimer` stops with an error if the list is empty. 
-You may rewrite the list of addresses at any time, the claimer will adjust accordingly. +You may rewrite the list of addresses at any time, + the claimer will adjust accordingly. +The list of addresses can be empty at any time, + the claimer will wait until an application address is added to the set to resume operations. diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 87bca4690..992611908 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -21,9 +21,6 @@ pub trait BrokerListener: Debug { pub enum BrokerListenerError { #[snafu(display("broker error"))] BrokerError { source: BrokerError }, - - #[snafu(display("no applications configured"))] - NoApplicationsConfigured, } // ------------------------------------------------------------------------------------------------ @@ -121,9 +118,7 @@ impl MultidappBrokerListener { // Gets the dapps from the broker. let dapps = self.broker.get_dapps().await.context(BrokerSnafu)?; - if dapps.is_empty() { - return Err(BrokerListenerError::NoApplicationsConfigured); - } + assert!(!dapps.is_empty()); tracing::info!( "Got the following dapps from key \"{}\": {:?}", rollups_events::DAPPS_KEY, @@ -161,13 +156,17 @@ impl MultidappBrokerListener { Ok(()) } - async fn fill_buffer(&mut self) -> Result<(), BrokerListenerError> { - let streams_and_events: Vec<_> = self + // Returns true if it succeeded in filling the buffer and false otherwise. + async fn fill_buffer(&mut self) -> Result { + let streams_and_events = self .broker .consume_blocking_from_multiple_streams(self.streams.clone()) - .await - .context(BrokerSnafu)?; + .await; + if let Err(BrokerError::FailedToConsume) = streams_and_events { + return Ok(false); + } + let streams_and_events = streams_and_events.context(BrokerSnafu)?; for (stream, event) in streams_and_events { // Updates the last-consumed-id from the stream. 
let replaced = self.streams.insert(stream.clone(), event.id); @@ -177,7 +176,7 @@ impl MultidappBrokerListener { assert!(replaced.is_none()); } - Ok(()) + Ok(true) } } @@ -190,7 +189,13 @@ impl BrokerListener for MultidappBrokerListener { tracing::trace!("Waiting for a claim"); if self.buffer.is_empty() { - self.fill_buffer().await?; + loop { + if self.fill_buffer().await? { + break; + } else { + self.update_streams().await?; + } + } } let buffer = self.buffer.clone(); @@ -217,7 +222,7 @@ mod tests { RollupsClaim, RollupsClaimsStream, Url, }; - use crate::listener::{BrokerListener, BrokerListenerError}; + use crate::listener::BrokerListener; use super::{DefaultBrokerListener, MultidappBrokerListener}; @@ -373,9 +378,9 @@ mod tests { > { let chain_id: u64 = 0; let dapp_addresses: Vec
= vec![ - [3; 20].into(), // - [5; 20].into(), // - [7; 20].into(), // + [3; 20].into(), // + [5; 20].into(), // + [10; 20].into(), // ]; let dapps: Vec<_> = dapp_addresses .clone() @@ -385,7 +390,7 @@ mod tests { let fixture = ClaimerMultidappBrokerFixture::setup(docker, dapps.clone()).await; - fixture.set_dapps(dapp_addresses.clone()).await; + fixture.dapps_set(dapp_addresses.clone()).await; let redis_endpoint = if should_fail { BrokerEndpoint::Single(RedactedUrl::new( @@ -490,16 +495,17 @@ mod tests { let docker = Cli::default(); let (fixture, mut listener, dapps) = setup_multidapp_listener(&docker, false).await.unwrap(); - fixture.set_dapps(vec![]).await; + fixture.dapps_set(vec![]).await; let mut epochs = vec![0; dapps.len()]; let indexes = vec![0, 1, 2]; multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; - let result = listener.listen().await; + + let thread = tokio::spawn(async move { + let _ = listener.listen().await; + unreachable!(); + }); + let result = tokio::time::timeout(Duration::from_secs(3), thread).await; assert!(result.is_err()); - assert_eq!( - BrokerListenerError::NoApplicationsConfigured.to_string(), - result.unwrap_err().to_string() - ); } #[tokio::test] @@ -507,13 +513,42 @@ mod tests { let docker = Cli::default(); let (fixture, mut listener, dapps) = setup_multidapp_listener(&docker, false).await.unwrap(); - fixture.set_dapps(vec![dapps.get(0).unwrap().clone()]).await; + fixture.dapps_set(vec![dapps.get(0).unwrap().clone()]).await; let mut epochs = vec![0; dapps.len()]; let indexes = vec![2, 1, 1, 2, 0]; multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; assert_listen(&mut listener, &dapps, &vec![0]).await; } + #[tokio::test] + async fn multidapp_listen_with_duplicate_dapps() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + fixture.dapps_set(vec![]).await; + + // Initializes with 0 addresses in the set. 
+ assert_eq!(0, fixture.dapps_members().await.len()); + + // We add a lowercase and an uppercase version of the same address. + let dapp: Address = [10; 20].into(); + fixture.dapps_add(dapp.to_string().to_lowercase()).await; + fixture.dapps_add(dapp.to_string().to_uppercase()).await; + + // We now have 2 addresses in the set. + assert_eq!(2, fixture.dapps_members().await.len()); + + // We then produce some claims and listen for them. + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 2, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + let indexes = vec![2, 2]; + assert_listen(&mut listener, &dapps, &indexes).await; + + // Now we have 1 address because one of the duplicates got deleted. + assert_eq!(1, fixture.dapps_members().await.len()); + } + #[tokio::test] async fn multidapp_listen_with_changing_dapps() { let docker = Cli::default(); @@ -554,7 +589,7 @@ mod tests { println!("=== Epochs: {:?}", epochs); println!("--- Setting dapps..."); - fixture.set_dapps(first_batch_dapps.clone()).await; + fixture.dapps_set(first_batch_dapps.clone()).await; assert_listen(&mut listener, &dapps, &first_batch).await; let mut dapps = streams_to_vec(&listener.streams); dapps.sort(); @@ -575,7 +610,7 @@ mod tests { println!("=== Epochs: {:?}", epochs); println!("--- Setting dapps..."); - fixture.set_dapps(second_batch_dapps.clone()).await; + fixture.dapps_set(second_batch_dapps.clone()).await; assert_listen(&mut listener, &dapps, &second_batch).await; let mut dapps = streams_to_vec(&listener.streams); dapps.sort(); @@ -596,7 +631,7 @@ mod tests { println!("=== Epochs: {:?}", epochs); println!("--- Setting dapps..."); - fixture.set_dapps(third_batch_dapps.clone()).await; + fixture.dapps_set(third_batch_dapps.clone()).await; assert_listen(&mut listener, &dapps, &third_batch).await; let mut dapps = streams_to_vec(&listener.streams); dapps.sort(); @@ -617,7 +652,7 @@ mod tests { println!("=== Epochs: {:?}", epochs); println!("--- Setting 
dapps..."); - fixture.set_dapps(fourth_batch_dapps.clone()).await; + fixture.dapps_set(fourth_batch_dapps.clone()).await; assert_listen(&mut listener, &dapps, &fourth_batch).await; let mut dapps = streams_to_vec(&listener.streams); dapps.sort(); @@ -761,7 +796,7 @@ mod tests { // Removes the last dapp. assert!(dapps.pop().is_some()); - fixture.set_dapps(dapps.clone()).await; + fixture.dapps_set(dapps.clone()).await; let mut buffers = vec![ vec![0, 1], // diff --git a/offchain/rollups-events/src/broker/mod.rs index be9118997..529d1d1ee 100644 --- a/offchain/rollups-events/src/broker/mod.rs +++ b/offchain/rollups-events/src/broker/mod.rs @@ -338,8 +338,8 @@ impl Broker { /// Consume the next event from one of the streams. /// - /// This function blocks until a new event is available in one of the streams, - /// and retries whenever a timeout happens instead of returning an error. + /// This function blocks until a new event is available in one of the streams. + /// It times out with `BrokerError::FailedToConsume`. /// /// To consume the first event for a stream, `last_consumed_id[...]` should be `INITIAL_ID`. #[tracing::instrument(level = "trace", skip_all)] @@ -352,45 +352,67 @@ impl Broker { let (streams, last_consumed_ids): (Vec<_>, Vec<_>) = streams.into_iter().map(identity).unzip(); - loop { - let result = self - ._consume_blocking_from_multiple_streams( - &streams, - &last_consumed_ids, - ) - .await; + let result = self + ._consume_blocking_from_multiple_streams( + &streams, + &last_consumed_ids, + ) + .await; - if let Err(BrokerError::ConsumeTimeout) = result { - tracing::trace!("consume timed out, retrying"); - } else { - return result; - } + if let Err(BrokerError::ConsumeTimeout) = result { + Err(BrokerError::FailedToConsume) + } else { + result } } - /// Gets the dapp addresses. 
#[tracing::instrument(level = "trace", skip_all)] - pub async fn get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> { - retry(self.backoff.clone(), || async { + pub async fn _get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> { + let reply = retry(self.backoff.clone(), || async { tracing::trace!(key = DAPPS_KEY, "getting key"); let reply: Vec<String> = self.connection.clone().smembers(DAPPS_KEY).await?; - let dapp_addresses: Vec
<Address> = reply - .iter() - .map(|s| Address::from_str(s).unwrap()) - .collect(); + + let mut dapp_addresses: Vec<Address>
= vec![]; + for value in reply { + let normalized = value.to_lowercase(); + let dapp_address = Address::from_str(&normalized).unwrap(); + if dapp_addresses.contains(&dapp_address) { + let _: () = + self.connection.clone().srem(DAPPS_KEY, value).await?; + } else { + dapp_addresses.push(dapp_address); + } + } + Ok(dapp_addresses) }) .await - .context(ConnectionSnafu) + .context(ConnectionSnafu)?; + + if reply.is_empty() { + Err(BrokerError::ConsumeTimeout) + } else { + Ok(reply) + } + } + + /// Gets the dapp addresses. + pub async fn get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> { + loop { + let result = self._get_dapps().await; + if let Err(BrokerError::ConsumeTimeout) = result { + tracing::trace!("consume timed out, retrying"); + } else { + return result; + } + } + } /// Sets the dapp addresses. + /// NOTE: this function is used strictly for testing. #[tracing::instrument(level = "trace", skip_all)] - pub async fn set_dapps( - &mut self, - dapp_addresses: Vec
<Address>, - ) -> Result<(), BrokerError> { + pub async fn dapps_set(&mut self, dapp_addresses: Vec<Address>
) { tracing::trace!(key = DAPPS_KEY, "setting key"); let _: () = self.connection.clone().del(DAPPS_KEY).await.unwrap(); for dapp_address in dapp_addresses { @@ -401,7 +423,26 @@ impl Broker { .await .unwrap(); } - Ok(()) + } + + /// Adds a dapp address (as a string). + /// NOTE: this function is used strictly for testing. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_add(&mut self, dapp_address: String) { + tracing::trace!(dapp = dapp_address, "adding dapp"); + self.connection + .clone() + .sadd(DAPPS_KEY, dapp_address) + .await + .unwrap() + } + + /// Gets the dapp addresses as strings. + /// NOTE: this function is used strictly for testing. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_members(&mut self) -> Vec { + tracing::trace!("getting dapps members"); + self.connection.clone().smembers(DAPPS_KEY).await.unwrap() } } diff --git a/offchain/rollups-events/tests/integration.rs b/offchain/rollups-events/tests/integration.rs index 3668e978e..daf67e905 100644 --- a/offchain/rollups-events/tests/integration.rs +++ b/offchain/rollups-events/tests/integration.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use std::collections::HashMap; -use std::time::{Duration, Instant}; use backoff::ExponentialBackoff; use redis::aio::ConnectionManager; @@ -400,22 +399,15 @@ async fn test_it_consumes_from_multiple_streams() { let stream = streams.clone().into_keys().next().unwrap(); let expected_stream = stream.clone(); - // Sets a thread to produce an event in that stream after WAIT seconds. - const WAIT: u64 = 2; - let mut producer_broker = broker.clone(); - let handler = tokio::spawn(async move { - let duration = Duration::from_secs(WAIT); - tokio::time::sleep(duration).await; - let data = "final event".to_string(); - let payload = MockPayload { data }; - let _ = producer_broker - .produce(&stream, payload) - .await - .expect("failed to produce the final event"); - }); + // Produces the final event. 
+ let data = "final event".to_string(); + let payload = MockPayload { data }; + let _ = broker + .produce(&stream, payload) + .await + .expect("failed to produce the final event"); // Consumes the final event. - let marker = Instant::now(); let mut streams_and_events = broker .consume_blocking_from_multiple_streams(streams) .await @@ -423,11 +415,6 @@ async fn test_it_consumes_from_multiple_streams() { assert_eq!(1, streams_and_events.len()); let (final_stream, _) = streams_and_events.pop().unwrap(); - // Asserts that the main thread blocked for at least WAIT seconds. - assert!(marker.elapsed().as_secs() >= WAIT); - // Asserts that the event came from the correct stream. assert_eq!(expected_stream, final_stream); - - handler.await.expect("failed to wait handler"); } diff --git a/offchain/test-fixtures/src/broker.rs b/offchain/test-fixtures/src/broker.rs index b2d435472..a384b757f 100644 --- a/offchain/test-fixtures/src/broker.rs +++ b/offchain/test-fixtures/src/broker.rs @@ -299,8 +299,18 @@ impl ClaimerMultidappBrokerFixture<'_> { } #[tracing::instrument(level = "trace", skip_all)] - pub async fn set_dapps(&self, dapps: Vec
<Address>) { - self.client.lock().await.set_dapps(dapps).await.unwrap() + pub async fn dapps_set(&self, dapps: Vec<Address>
) { + self.client.lock().await.dapps_set(dapps).await + } + + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_add(&self, dapp: String) { + self.client.lock().await.dapps_add(dapp).await + } + + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_members(&self) -> Vec<String> { + self.client.lock().await.dapps_members().await + } // Different from the default function,