Skip to content

Commit

Permalink
fixup! feat: add the MultidappClaimer as an experimental feature for …
Browse files Browse the repository at this point in the history
…sunodo
  • Loading branch information
renan061 committed Aug 16, 2024
1 parent 54c0204 commit 444255e
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 84 deletions.
3 changes: 2 additions & 1 deletion offchain/advance-runner/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ impl BrokerFacade {

tracing::trace!(rollups_claim.epoch_index,
?rollups_claim.epoch_hash,
"producing rollups claim"
"producing rollups claim for stream {:?}",
self.claims_stream,
);

let last_claim_event = self
Expand Down
6 changes: 5 additions & 1 deletion offchain/advance-runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,15 @@ impl Runner {
.context(ProduceOutputsSnafu)?;
tracing::trace!("produced outputs in broker stream");

let dapp_address = rollups_claim.dapp_address.clone();
self.broker
.produce_rollups_claim(rollups_claim)
.await
.context(ProduceClaimSnafu)?;
tracing::info!("produced epoch claim in broker stream");
tracing::info!(
"produced epoch claim in broker stream for dapp address {:?}",
dapp_address
);
}
Err(source) => {
if let ServerManagerError::EmptyEpochError { .. } = source {
Expand Down
10 changes: 6 additions & 4 deletions offchain/authority-claimer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ Instead of using evironment variables,
This key holds a Redis Set value.
You must use commands such as SADD and SREM to manipulate the list of addresses.
Addresses are encoded as hex strings without the leading `"0x"`.
Example address value: `"0202020202020202020202020202020202020202"`.
Redis values are case sensitive, so addresses must be in lowercase format.
Example address value: `"0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a"`.

You must set `experimental-dapp-addresses-config` **before** starting the `authority-claimer`.
The `authority-claimer` stops with an error if the list is empty.
You may rewrite the list of addresses at any time, the claimer will adjust accordingly.
You may rewrite the list of addresses at any time,
the claimer will adjust accordingly.
The list of addresses can be empty at any time,
the claimer will wait until an application address is added to the set to resume operations.
93 changes: 64 additions & 29 deletions offchain/authority-claimer/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ pub trait BrokerListener: Debug {
pub enum BrokerListenerError {
#[snafu(display("broker error"))]
BrokerError { source: BrokerError },

#[snafu(display("no applications configured"))]
NoApplicationsConfigured,
}

// ------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -121,9 +118,7 @@ impl MultidappBrokerListener {

// Gets the dapps from the broker.
let dapps = self.broker.get_dapps().await.context(BrokerSnafu)?;
if dapps.is_empty() {
return Err(BrokerListenerError::NoApplicationsConfigured);
}
assert!(!dapps.is_empty());
tracing::info!(
"Got the following dapps from key \"{}\": {:?}",
rollups_events::DAPPS_KEY,
Expand Down Expand Up @@ -161,13 +156,17 @@ impl MultidappBrokerListener {
Ok(())
}

async fn fill_buffer(&mut self) -> Result<(), BrokerListenerError> {
let streams_and_events: Vec<_> = self
// Returns true if it succeeded in filling the buffer and false otherwise.
async fn fill_buffer(&mut self) -> Result<bool, BrokerListenerError> {
let streams_and_events = self
.broker
.consume_blocking_from_multiple_streams(self.streams.clone())
.await
.context(BrokerSnafu)?;
.await;
if let Err(BrokerError::FailedToConsume) = streams_and_events {
return Ok(false);
}

let streams_and_events = streams_and_events.context(BrokerSnafu)?;
for (stream, event) in streams_and_events {
// Updates the last-consumed-id from the stream.
let replaced = self.streams.insert(stream.clone(), event.id);
Expand All @@ -177,7 +176,7 @@ impl MultidappBrokerListener {
assert!(replaced.is_none());
}

Ok(())
Ok(true)
}
}

Expand All @@ -190,7 +189,13 @@ impl BrokerListener for MultidappBrokerListener {

tracing::trace!("Waiting for a claim");
if self.buffer.is_empty() {
self.fill_buffer().await?;
loop {
if self.fill_buffer().await? {
break;
} else {
self.update_streams().await?;
}
}
}

let buffer = self.buffer.clone();
Expand All @@ -217,7 +222,7 @@ mod tests {
RollupsClaim, RollupsClaimsStream, Url,
};

use crate::listener::{BrokerListener, BrokerListenerError};
use crate::listener::BrokerListener;

use super::{DefaultBrokerListener, MultidappBrokerListener};

Expand Down Expand Up @@ -373,9 +378,9 @@ mod tests {
> {
let chain_id: u64 = 0;
let dapp_addresses: Vec<Address> = vec![
[3; 20].into(), //
[5; 20].into(), //
[7; 20].into(), //
[3; 20].into(), //
[5; 20].into(), //
[10; 20].into(), //
];
let dapps: Vec<_> = dapp_addresses
.clone()
Expand All @@ -385,7 +390,7 @@ mod tests {

let fixture =
ClaimerMultidappBrokerFixture::setup(docker, dapps.clone()).await;
fixture.set_dapps(dapp_addresses.clone()).await;
fixture.dapps_set(dapp_addresses.clone()).await;

let redis_endpoint = if should_fail {
BrokerEndpoint::Single(RedactedUrl::new(
Expand Down Expand Up @@ -490,30 +495,60 @@ mod tests {
let docker = Cli::default();
let (fixture, mut listener, dapps) =
setup_multidapp_listener(&docker, false).await.unwrap();
fixture.set_dapps(vec![]).await;
fixture.dapps_set(vec![]).await;
let mut epochs = vec![0; dapps.len()];
let indexes = vec![0, 1, 2];
multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;
let result = listener.listen().await;

let thread = tokio::spawn(async move {
let _ = listener.listen().await;
unreachable!();
});
let result = tokio::time::timeout(Duration::from_secs(3), thread).await;
assert!(result.is_err());
assert_eq!(
BrokerListenerError::NoApplicationsConfigured.to_string(),
result.unwrap_err().to_string()
);
}

#[tokio::test]
async fn multidapp_listen_with_one_dapp() {
let docker = Cli::default();
let (fixture, mut listener, dapps) =
setup_multidapp_listener(&docker, false).await.unwrap();
fixture.set_dapps(vec![dapps.get(0).unwrap().clone()]).await;
fixture.dapps_set(vec![dapps.get(0).unwrap().clone()]).await;
let mut epochs = vec![0; dapps.len()];
let indexes = vec![2, 1, 1, 2, 0];
multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;
assert_listen(&mut listener, &dapps, &vec![0]).await;
}

#[tokio::test]
async fn multidapp_listen_with_duplicate_dapps() {
let docker = Cli::default();
let (fixture, mut listener, dapps) =
setup_multidapp_listener(&docker, false).await.unwrap();
fixture.dapps_set(vec![]).await;

// Initializes with 0 addresses in the set.
assert_eq!(0, fixture.dapps_members().await.len());

// We add a lowercase and an uppercase version of the same address.
let dapp: Address = [10; 20].into();
fixture.dapps_add(dapp.to_string().to_lowercase()).await;
fixture.dapps_add(dapp.to_string().to_uppercase()).await;

// We now have 2 addresses in the set.
assert_eq!(2, fixture.dapps_members().await.len());

// We then produce some claims and listen for them.
let mut epochs = vec![0; dapps.len()];
let indexes = vec![2, 2, 0];
multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;
let indexes = vec![2, 2];
assert_listen(&mut listener, &dapps, &indexes).await;

// Now we have 1 address because one of the duplicates got deleted.
assert_eq!(1, fixture.dapps_members().await.len());
}

#[tokio::test]
async fn multidapp_listen_with_changing_dapps() {
let docker = Cli::default();
Expand Down Expand Up @@ -554,7 +589,7 @@ mod tests {
println!("=== Epochs: {:?}", epochs);

println!("--- Setting dapps...");
fixture.set_dapps(first_batch_dapps.clone()).await;
fixture.dapps_set(first_batch_dapps.clone()).await;
assert_listen(&mut listener, &dapps, &first_batch).await;
let mut dapps = streams_to_vec(&listener.streams);
dapps.sort();
Expand All @@ -575,7 +610,7 @@ mod tests {
println!("=== Epochs: {:?}", epochs);

println!("--- Setting dapps...");
fixture.set_dapps(second_batch_dapps.clone()).await;
fixture.dapps_set(second_batch_dapps.clone()).await;
assert_listen(&mut listener, &dapps, &second_batch).await;
let mut dapps = streams_to_vec(&listener.streams);
dapps.sort();
Expand All @@ -596,7 +631,7 @@ mod tests {
println!("=== Epochs: {:?}", epochs);

println!("--- Setting dapps...");
fixture.set_dapps(third_batch_dapps.clone()).await;
fixture.dapps_set(third_batch_dapps.clone()).await;
assert_listen(&mut listener, &dapps, &third_batch).await;
let mut dapps = streams_to_vec(&listener.streams);
dapps.sort();
Expand All @@ -617,7 +652,7 @@ mod tests {
println!("=== Epochs: {:?}", epochs);

println!("--- Setting dapps...");
fixture.set_dapps(fourth_batch_dapps.clone()).await;
fixture.dapps_set(fourth_batch_dapps.clone()).await;
assert_listen(&mut listener, &dapps, &fourth_batch).await;
let mut dapps = streams_to_vec(&listener.streams);
dapps.sort();
Expand Down Expand Up @@ -761,7 +796,7 @@ mod tests {

// Removes the last dapp.
assert!(dapps.pop().is_some());
fixture.set_dapps(dapps.clone()).await;
fixture.dapps_set(dapps.clone()).await;

let mut buffers = vec![
vec![0, 1], //
Expand Down
95 changes: 68 additions & 27 deletions offchain/rollups-events/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@ impl Broker {

/// Consume the next event from one of the streams.
///
/// This function blocks until a new event is available in one of the streams,
/// and retries whenever a timeout happens instead of returning an error.
/// This function blocks until a new event is available in one of the streams.
/// It timeouts with BrokerError::FailedToConsume.
///
/// To consume the first event for a stream, `last_consumed_id[...]` should be `INITIAL_ID`.
#[tracing::instrument(level = "trace", skip_all)]
Expand All @@ -352,45 +352,67 @@ impl Broker {
let (streams, last_consumed_ids): (Vec<_>, Vec<_>) =
streams.into_iter().map(identity).unzip();

loop {
let result = self
._consume_blocking_from_multiple_streams(
&streams,
&last_consumed_ids,
)
.await;
let result = self
._consume_blocking_from_multiple_streams(
&streams,
&last_consumed_ids,
)
.await;

if let Err(BrokerError::ConsumeTimeout) = result {
tracing::trace!("consume timed out, retrying");
} else {
return result;
}
if let Err(BrokerError::ConsumeTimeout) = result {
Err(BrokerError::FailedToConsume)
} else {
result
}
}

/// Gets the dapp addresses.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> {
retry(self.backoff.clone(), || async {
pub async fn _get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> {
let reply = retry(self.backoff.clone(), || async {
tracing::trace!(key = DAPPS_KEY, "getting key");
let reply: Vec<String> =
self.connection.clone().smembers(DAPPS_KEY).await?;
let dapp_addresses: Vec<Address> = reply
.iter()
.map(|s| Address::from_str(s).unwrap())
.collect();

let mut dapp_addresses: Vec<Address> = vec![];
for value in reply {
let normalized = value.to_lowercase();
let dapp_address = Address::from_str(&normalized).unwrap();
if dapp_addresses.contains(&dapp_address) {
let _: () =
self.connection.clone().srem(DAPPS_KEY, value).await?;
} else {
dapp_addresses.push(dapp_address);
}
}

Ok(dapp_addresses)
})
.await
.context(ConnectionSnafu)
.context(ConnectionSnafu)?;

if reply.is_empty() {
Err(BrokerError::ConsumeTimeout)
} else {
Ok(reply)
}
}

/// Gets the dapp addresses.
pub async fn get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> {
loop {
let result = self._get_dapps().await;
if let Err(BrokerError::ConsumeTimeout) = result {
tracing::trace!("consume timed out, retrying");
} else {
return result;
}
}
}

/// Sets the dapp addresses.
/// NOTE: this function is used strictly for testing.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn set_dapps(
&mut self,
dapp_addresses: Vec<Address>,
) -> Result<(), BrokerError> {
pub async fn dapps_set(&mut self, dapp_addresses: Vec<Address>) {
tracing::trace!(key = DAPPS_KEY, "setting key");
let _: () = self.connection.clone().del(DAPPS_KEY).await.unwrap();
for dapp_address in dapp_addresses {
Expand All @@ -401,7 +423,26 @@ impl Broker {
.await
.unwrap();
}
Ok(())
}

/// Adds a dapp address (as a string).
/// NOTE: this function is used strictly for testing.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn dapps_add(&mut self, dapp_address: String) {
tracing::trace!(dapp = dapp_address, "adding dapp");
self.connection
.clone()
.sadd(DAPPS_KEY, dapp_address)
.await
.unwrap()
}

/// Gets the dapp addresses as strings.
/// NOTE: this function is used strictly for testing.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn dapps_members(&mut self) -> Vec<String> {
tracing::trace!("getting dapps members");
self.connection.clone().smembers(DAPPS_KEY).await.unwrap()
}
}

Expand Down
Loading

0 comments on commit 444255e

Please sign in to comment.