Skip to content

Commit

Permalink
fixup! feat(offchain): implement the broker-listener for the authorit…
Browse files Browse the repository at this point in the history
…y-claimer
  • Loading branch information
marcelstanley committed Sep 15, 2023
1 parent b411b17 commit b60f6d6
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 36 deletions.
17 changes: 5 additions & 12 deletions offchain/advance-runner/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,11 @@ impl BrokerFacade {
last_id: &str,
) -> Result<Event<RollupsInput>> {
tracing::trace!(last_id, "consuming rollups input event");

loop {
let result = self
.client
.consume_blocking_deprecated(&self.inputs_stream, last_id)
.await;
if matches!(result, Err(BrokerError::ConsumeTimeout)) {
tracing::trace!("consume timed out, retrying");
} else {
return result.context(BrokerInternalSnafu);
}
}
let result = self
.client
.consume_blocking(&self.inputs_stream, last_id)
.await;
return result.context(BrokerInternalSnafu);
}

/// Produce the rollups claim if it isn't in the stream yet
Expand Down
4 changes: 2 additions & 2 deletions offchain/authority-claimer/src/claimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_trait::async_trait;
use rollups_events::RollupsClaim;
use snafu::ResultExt;
use std::fmt::Debug;
use tracing::{info, trace};
use tracing::info;

use crate::{checker::DuplicateChecker, sender::TransactionSender};

Expand Down Expand Up @@ -65,7 +65,7 @@ where
.await
.context(DuplicatedClaimSnafu)?;
if is_duplicated_rollups_claim {
trace!("It was a duplicated claim");
info!("Duplicate claim will not be sent");
return Ok(self);
}

Expand Down
11 changes: 4 additions & 7 deletions offchain/rollups-events/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,8 @@ impl Broker {
}
}

/// Deprecated, use `consume_blocking` instead
#[tracing::instrument(level = "trace", skip_all)]
pub async fn consume_blocking_deprecated<S: BrokerStream>(
async fn _consume_blocking<S: BrokerStream>(
&mut self,
stream: &S,
last_consumed_id: &str,
Expand Down Expand Up @@ -223,8 +222,8 @@ impl Broker {

/// Consume the next event in stream
///
/// This function blocks until a new event is available,
/// and it retries timeouts instead of returning an error.
/// This function blocks until a new event is available
/// and retries whenever a timeout happens instead of returning an error.
///
/// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`.
#[tracing::instrument(level = "trace", skip_all)]
Expand All @@ -234,9 +233,7 @@ impl Broker {
last_consumed_id: &str,
) -> Result<Event<S::Payload>, BrokerError> {
loop {
let result = self
.consume_blocking_deprecated(stream, last_consumed_id)
.await;
let result = self._consume_blocking(stream, last_consumed_id).await;

if let Err(BrokerError::ConsumeTimeout) = result {
tracing::trace!("consume timed out, retrying");
Expand Down
16 changes: 2 additions & 14 deletions offchain/rollups-events/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async fn test_it_consumes_events() {
let mut last_id = INITIAL_ID.to_owned();
for i in 0..N {
let event = broker
.consume_blocking_deprecated(&MockStream {}, &last_id)
.consume_blocking(&MockStream {}, &last_id)
.await
.expect("failed to consume");
assert_eq!(event.id, format!("1-{}", i));
Expand Down Expand Up @@ -237,7 +237,7 @@ async fn test_it_blocks_until_event_is_produced() {
// In the main thread, wait for the expected event
let mut broker = state.create_broker().await;
let event = broker
.consume_blocking_deprecated(&MockStream {}, "0")
.consume_blocking(&MockStream {}, "0")
.await
.expect("failed to consume event");
assert_eq!(event.id, "1-0");
Expand Down Expand Up @@ -286,15 +286,3 @@ async fn test_it_does_not_block_when_consuming_empty_stream() {
.expect("failed to peek");
assert!(matches!(event, None));
}

#[test_log::test(tokio::test)]
async fn test_it_times_out_when_no_event_is_produced() {
let docker = Cli::default();
let state = TestState::setup(&docker).await;
let mut broker = state.create_broker().await;
let err = broker
.consume_blocking_deprecated(&MockStream {}, "0")
.await
.expect_err("consume event worked but it should have failed");
assert!(matches!(err, BrokerError::ConsumeTimeout));
}
2 changes: 1 addition & 1 deletion offchain/test-fixtures/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl BrokerFixture<'_> {
.client
.lock()
.await
.consume_blocking_deprecated(&self.claims_stream, &last_id)
.consume_blocking(&self.claims_stream, &last_id)
.await
.expect("failed to consume claim");
claims.push(event.payload);
Expand Down

0 comments on commit b60f6d6

Please sign in to comment.