Skip to content

Commit

Permalink
fixup! feat: add the MultidappClaimer as an experimental feature for …
Browse files Browse the repository at this point in the history
…sunodo
  • Loading branch information
renan061 committed Aug 16, 2024
1 parent 212bcb9 commit f9610fe
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 28 deletions.
152 changes: 129 additions & 23 deletions offchain/authority-claimer/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl BrokerListener for DefaultBrokerListener {
pub struct MultidappBrokerListener {
broker: Broker,
streams: HashMap<RollupsClaimsStream, String>, // stream => last-claim-id
buffer: HashMap<RollupsClaimsStream, RollupsClaim>,
chain_id: u64,
}

Expand All @@ -102,9 +103,11 @@ impl MultidappBrokerListener {
);
let broker = Broker::new(broker_config).await?;
let streams = HashMap::new();
let buffer = HashMap::new();
Ok(Self {
broker,
streams,
buffer,
chain_id,
})
}
Expand All @@ -116,51 +119,80 @@ impl MultidappBrokerListener {
async fn update_streams(&mut self) -> Result<(), BrokerListenerError> {
let initial_id = INITIAL_ID.to_string();

let streams: Vec<_> =
self.broker.get_dapps().await.context(BrokerSnafu)?;
tracing::info!("Got the following dapps: {:?}", self.streams);
// Gets the dapps from the broker.
let dapps = self.broker.get_dapps().await.context(BrokerSnafu)?;
if dapps.is_empty() {
return Err(BrokerListenerError::NoApplicationsConfigured);
}
tracing::info!("Got the following dapps from Redis: {:?}", dapps);

let streams: Vec<_> = streams
// Converts dapps to streams.
let streams: Vec<_> = dapps
.into_iter()
.map(|dapp_address| {
let dapp_metadata = &DAppMetadata {
RollupsClaimsStream::new(&DAppMetadata {
chain_id: self.chain_id,
dapp_address,
};
let stream = RollupsClaimsStream::new(dapp_metadata);
let id = self.streams.get(&stream).unwrap_or(&initial_id);
(stream, id.clone())
})
})
.collect();
if streams.is_empty() {
return Err(BrokerListenerError::NoApplicationsConfigured);

// Removes obsolete dapps from the buffer, if any.
for key in self.buffer.clone().keys() {
if !streams.contains(key) {
self.buffer.remove(key);
}
}

// Adds the last consumed ids.
let streams: Vec<_> = streams
.into_iter()
.map(|stream| {
let id = self.streams.get(&stream).unwrap_or(&initial_id);
(stream, id.to_string())
})
.collect();

self.streams = HashMap::from_iter(streams);
Ok(())
}

async fn fill_buffer(&mut self) -> Result<(), BrokerListenerError> {
let streams_and_events: Vec<_> = self
.broker
.consume_blocking_from_multiple_streams(self.streams.clone())
.await
.context(BrokerSnafu)?;

for (stream, event) in streams_and_events {
// Updates the last-consumed-id from the stream.
let replaced = self.streams.insert(stream.clone(), event.id);
assert!(replaced.is_some());

let replaced = self.buffer.insert(stream, event.payload);
assert!(replaced.is_none());
}

Ok(())
}
}

#[async_trait]
impl BrokerListener for MultidappBrokerListener {
type Error = BrokerListenerError;

async fn listen(&mut self) -> Result<RollupsClaim, Self::Error> {
tracing::trace!("Waiting for claim");

self.update_streams().await?;

let (stream, event) = self
.broker
.consume_blocking_from_multiple_streams(self.streams.clone())
.await
.context(BrokerSnafu)?;

// Updates the last-consumed-id from the stream.
let replaced = self.streams.insert(stream.clone(), event.id);
assert!(replaced.is_some());
tracing::trace!("Waiting for a claim");
if self.buffer.is_empty() {
self.fill_buffer().await?;
}

Ok(event.payload)
let buffer = self.buffer.clone();
let (stream, rollups_claim) = buffer.into_iter().next().unwrap();
self.buffer.remove(&stream);
Ok(rollups_claim)
}
}

Expand Down Expand Up @@ -675,4 +707,78 @@ mod tests {

broker_listener_thread.await.unwrap();
}

#[tokio::test]
async fn multidapp_listen_buffer_order() {
let docker = Cli::default();
let (fixture, mut listener, dapps) =
setup_multidapp_listener(&docker, false).await.unwrap();

let mut epochs = vec![0; dapps.len()];
let indexes = vec![1, 1, 1, 1, 1, 2, 1, 2, 0];
multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;

let mut buffers = vec![
vec![0, 1, 2], //
vec![1, 2],
vec![1],
vec![1],
vec![1],
vec![1],
];

for buffer in buffers.iter_mut() {
for _ in 0..buffer.len() {
println!("Buffer: {:?}", buffer);
let result = listener.listen().await;
assert!(result.is_ok(), "{:?}", result.unwrap_err());
let dapp_address = result.unwrap().dapp_address;
let index = dapps
.iter()
.position(|address| *address == dapp_address)
.unwrap();
let index = buffer.iter().position(|i| *i == index).unwrap();
buffer.remove(index);
}
assert!(buffer.is_empty());
println!("Emptied one of the buffers");
}
}

#[tokio::test]
async fn multidapp_listen_buffer_change() {
let docker = Cli::default();
let (fixture, mut listener, mut dapps) =
setup_multidapp_listener(&docker, false).await.unwrap();

let mut epochs = vec![0; dapps.len()];
let indexes = vec![2, 2, 2, 0, 1, 0];
multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;

// Removes the last dapp.
assert!(dapps.pop().is_some());
fixture.set_dapps(dapps.clone()).await;

let mut buffers = vec![
vec![0, 1], //
vec![0],
];

for buffer in buffers.iter_mut() {
for _ in 0..buffer.len() {
println!("Buffer: {:?}", buffer);
let result = listener.listen().await;
assert!(result.is_ok(), "{:?}", result.unwrap_err());
let dapp_address = result.unwrap().dapp_address;
let index = dapps
.iter()
.position(|address| *address == dapp_address)
.unwrap();
let index = buffer.iter().position(|i| *i == index).unwrap();
buffer.remove(index);
}
assert!(buffer.is_empty());
println!("Emptied one of the buffers");
}
}
}
15 changes: 10 additions & 5 deletions offchain/rollups-events/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Broker {
&mut self,
streams: &Vec<S>,
last_consumed_ids: &Vec<String>,
) -> Result<(S, Event<S::Payload>), BrokerError> {
) -> Result<Vec<(S, Event<S::Payload>)>, BrokerError> {
let reply = retry(self.backoff.clone(), || async {
let stream_keys: Vec<String> = streams
.iter()
Expand All @@ -318,16 +318,21 @@ impl Broker {
return Err(BrokerError::ConsumeTimeout);
}

tracing::trace!("checking if any events were received");
tracing::trace!("getting the consumed events");
let mut response: Vec<(S, Event<S::Payload>)> = vec![];
for mut stream_key in reply.keys {
if let Some(event) = stream_key.ids.pop() {
tracing::trace!("parsing received event");
let stream = S::from_key(stream_key.key);
let event = event.try_into()?;
return Ok((stream, event));
response.push((stream, event));
}
}
return Err(BrokerError::FailedToConsume);
if response.is_empty() {
Err(BrokerError::FailedToConsume)
} else {
Ok(response)
}
}

/// Consume the next event from one of the streams.
Expand All @@ -342,7 +347,7 @@ impl Broker {
>(
&mut self,
streams: HashMap<S, String>, // streams to last-consumed-ids
) -> Result<(S, Event<S::Payload>), BrokerError> {
) -> Result<Vec<(S, Event<S::Payload>)>, BrokerError> {
let (streams, last_consumed_ids): (Vec<_>, Vec<_>) =
streams.into_iter().map(identity).unzip();

Expand Down

0 comments on commit f9610fe

Please sign in to comment.