Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed Aug 5, 2024
1 parent 93f822a commit 4d54ca4
Show file tree
Hide file tree
Showing 13 changed files with 889 additions and 142 deletions.
1 change: 1 addition & 0 deletions offchain/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion offchain/advance-runner/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl BrokerFacade {
let client = Broker::new(config).await.context(BrokerInternalSnafu)?;
let inputs_stream = RollupsInputsStream::new(&dapp_metadata);
let outputs_stream = RollupsOutputsStream::new(&dapp_metadata);
let claims_stream = RollupsClaimsStream::new(dapp_metadata.chain_id);
let claims_stream = RollupsClaimsStream::new(&dapp_metadata);
Ok(Self {
client,
inputs_stream,
Expand Down
103 changes: 98 additions & 5 deletions offchain/authority-claimer/src/claimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

use async_trait::async_trait;
use snafu::ResultExt;
use std::fmt::Debug;
use tracing::{info, trace};
use std::{collections::HashMap, fmt::Debug};
use tracing::{debug, info};

use rollups_events::Address;

use crate::{
checker::DuplicateChecker, listener::BrokerListener,
Expand All @@ -31,6 +33,9 @@ pub enum ClaimerError<
D: DuplicateChecker,
T: TransactionSender,
> {
#[snafu(display("invalid app address {:?}", app_address))]
InvalidAppAddress { app_address: Address },

#[snafu(display("broker listener error"))]
BrokerListenerError { source: B::Error },

Expand Down Expand Up @@ -84,25 +89,113 @@ where
type Error = ClaimerError<B, D, T>;

async fn start(mut self) -> Result<(), Self::Error> {
trace!("Starting the authority claimer loop");
debug!("Starting the authority claimer loop");
loop {
let rollups_claim = self
.broker_listener
.listen()
.await
.context(BrokerListenerSnafu)?;
trace!("Got a claim from the broker: {:?}", rollups_claim);
debug!("Got a claim from the broker: {:?}", rollups_claim);

let is_duplicated_rollups_claim = self
.duplicate_checker
.is_duplicated_rollups_claim(&rollups_claim)
.await
.context(DuplicatedClaimSnafu)?;
if is_duplicated_rollups_claim {
trace!("It was a duplicated claim");
debug!("It was a duplicated claim");
continue;
}

info!("Sending a new rollups claim");
self.transaction_sender = self
.transaction_sender
.send_rollups_claim_transaction(rollups_claim)
.await
.context(TransactionSenderSnafu)?
}
}
}

// ------------------------------------------------------------------------------------------------
// MultidappClaimer
// ------------------------------------------------------------------------------------------------

/// The `MultidappClaimer` must be injected with a `BrokerListener`, a map of `Address` to
/// `DuplicateChecker`, and a `TransactionSender`.
#[derive(Debug)]
pub struct MultidappClaimer<
B: BrokerListener,
D: DuplicateChecker,
T: TransactionSender,
> {
broker_listener: B,
duplicate_checkers: HashMap<Address, D>,
transaction_sender: T,
}

impl<B: BrokerListener, D: DuplicateChecker, T: TransactionSender>
MultidappClaimer<B, D, T>
{
pub fn new(
broker_listener: B,
duplicate_checkers: HashMap<Address, D>,
transaction_sender: T,
) -> Self {
Self {
broker_listener,
duplicate_checkers,
transaction_sender,
}
}
}

#[async_trait]
impl<B, D, T> Claimer for MultidappClaimer<B, D, T>
where
B: BrokerListener + Send + Sync + 'static,
D: DuplicateChecker + Send + Sync + 'static,
T: TransactionSender + Send + 'static,
{
type Error = ClaimerError<B, D, T>;

async fn start(mut self) -> Result<(), Self::Error> {
debug!("Starting the multidapp authority claimer loop");
loop {
// Listens for claims from multiple dapps.
let rollups_claim = self
.broker_listener
.listen()
.await
.context(BrokerListenerSnafu)?;
let dapp_address = rollups_claim.dapp_address.clone();
debug!(
"Got a claim from the broker for {:?}: {:?}",
dapp_address, rollups_claim
);

// Gets the duplicate checker for the dapp.
let duplicate_checker = self
.duplicate_checkers
.get_mut(&dapp_address)
.ok_or(ClaimerError::InvalidAppAddress {
app_address: dapp_address.clone(),
})?;

// Checks for duplicates.
let is_duplicated_rollups_claim = duplicate_checker
.is_duplicated_rollups_claim(&rollups_claim)
.await
.context(DuplicatedClaimSnafu)?;

// If it is a duplicate, the loop continues.
if is_duplicated_rollups_claim {
debug!("It was a duplicated claim");
continue;
}

// Sends the claim.
info!("Sending a new rollups claim");
self.transaction_sender = self
.transaction_sender
Expand Down
66 changes: 53 additions & 13 deletions offchain/authority-claimer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ pub mod metrics;
pub mod sender;
pub mod signer;

use config::Config;
use claimer::MultidappClaimer;
use config::{AuthorityClaimerConfig, Config};
use listener::MultidappBrokerListener;
use rollups_events::Address;
use snafu::Error;
use tracing::trace;

Expand All @@ -28,13 +31,43 @@ pub async fn run(config: Config) -> Result<(), Box<dyn Error>> {
http_server::start(config.http_server_config, metrics.clone().into());

let config = config.authority_claimer_config;
let chain_id = config.tx_manager_config.chain_id;

let claimer = create_default_claimer(metrics, config).await?;

let claimer_handle = claimer.start();

// Starting the HTTP server and the claimer loop.
tokio::select! {
ret = http_server_handle => { ret? }
ret = claimer_handle => { ret? }
};

unreachable!()
}

async fn create_default_claimer(
metrics: AuthorityClaimerMetrics,
config: AuthorityClaimerConfig,
) -> Result<
DefaultClaimer<
DefaultBrokerListener,
DefaultDuplicateChecker,
DefaultTransactionSender,
>,
Box<dyn Error>,
> {
// Creating the broker listener.
trace!("Creating the broker listener");
let broker_listener =
DefaultBrokerListener::new(config.broker_config.clone(), chain_id)
.await?;

let chain_id = config.tx_manager_config.chain_id;
let dapp_address = Address::default(); // TODO

let broker_listener = DefaultBrokerListener::new(
config.broker_config.clone(),
chain_id,
dapp_address,
)
.await?;

// Creating the duplicate checker.
trace!("Creating the duplicate checker");
Expand All @@ -52,19 +85,26 @@ pub async fn run(config: Config) -> Result<(), Box<dyn Error>> {
DefaultTransactionSender::new(config.clone(), chain_id, metrics)
.await?;

// Creating the claimer loop.
// Creating the claimer.
let claimer = DefaultClaimer::new(
broker_listener,
duplicate_checker,
transaction_sender,
);
let claimer_handle = claimer.start();

// Starting the HTTP server and the claimer loop.
tokio::select! {
ret = http_server_handle => { ret? }
ret = claimer_handle => { ret? }
};
Ok(claimer)
}

unreachable!()
async fn create_multidapp_claimer(
_metrics: AuthorityClaimerMetrics,
_config: AuthorityClaimerConfig,
) -> Result<
MultidappClaimer<
MultidappBrokerListener,
DefaultDuplicateChecker,
DefaultTransactionSender,
>,
Box<dyn Error>,
> {
todo!()
}
Loading

0 comments on commit 4d54ca4

Please sign in to comment.