Skip to content

Commit

Permalink
feat(claimer): add broker-listener
Browse files Browse the repository at this point in the history
Co-authored-by: Marcel Moura <[email protected]>
  • Loading branch information
renan061 and marcelstanley committed Oct 2, 2023
1 parent 99b6650 commit a801123
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 145 deletions.
3 changes: 3 additions & 0 deletions offchain/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 5 additions & 12 deletions offchain/advance-runner/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,11 @@ impl BrokerFacade {
last_id: &str,
) -> Result<Event<RollupsInput>> {
tracing::trace!(last_id, "consuming rollups input event");

loop {
let result = self
.client
.consume_blocking(&self.inputs_stream, last_id)
.await;
if matches!(result, Err(BrokerError::ConsumeTimeout)) {
tracing::trace!("consume timed out, retrying");
} else {
return result.context(BrokerInternalSnafu);
}
}
let result = self
.client
.consume_blocking(&self.inputs_stream, last_id)
.await;
return result.context(BrokerInternalSnafu);
}

/// Produce the rollups claim if it isn't in the stream yet
Expand Down
6 changes: 6 additions & 0 deletions offchain/authority-claimer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ serde_json.workspace = true
snafu.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tracing.workspace = true

[dev-dependencies]
test-fixtures = { path = "../test-fixtures" }

backoff = { workspace = true, features = ["tokio"] }
testcontainers.workspace = true
75 changes: 75 additions & 0 deletions offchain/authority-claimer/src/broker_mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use std::time::Duration;

use backoff::ExponentialBackoffBuilder;
use rollups_events::{
BrokerConfig, BrokerEndpoint, BrokerError, DAppMetadata, RedactedUrl,
RollupsClaim, Url,
};
use snafu::Snafu;
use test_fixtures::BrokerFixture;
use testcontainers::clients::Cli;

use crate::listener::DefaultBrokerListener;

#[derive(Clone, Debug, Snafu)]
pub enum MockError {
EndError,
InternalError,
MockError,
}

pub async fn setup_broker(
docker: &Cli,
should_fail: bool,
) -> Result<(BrokerFixture, DefaultBrokerListener), BrokerError> {
let fixture = BrokerFixture::setup(docker).await;

let redis_endpoint = if should_fail {
BrokerEndpoint::Single(RedactedUrl::new(
Url::parse("https://invalid.com").unwrap(),
))
} else {
fixture.redis_endpoint().clone()
};

let config = BrokerConfig {
redis_endpoint,
consume_timeout: 300000,
backoff: ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(1000))
.with_max_elapsed_time(Some(Duration::from_millis(3000)))
.build(),
};
let metadata = DAppMetadata {
chain_id: fixture.chain_id(),
dapp_address: fixture.dapp_address().clone(),
};
let broker = DefaultBrokerListener::new(config, metadata).await?;
Ok((fixture, broker))
}

pub async fn produce_rollups_claims(
fixture: &BrokerFixture<'_>,
n: usize,
epoch_index_start: usize,
) -> Vec<RollupsClaim> {
let mut rollups_claims = Vec::new();
for i in 0..n {
let mut rollups_claim = RollupsClaim::default();
rollups_claim.epoch_index = (i + epoch_index_start) as u64;
fixture.produce_rollups_claim(rollups_claim.clone()).await;
rollups_claims.push(rollups_claim);
}
rollups_claims
}

/// The last claim should trigger an `EndError` error.
pub async fn produce_last_claim(
fixture: &BrokerFixture<'_>,
epoch_index: usize,
) -> Vec<RollupsClaim> {
produce_rollups_claims(fixture, 1, epoch_index).await
}
13 changes: 6 additions & 7 deletions offchain/authority-claimer/src/checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use rollups_events::RollupsClaim;
use snafu::Snafu;
use std::fmt::Debug;

/// The `DuplicateChecker` checks if a given claim was already submitted
/// to the blockchain.
/// The `DuplicateChecker` checks if a given claim was already submitted to the blockchain.
#[async_trait]
pub trait DuplicateChecker: Debug {
type Error: snafu::Error;
type Error: snafu::Error + 'static;

async fn is_duplicated_rollups_claim(
&self,
Expand All @@ -26,24 +25,24 @@ pub trait DuplicateChecker: Debug {
pub struct DefaultDuplicateChecker;

#[derive(Debug, Snafu)]
pub enum DefaultDuplicateCheckerError {
pub enum DuplicateCheckerError {
Todo,
}

impl DefaultDuplicateChecker {
pub fn new() -> Result<Self, DefaultDuplicateCheckerError> {
pub fn new() -> Result<Self, DuplicateCheckerError> {
todo!()
}
}

#[async_trait]
impl DuplicateChecker for DefaultDuplicateChecker {
type Error = DefaultDuplicateCheckerError;
type Error = DuplicateCheckerError;

async fn is_duplicated_rollups_claim(
&self,
_rollups_claim: &RollupsClaim,
) -> Result<bool, Self::Error> {
todo!()
Err(DuplicateCheckerError::Todo)
}
}
125 changes: 75 additions & 50 deletions offchain/authority-claimer/src/claimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,87 +3,112 @@

use async_trait::async_trait;
use snafu::ResultExt;
use std::fmt::Debug;
use tracing::{info, trace};

use crate::{
checker::DuplicateChecker, listener::BrokerListener,
sender::TransactionSender,
};

/// The `AuthorityClaimer` starts an event loop that waits for claim messages
/// The `Claimer` starts an event loop that waits for claim messages
/// from the broker, and then sends the claims to the blockchain. It checks to
/// see if the claim is duplicated before sending.
///
/// It uses three injected traits, `BrokerListener`, `DuplicateChecker`, and
/// `TransactionSender`, to, respectivelly, listen for messages, check for
/// duplicated claims, and send claims to the blockchain.
#[async_trait]
pub trait AuthorityClaimer {
async fn start<L, C, S>(
&self,
broker_listener: L,
duplicate_checker: C,
transaction_sender: S,
) -> Result<(), AuthorityClaimerError<L, C, S>>
where
L: BrokerListener + Send + Sync,
C: DuplicateChecker + Send + Sync,
S: TransactionSender + Send,
{
pub trait Claimer: Sized + Debug {
type Error: snafu::Error + 'static;

async fn start(mut self) -> Result<(), Self::Error>;
}

#[derive(Debug, snafu::Snafu)]
pub enum ClaimerError<
B: BrokerListener,
D: DuplicateChecker,
T: TransactionSender,
> {
#[snafu(display("duplicated claim error"))]
BrokerListenerError { source: B::Error },

#[snafu(display("duplicated claim error"))]
DuplicatedClaimError { source: D::Error },

#[snafu(display("transaction sender error"))]
TransactionSenderError { source: T::Error },
}

// ------------------------------------------------------------------------------------------------
// DefaultClaimer
// ------------------------------------------------------------------------------------------------

/// The `DefaultClaimer` must be injected with a
/// `BrokerListener`, a `DuplicateChecker` and a `TransactionSender`.
#[derive(Debug)]
pub struct DefaultClaimer<
B: BrokerListener,
D: DuplicateChecker,
T: TransactionSender,
> {
broker_listener: B,
duplicate_checker: D,
transaction_sender: T,
}

impl<B: BrokerListener, D: DuplicateChecker, T: TransactionSender>
DefaultClaimer<B, D, T>
{
pub fn new(
broker_listener: B,
duplicate_checker: D,
transaction_sender: T,
) -> Self {
Self {
broker_listener,
duplicate_checker,
transaction_sender,
}
}
}

#[async_trait]
impl<B, D, T> Claimer for DefaultClaimer<B, D, T>
where
B: BrokerListener + Send + Sync + 'static,
D: DuplicateChecker + Send + Sync + 'static,
T: TransactionSender + Send + 'static,
{
type Error = ClaimerError<B, D, T>;

async fn start(mut self) -> Result<(), Self::Error> {
trace!("Starting the authority claimer loop");
let mut transaction_sender = transaction_sender;
loop {
let rollups_claim = broker_listener
let rollups_claim = self
.broker_listener
.listen()
.await
.context(BrokerListenerSnafu)?;
trace!("Got a claim from the broker: {:?}", rollups_claim);

let is_duplicated_rollups_claim = duplicate_checker
let is_duplicated_rollups_claim = self
.duplicate_checker
.is_duplicated_rollups_claim(&rollups_claim)
.await
.context(DuplicateCheckerSnafu)?;
.context(DuplicatedClaimSnafu)?;
if is_duplicated_rollups_claim {
trace!("It was a duplicated claim");
continue;
}

info!("Sending a new rollups claim");
transaction_sender = transaction_sender
.send_rollups_claim(rollups_claim)
self.transaction_sender = self
.transaction_sender
.send_rollups_claim_transaction(rollups_claim)
.await
.context(TransactionSenderSnafu)?
}
}
}

#[derive(Debug, snafu::Snafu)]
pub enum AuthorityClaimerError<
L: BrokerListener + 'static,
C: DuplicateChecker + 'static,
S: TransactionSender + 'static,
> {
#[snafu(display("broker listener error"))]
BrokerListenerError { source: L::Error },

#[snafu(display("duplicate checker error"))]
DuplicateCheckerError { source: C::Error },

#[snafu(display("transaction sender error"))]
TransactionSenderError { source: S::Error },
}

// ------------------------------------------------------------------------------------------------
// DefaultAuthorityClaimer
// ------------------------------------------------------------------------------------------------

#[derive(Default)]
pub struct DefaultAuthorityClaimer;

impl DefaultAuthorityClaimer {
pub fn new() -> Self {
Self
}
}

impl AuthorityClaimer for DefaultAuthorityClaimer {}
Loading

0 comments on commit a801123

Please sign in to comment.