Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed Aug 16, 2024
1 parent 54c0204 commit e1758e1
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 58 deletions.
95 changes: 66 additions & 29 deletions offchain/authority-claimer/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ pub trait BrokerListener: Debug {
pub enum BrokerListenerError {
#[snafu(display("broker error"))]
BrokerError { source: BrokerError },

#[snafu(display("no applications configured"))]
NoApplicationsConfigured,
}

// ------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -121,9 +118,7 @@ impl MultidappBrokerListener {

// Gets the dapps from the broker.
let dapps = self.broker.get_dapps().await.context(BrokerSnafu)?;
if dapps.is_empty() {
return Err(BrokerListenerError::NoApplicationsConfigured);
}
assert!(!dapps.is_empty());
tracing::info!(
"Got the following dapps from key \"{}\": {:?}",
rollups_events::DAPPS_KEY,
Expand Down Expand Up @@ -161,13 +156,17 @@ impl MultidappBrokerListener {
Ok(())
}

async fn fill_buffer(&mut self) -> Result<(), BrokerListenerError> {
let streams_and_events: Vec<_> = self
// Returns true if it succeeded in filling the buffer and false otherwise.
async fn fill_buffer(&mut self) -> Result<bool, BrokerListenerError> {
let streams_and_events = self
.broker
.consume_blocking_from_multiple_streams(self.streams.clone())
.await
.context(BrokerSnafu)?;
.await;
if let Err(BrokerError::FailedToConsume) = streams_and_events {
return Ok(false);
}

let streams_and_events = streams_and_events.context(BrokerSnafu)?;
for (stream, event) in streams_and_events {
// Updates the last-consumed-id from the stream.
let replaced = self.streams.insert(stream.clone(), event.id);
Expand All @@ -177,7 +176,7 @@ impl MultidappBrokerListener {
assert!(replaced.is_none());
}

Ok(())
Ok(true)
}
}

Expand All @@ -190,7 +189,13 @@ impl BrokerListener for MultidappBrokerListener {

tracing::trace!("Waiting for a claim");
if self.buffer.is_empty() {
self.fill_buffer().await?;
loop {
if self.fill_buffer().await? {
break;
} else {
self.update_streams().await?;
}
}
}

let buffer = self.buffer.clone();
Expand All @@ -206,6 +211,7 @@ impl BrokerListener for MultidappBrokerListener {

#[cfg(test)]
mod tests {
use serial_test::serial;
use std::{collections::HashMap, time::Duration};
use testcontainers::clients::Cli;

Expand All @@ -217,7 +223,7 @@ mod tests {
RollupsClaim, RollupsClaimsStream, Url,
};

use crate::listener::{BrokerListener, BrokerListenerError};
use crate::listener::BrokerListener;

use super::{DefaultBrokerListener, MultidappBrokerListener};

Expand Down Expand Up @@ -373,9 +379,9 @@ mod tests {
> {
let chain_id: u64 = 0;
let dapp_addresses: Vec<Address> = vec![
[3; 20].into(), //
[5; 20].into(), //
[7; 20].into(), //
[3; 20].into(), //
[5; 20].into(), //
[10; 20].into(), //
];
let dapps: Vec<_> = dapp_addresses
.clone()
Expand All @@ -385,7 +391,7 @@ mod tests {

let fixture =
ClaimerMultidappBrokerFixture::setup(docker, dapps.clone()).await;
fixture.set_dapps(dapp_addresses.clone()).await;
fixture.dapps_set(dapp_addresses.clone()).await;

let redis_endpoint = if should_fail {
BrokerEndpoint::Single(RedactedUrl::new(
Expand Down Expand Up @@ -490,30 +496,61 @@ mod tests {
let docker = Cli::default();
let (fixture, mut listener, dapps) =
setup_multidapp_listener(&docker, false).await.unwrap();
fixture.set_dapps(vec![]).await;
fixture.dapps_set(vec![]).await;
let mut epochs = vec![0; dapps.len()];
let indexes = vec![0, 1, 2];
multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;
let result = listener.listen().await;

let thread = tokio::spawn(async move {
let _ = listener.listen().await;
unreachable!();
});
let result = tokio::time::timeout(Duration::from_secs(3), thread).await;
assert!(result.is_err());
assert_eq!(
BrokerListenerError::NoApplicationsConfigured.to_string(),
result.unwrap_err().to_string()
);
}

#[tokio::test]
async fn multidapp_listen_with_one_dapp() {
let docker = Cli::default();
let (fixture, mut listener, dapps) =
setup_multidapp_listener(&docker, false).await.unwrap();
fixture.set_dapps(vec![dapps.get(0).unwrap().clone()]).await;
fixture.dapps_set(vec![dapps.get(0).unwrap().clone()]).await;
let mut epochs = vec![0; dapps.len()];
let indexes = vec![2, 1, 1, 2, 0];
multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;
assert_listen(&mut listener, &dapps, &vec![0]).await;
}

#[serial]
#[tokio::test]
async fn multidapp_listen_with_duplicate_dapps() {
let docker = Cli::default();
let (fixture, mut listener, dapps) =
setup_multidapp_listener(&docker, false).await.unwrap();
fixture.dapps_set(vec![]).await;

// Initializes with 0 addresses in the set.
assert_eq!(0, fixture.dapps_members().await.len());

// We add a lowercase and an uppercase version of the same address.
let dapp: Address = [10; 20].into();
fixture.dapps_add(dapp.to_string().to_lowercase()).await;
fixture.dapps_add(dapp.to_string().to_uppercase()).await;

// We now have 2 addresses in the set.
assert_eq!(2, fixture.dapps_members().await.len());

// We then produce some claims and listen for them.
let mut epochs = vec![0; dapps.len()];
let indexes = vec![2, 2, 0];
multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;
let indexes = vec![2, 2];
assert_listen(&mut listener, &dapps, &indexes).await;

// Now we have 1 address because one of the duplicates got deleted.
assert_eq!(1, fixture.dapps_members().await.len());
}

#[tokio::test]
async fn multidapp_listen_with_changing_dapps() {
let docker = Cli::default();
Expand Down Expand Up @@ -554,7 +591,7 @@ mod tests {
println!("=== Epochs: {:?}", epochs);

println!("--- Setting dapps...");
fixture.set_dapps(first_batch_dapps.clone()).await;
fixture.dapps_set(first_batch_dapps.clone()).await;
assert_listen(&mut listener, &dapps, &first_batch).await;
let mut dapps = streams_to_vec(&listener.streams);
dapps.sort();
Expand All @@ -575,7 +612,7 @@ mod tests {
println!("=== Epochs: {:?}", epochs);

println!("--- Setting dapps...");
fixture.set_dapps(second_batch_dapps.clone()).await;
fixture.dapps_set(second_batch_dapps.clone()).await;
assert_listen(&mut listener, &dapps, &second_batch).await;
let mut dapps = streams_to_vec(&listener.streams);
dapps.sort();
Expand All @@ -596,7 +633,7 @@ mod tests {
println!("=== Epochs: {:?}", epochs);

println!("--- Setting dapps...");
fixture.set_dapps(third_batch_dapps.clone()).await;
fixture.dapps_set(third_batch_dapps.clone()).await;
assert_listen(&mut listener, &dapps, &third_batch).await;
let mut dapps = streams_to_vec(&listener.streams);
dapps.sort();
Expand All @@ -617,7 +654,7 @@ mod tests {
println!("=== Epochs: {:?}", epochs);

println!("--- Setting dapps...");
fixture.set_dapps(fourth_batch_dapps.clone()).await;
fixture.dapps_set(fourth_batch_dapps.clone()).await;
assert_listen(&mut listener, &dapps, &fourth_batch).await;
let mut dapps = streams_to_vec(&listener.streams);
dapps.sort();
Expand Down Expand Up @@ -761,7 +798,7 @@ mod tests {

// Removes the last dapp.
assert!(dapps.pop().is_some());
fixture.set_dapps(dapps.clone()).await;
fixture.dapps_set(dapps.clone()).await;

let mut buffers = vec![
vec![0, 1], //
Expand Down
95 changes: 68 additions & 27 deletions offchain/rollups-events/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@ impl Broker {

/// Consume the next event from one of the streams.
///
/// This function blocks until a new event is available in one of the streams,
/// and retries whenever a timeout happens instead of returning an error.
/// This function blocks until a new event is available in one of the streams.
/// It timeouts with BrokerError::FailedToConsume.
///
/// To consume the first event for a stream, `last_consumed_id[...]` should be `INITIAL_ID`.
#[tracing::instrument(level = "trace", skip_all)]
Expand All @@ -352,45 +352,67 @@ impl Broker {
let (streams, last_consumed_ids): (Vec<_>, Vec<_>) =
streams.into_iter().map(identity).unzip();

loop {
let result = self
._consume_blocking_from_multiple_streams(
&streams,
&last_consumed_ids,
)
.await;
let result = self
._consume_blocking_from_multiple_streams(
&streams,
&last_consumed_ids,
)
.await;

if let Err(BrokerError::ConsumeTimeout) = result {
tracing::trace!("consume timed out, retrying");
} else {
return result;
}
if let Err(BrokerError::ConsumeTimeout) = result {
Err(BrokerError::FailedToConsume)
} else {
result
}
}

/// Gets the dapp addresses.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> {
retry(self.backoff.clone(), || async {
pub async fn _get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> {
let reply = retry(self.backoff.clone(), || async {
tracing::trace!(key = DAPPS_KEY, "getting key");
let reply: Vec<String> =
self.connection.clone().smembers(DAPPS_KEY).await?;
let dapp_addresses: Vec<Address> = reply
.iter()
.map(|s| Address::from_str(s).unwrap())
.collect();

let mut dapp_addresses: Vec<Address> = vec![];
for value in reply {
let normalized = value.to_lowercase();
let dapp_address = Address::from_str(&normalized).unwrap();
if dapp_addresses.contains(&dapp_address) {
let _: () =
self.connection.clone().srem(DAPPS_KEY, value).await?;
} else {
dapp_addresses.push(dapp_address);
}
}

Ok(dapp_addresses)
})
.await
.context(ConnectionSnafu)
.context(ConnectionSnafu)?;

if reply.is_empty() {
Err(BrokerError::ConsumeTimeout)
} else {
Ok(reply)
}
}

/// Gets the dapp addresses.
pub async fn get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> {
loop {
let result = self._get_dapps().await;
if let Err(BrokerError::ConsumeTimeout) = result {
tracing::trace!("consume timed out, retrying");
} else {
return result;
}
}
}

/// Sets the dapp addresses.
/// NOTE: this function is used strictly for testing.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn set_dapps(
&mut self,
dapp_addresses: Vec<Address>,
) -> Result<(), BrokerError> {
pub async fn dapps_set(&mut self, dapp_addresses: Vec<Address>) {
tracing::trace!(key = DAPPS_KEY, "setting key");
let _: () = self.connection.clone().del(DAPPS_KEY).await.unwrap();
for dapp_address in dapp_addresses {
Expand All @@ -401,7 +423,26 @@ impl Broker {
.await
.unwrap();
}
Ok(())
}

/// Adds a dapp address (as a string).
/// NOTE: this function is used strictly for testing.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn dapps_add(&mut self, dapp_address: String) {
tracing::trace!(dapp = dapp_address, "adding dapp");
self.connection
.clone()
.sadd(DAPPS_KEY, dapp_address)
.await
.unwrap()
}

/// Gets the dapp addresses as strings.
/// NOTE: this function is used strictly for testing.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn dapps_members(&mut self) -> Vec<String> {
tracing::trace!("getting dapps members");
self.connection.clone().smembers(DAPPS_KEY).await.unwrap()
}
}

Expand Down
14 changes: 12 additions & 2 deletions offchain/test-fixtures/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,18 @@ impl ClaimerMultidappBrokerFixture<'_> {
}

#[tracing::instrument(level = "trace", skip_all)]
pub async fn set_dapps(&self, dapps: Vec<Address>) {
self.client.lock().await.set_dapps(dapps).await.unwrap()
pub async fn dapps_set(&self, dapps: Vec<Address>) {
self.client.lock().await.dapps_set(dapps).await
}

#[tracing::instrument(level = "trace", skip_all)]
pub async fn dapps_add(&self, dapp: String) {
self.client.lock().await.dapps_add(dapp).await
}

#[tracing::instrument(level = "trace", skip_all)]
pub async fn dapps_members(&self) -> Vec<String> {
self.client.lock().await.dapps_members().await
}

// Different from the default function,
Expand Down

0 comments on commit e1758e1

Please sign in to comment.