diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 19103244d..c15222283 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -88,6 +88,7 @@ impl BrokerListener for DefaultBrokerListener { pub struct MultidappBrokerListener { broker: Broker, streams: HashMap, // stream => last-claim-id + buffer: HashMap, chain_id: u64, } @@ -102,9 +103,11 @@ impl MultidappBrokerListener { ); let broker = Broker::new(broker_config).await?; let streams = HashMap::new(); + let buffer = HashMap::new(); Ok(Self { broker, streams, + buffer, chain_id, }) } @@ -116,29 +119,62 @@ impl MultidappBrokerListener { async fn update_streams(&mut self) -> Result<(), BrokerListenerError> { let initial_id = INITIAL_ID.to_string(); - let streams: Vec<_> = - self.broker.get_dapps().await.context(BrokerSnafu)?; - tracing::info!("Got the following dapps: {:?}", self.streams); + // Gets the dapps from the broker. + let dapps = self.broker.get_dapps().await.context(BrokerSnafu)?; + if dapps.is_empty() { + return Err(BrokerListenerError::NoApplicationsConfigured); + } + tracing::info!("Got the following dapps from Redis: {:?}", dapps); - let streams: Vec<_> = streams + // Converts dapps to streams. + let streams: Vec<_> = dapps .into_iter() .map(|dapp_address| { - let dapp_metadata = &DAppMetadata { + RollupsClaimsStream::new(&DAppMetadata { chain_id: self.chain_id, dapp_address, - }; - let stream = RollupsClaimsStream::new(dapp_metadata); - let id = self.streams.get(&stream).unwrap_or(&initial_id); - (stream, id.clone()) + }) }) .collect(); - if streams.is_empty() { - return Err(BrokerListenerError::NoApplicationsConfigured); + + // Removes obsolete dapps from the buffer, if any. + for key in self.buffer.clone().keys() { + if !streams.contains(key) { + self.buffer.remove(key); + } } + // Adds the last consumed ids. + let streams: Vec<_> = streams + .into_iter() + .map(|stream| { + let id = self.streams.get(&stream).unwrap_or(&initial_id); + (stream, id.to_string()) + }) + .collect(); + self.streams = HashMap::from_iter(streams); Ok(()) } + + async fn fill_buffer(&mut self) -> Result<(), BrokerListenerError> { + let streams_and_events: Vec<_> = self + .broker + .consume_blocking_from_multiple_streams(self.streams.clone()) + .await + .context(BrokerSnafu)?; + + for (stream, event) in streams_and_events { + // Updates the last-consumed-id from the stream. + let replaced = self.streams.insert(stream.clone(), event.id); + assert!(replaced.is_some()); + + let replaced = self.buffer.insert(stream, event.payload); + assert!(replaced.is_none()); + } + + Ok(()) + } } #[async_trait] @@ -146,21 +182,17 @@ impl BrokerListener for MultidappBrokerListener { type Error = BrokerListenerError; async fn listen(&mut self) -> Result { - tracing::trace!("Waiting for claim"); - self.update_streams().await?; - let (stream, event) = self - .broker - .consume_blocking_from_multiple_streams(self.streams.clone()) - .await - .context(BrokerSnafu)?; - - // Updates the last-consumed-id from the stream. - let replaced = self.streams.insert(stream.clone(), event.id); - assert!(replaced.is_some()); + tracing::trace!("Waiting for a claim"); + if self.buffer.is_empty() { + self.fill_buffer().await?; + } - Ok(event.payload) + let buffer = self.buffer.clone(); + let (stream, rollups_claim) = buffer.into_iter().next().unwrap(); + self.buffer.remove(&stream); + Ok(rollups_claim) } } @@ -675,4 +707,78 @@ mod tests { broker_listener_thread.await.unwrap(); } + + #[tokio::test] + async fn multidapp_listen_buffer_order() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![1, 1, 1, 1, 1, 2, 1, 2, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + + let mut buffers = vec![ + vec![0, 1, 2], // + vec![1, 2], + vec![1], + vec![1], + vec![1], + vec![1], + ]; + + for buffer in buffers.iter_mut() { + for _ in 0..buffer.len() { + println!("Buffer: {:?}", buffer); + let result = listener.listen().await; + assert!(result.is_ok(), "{:?}", result.unwrap_err()); + let dapp_address = result.unwrap().dapp_address; + let index = dapps + .iter() + .position(|address| *address == dapp_address) + .unwrap(); + let index = buffer.iter().position(|i| *i == index).unwrap(); + buffer.remove(index); + } + assert!(buffer.is_empty()); + println!("Emptied one of the buffers"); + } + } + + #[tokio::test] + async fn multidapp_listen_buffer_change() { + let docker = Cli::default(); + let (fixture, mut listener, mut dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 2, 2, 0, 1, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + + // Removes the last dapp. + assert!(dapps.pop().is_some()); + fixture.set_dapps(dapps.clone()).await; + + let mut buffers = vec![ + vec![0, 1], // + vec![0], + ]; + + for buffer in buffers.iter_mut() { + for _ in 0..buffer.len() { + println!("Buffer: {:?}", buffer); + let result = listener.listen().await; + assert!(result.is_ok(), "{:?}", result.unwrap_err()); + let dapp_address = result.unwrap().dapp_address; + let index = dapps + .iter() + .position(|address| *address == dapp_address) + .unwrap(); + let index = buffer.iter().position(|i| *i == index).unwrap(); + buffer.remove(index); + } + assert!(buffer.is_empty()); + println!("Emptied one of the buffers"); + } + } } diff --git a/offchain/rollups-events/src/broker/mod.rs b/offchain/rollups-events/src/broker/mod.rs index d44d4fb5c..41dd00db3 100644 --- a/offchain/rollups-events/src/broker/mod.rs +++ b/offchain/rollups-events/src/broker/mod.rs @@ -292,7 +292,7 @@ impl Broker { &mut self, streams: &Vec, last_consumed_ids: &Vec, - ) -> Result<(S, Event), BrokerError> { + ) -> Result)>, BrokerError> { let reply = retry(self.backoff.clone(), || async { let stream_keys: Vec = streams .iter() @@ -318,16 +318,21 @@ impl Broker { return Err(BrokerError::ConsumeTimeout); } - tracing::trace!("checking if any events were received"); + tracing::trace!("getting the consumed events"); + let mut response: Vec<(S, Event)> = vec![]; for mut stream_key in reply.keys { if let Some(event) = stream_key.ids.pop() { tracing::trace!("parsing received event"); let stream = S::from_key(stream_key.key); let event = event.try_into()?; - return Ok((stream, event)); + response.push((stream, event)); } } - return Err(BrokerError::FailedToConsume); + if response.is_empty() { + Err(BrokerError::FailedToConsume) + } else { + Ok(response) + } } /// Consume the next event from one of the streams. @@ -342,7 +347,7 @@ impl Broker { >( &mut self, streams: HashMap, // streams to last-consumed-ids - ) -> Result<(S, Event), BrokerError> { + ) -> Result)>, BrokerError> { let (streams, last_consumed_ids): (Vec<_>, Vec<_>) = streams.into_iter().map(identity).unzip();