Skip to content

Commit

Permalink
feat: build batch in the op-queue submit step
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Apr 24, 2024
1 parent 648bc6c commit 59f461c
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 73 deletions.
7 changes: 6 additions & 1 deletion rust/agents/relayer/src/msg/op_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ impl OpQueue {
mod test {
use super::*;
use crate::msg::pending_operation::PendingOperationResult;
use hyperlane_core::{HyperlaneDomain, KnownHyperlaneDomain, MpmcChannel, TxOutcome, H256};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, KnownHyperlaneDomain, MpmcChannel, TryBatchAs,
TxOutcome, H256,
};
use std::{
collections::VecDeque,
time::{Duration, Instant},
Expand All @@ -111,6 +114,8 @@ mod test {
}
}

impl TryBatchAs<HyperlaneMessage> for MockPendingOperation {}

#[async_trait::async_trait]
impl PendingOperation for MockPendingOperation {
fn id(&self) -> H256 {
Expand Down
28 changes: 18 additions & 10 deletions rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use ethers::utils::hex;
use eyre::Result;
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{
HyperlaneChain, HyperlaneDomain, HyperlaneMessage, Mailbox, TxOutcome, H256, U256,
BatchItem, ChainCommunicationError, ChainResult, HyperlaneChain, HyperlaneDomain,
HyperlaneMessage, Mailbox, TryBatchAs, TxOutcome, TxSubmissionData, H256, U256,
};
use prometheus::{IntCounter, IntGauge};
use tracing::{debug, error, info, instrument, trace, warn};
Expand Down Expand Up @@ -59,7 +60,7 @@ pub struct PendingMessage {
submitted: bool,
#[new(default)]
// needs to be vectorized
submission_data: Option<Box<SubmissionData>>,
submission_data: Option<Box<TxSubmissionData>>,
#[new(default)]
num_retries: u32,
#[new(value = "Instant::now()")]
Expand All @@ -70,12 +71,6 @@ pub struct PendingMessage {
submission_outcome: Option<TxOutcome>,
}

/// State for the next submission attempt generated by a prepare call.
pub struct SubmissionData {
metadata: Vec<u8>,
gas_limit: U256,
}

impl Debug for PendingMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// intentionally leaves out ctx
Expand Down Expand Up @@ -106,6 +101,19 @@ impl PartialEq for PendingMessage {

impl Eq for PendingMessage {}

impl TryBatchAs<HyperlaneMessage> for PendingMessage {
fn try_batch(&self) -> ChainResult<BatchItem<HyperlaneMessage>> {
match self.submission_data.as_ref() {
None => Err(ChainCommunicationError::BatchingFailed),
Some(data) => Ok(BatchItem::new(
self.message.clone(),
data.as_ref().clone(),
self.ctx.destination_mailbox.clone(),
)),
}
}
}

#[async_trait]
impl PendingOperation for PendingMessage {
fn id(&self) -> H256 {
Expand Down Expand Up @@ -236,7 +244,7 @@ impl PendingOperation for PendingMessage {
}
}

self.submission_data = Some(Box::new(SubmissionData {
self.submission_data = Some(Box::new(TxSubmissionData {
metadata,
gas_limit,
}));
Expand Down Expand Up @@ -331,7 +339,7 @@ impl PendingOperation for PendingMessage {
}

fn set_next_attempt_after(&mut self, delay: Duration) {
self.next_attempt_after = Some(Instant::now() + CONFIRM_DELAY);
self.next_attempt_after = Some(Instant::now() + delay);
}

fn reset_attempts(&mut self) {
Expand Down
6 changes: 3 additions & 3 deletions rust/agents/relayer/src/msg/pending_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use std::{
};

use async_trait::async_trait;
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, TxOutcome, H256};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, TryBatchAs, TxOutcome, H256};

use super::{op_queue::QueueOperation, pending_message::SubmissionData};
use super::op_queue::QueueOperation;

/// A pending operation that will be run by the submitter and cause a
/// transaction to be sent.
Expand All @@ -29,7 +29,7 @@ use super::{op_queue::QueueOperation, pending_message::SubmissionData};
/// responsible for checking if the operation has reached a point at which we
/// consider it safe from reorgs.
#[async_trait]
pub trait PendingOperation: Send + Sync + Debug {
pub trait PendingOperation: Send + Sync + Debug + TryBatchAs<HyperlaneMessage> {
/// Get the unique identifier for this operation.
fn id(&self) -> H256;

Expand Down
86 changes: 46 additions & 40 deletions rust/agents/relayer/src/msg/serial_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ use std::cmp::Reverse;
use std::time::Duration;

use derive_new::new;
use ethers::utils::hex;
use futures_util::future::try_join_all;
use prometheus::{IntCounter, IntGaugeVec};
use tokio::spawn;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::{debug, info_span, instrument, instrument::Instrumented, trace, Instrument};
use tracing::{error, info, warn};

use hyperlane_base::CoreMetrics;
use hyperlane_core::{HyperlaneDomain, MpmcReceiver};
use hyperlane_core::{
BatchItem, ChainCommunicationError, ChainResult, HyperlaneDomain, HyperlaneMessage,
MpmcReceiver, TxOutcome,
};

use crate::server::MessageRetryRequest;

Expand Down Expand Up @@ -127,7 +130,6 @@ impl SerialSubmitter {
spawn(submit_task(
domain.clone(),
rx_submit,
prepare_queue.clone(),
confirm_queue.clone(),
metrics.clone(),
batch_size,
Expand Down Expand Up @@ -174,7 +176,6 @@ async fn prepare_task(
metrics: SerialSubmitterMetrics,
batch_size: Option<u32>,
) {
let batch_size = batch_size.unwrap_or(1);
loop {
// Pick the next message to try preparing.
let next = prepare_queue.pop().await;
Expand Down Expand Up @@ -217,7 +218,6 @@ async fn prepare_task(
async fn submit_task(
domain: HyperlaneDomain,
mut rx_submit: mpsc::Receiver<QueueOperation>,
mut prepare_queue: OpQueue,
mut confirm_queue: OpQueue,
metrics: SerialSubmitterMetrics,
batch_size: Option<u32>,
Expand All @@ -241,22 +241,19 @@ async fn submit_task(
Some(batch_size) => {
batch.add(op);
if batch.operations.len() == batch_size as usize {
batch
.submit(&mut prepare_queue, &mut confirm_queue, &metrics)
.await;
batch.submit(&mut confirm_queue, &metrics).await;
batch = OperationBatch::new();
}
}
None => {
submit_and_confirm_op(op, &mut prepare_queue, &mut confirm_queue, &metrics).await;
submit_and_confirm_op(op, &mut confirm_queue, &metrics).await;
}
}
}
}

async fn submit_and_confirm_op(
mut op: QueueOperation,
prepare_queue: &mut OpQueue,
confirm_queue: &mut OpQueue,
metrics: &SerialSubmitterMetrics,
) {
Expand All @@ -277,40 +274,49 @@ impl OperationBatch {
self.operations.push(op);
}

async fn submit(
self,
prepare_queue: &mut OpQueue,
confirm_queue: &mut OpQueue,
metrics: &SerialSubmitterMetrics,
) {
// without checking the concrete type, could have
// a TryInto<(&HyperlaneMessage, &SubmissionData)> supertrait on `PendingOperation`, which will only work for PendingMessage.
// later this may be convertible into a TryIntoBytes so it can be used universally
//
// Then we can call `mailbox.process_batch` with these (returns an error on non-ethereum chains, so we fall back to individual submits).
// We will then get a `tx_outcome` with the total gas expenditure
// We'll need to proportionally set `used_gas` based on the tx_outcome, so it can be updated in the confirm step
// which means we need to add a `set_transaction_outcome` fn to `PendingOperation`, and also `set_next_attempt_after(CONFIRM_DELAY);`

// Then we increment `metrics.ops_submitted` by the number of operations in the batch and push them to the confirm queue

// if self.domain == KnownHyperlaneDomain::Ethereum.into() {
// self.submit_ethereum().await
// } else {
// self.submit_serially().await
// }
self.submit_serially(prepare_queue, confirm_queue, metrics)
.await;
async fn submit(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) {
match self.try_submit_as_batch(metrics).await {
Ok(outcome) => {
// TODO: use the `tx_outcome` with the total gas expenditure
// We'll need to proportionally set `used_gas` based on the tx_outcome, so it can be updated in the confirm step
// which means we need to add a `set_transaction_outcome` fn to `PendingOperation`, and also `set_next_attempt_after(CONFIRM_DELAY);`
info!(outcome=?outcome, batch=?self.operations, "Submitted transaction batch");
return;
}
Err(e) => {
warn!(error=?e, batch=?self.operations, "Error when submitting batch. Falling back to serial submission.");
}
}
self.submit_serially(confirm_queue, metrics).await;
}

async fn submit_serially(
self,
prepare_queue: &mut OpQueue,
confirm_queue: &mut OpQueue,
async fn try_submit_as_batch(
&self,
metrics: &SerialSubmitterMetrics,
) {
) -> ChainResult<TxOutcome> {
let batch = self
.operations
.iter()
.map(|op| op.try_batch())
.collect::<ChainResult<Vec<BatchItem<HyperlaneMessage>>>>()?;
// We already assume that the relayer submits to a single mailbox per destination.
// So it's fine to use the first item in the batch to get the mailbox.

let Some(first_item) = batch.first() else {
return Err(ChainCommunicationError::BatchIsEmpty);
};
let mailbox = first_item.mailbox.clone();

// We use the estimated gas limit from the prior call to
// `process_estimate_costs` to avoid a second gas estimation.
let outcome = mailbox.process_batch(&batch).await?;
metrics.ops_submitted.inc_by(self.operations.len() as u64);
Ok(outcome)
}

async fn submit_serially(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) {
for op in self.operations.into_iter() {
submit_and_confirm_op(op, prepare_queue, confirm_queue, metrics).await;
submit_and_confirm_op(op, confirm_queue, metrics).await;
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions rust/chains/hyperlane-cosmos/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use tendermint::abci::EventAttribute;

use crate::utils::{CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64};
use hyperlane_core::{
utils::bytes_to_hex, ChainResult, HyperlaneChain, HyperlaneContract, HyperlaneDomain,
HyperlaneMessage, HyperlaneProvider, Indexer, LogMeta, Mailbox, TxCostEstimate, TxOutcome,
H256, U256,
utils::bytes_to_hex, BatchItem, ChainResult, HyperlaneChain, HyperlaneContract,
HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, Indexer, LogMeta, Mailbox,
TxCostEstimate, TxOutcome, H256, U256,
};
use hyperlane_core::{
ChainCommunicationError, ContractLocator, Decode, RawHyperlaneMessage, SequenceAwareIndexer,
Expand Down Expand Up @@ -207,7 +207,7 @@ impl Mailbox for CosmosMailbox {
#[instrument(err, ret, skip(self))]
async fn process_batch(
&self,
_messages: Vec<(&HyperlaneMessage, &[u8], Option<U256>)>,
messages: &[BatchItem<HyperlaneMessage>],
) -> ChainResult<TxOutcome> {
todo!()
}
Expand Down
14 changes: 11 additions & 3 deletions rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use ethers::prelude::Middleware;
use ethers_contract::builders::ContractCall;
use ethers_contract::Multicall;
use futures_util::future::join_all;
use hyperlane_core::BatchItem;
use tracing::instrument;

use hyperlane_core::{
Expand Down Expand Up @@ -366,7 +367,7 @@ where
#[instrument(skip(self, messages))]
async fn process_batch(
&self,
messages: Vec<(&HyperlaneMessage, &[u8], Option<U256>)>,
messages: &[BatchItem<HyperlaneMessage>],
) -> ChainResult<TxOutcome> {
let multicall = build_multicall(self.provider.clone(), &self.conn).await;
let Some(mut multicall) = multicall else {
Expand All @@ -377,8 +378,15 @@ where
};
let contract_call_futures = messages
.into_iter()
.map(|(message, metadata, tx_gas_limit)| {
self.process_contract_call(message, metadata, tx_gas_limit)
.map(|batch| async {
// move ownership of the batch inside the closure
let batch = batch.clone();
self.process_contract_call(
&batch.data,
&batch.submission_data.metadata,
Some(batch.submission_data.gas_limit),
)
.await
})
.collect::<Vec<_>>();
let contract_calls = join_all(contract_call_futures)
Expand Down
3 changes: 2 additions & 1 deletion rust/chains/hyperlane-fuel/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::ops::RangeInclusive;

use async_trait::async_trait;
use fuels::prelude::{Bech32ContractId, WalletUnlocked};
use hyperlane_core::BatchItem;
use tracing::instrument;

use hyperlane_core::{
Expand Down Expand Up @@ -108,7 +109,7 @@ impl Mailbox for FuelMailbox {
#[instrument(err, ret, skip(self))]
async fn process_batch(
&self,
_messages: Vec<(&HyperlaneMessage, &[u8], Option<U256>)>,
messages: &[BatchItem<HyperlaneMessage>],
) -> ChainResult<TxOutcome> {
todo!()
}
Expand Down
11 changes: 6 additions & 5 deletions rust/chains/hyperlane-sealevel/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use jsonrpc_core::futures_util::TryFutureExt;
use tracing::{debug, info, instrument, warn};

use hyperlane_core::{
accumulator::incremental::IncrementalMerkle, ChainCommunicationError, ChainResult, Checkpoint,
ContractLocator, Decode as _, Encode as _, FixedPointNumber, HyperlaneAbi, HyperlaneChain,
HyperlaneContract, HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, Indexer, LogMeta,
Mailbox, MerkleTreeHook, SequenceAwareIndexer, TxCostEstimate, TxOutcome, H256, H512, U256,
accumulator::incremental::IncrementalMerkle, BatchItem, ChainCommunicationError, ChainResult,
Checkpoint, ContractLocator, Decode as _, Encode as _, FixedPointNumber, HyperlaneAbi,
HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneMessage, HyperlaneProvider,
Indexer, LogMeta, Mailbox, MerkleTreeHook, SequenceAwareIndexer, TxCostEstimate, TxOutcome,
H256, H512, U256,
};
use hyperlane_sealevel_interchain_security_module_interface::{
InterchainSecurityModuleInstruction, VerifyInstruction,
Expand Down Expand Up @@ -478,7 +479,7 @@ impl Mailbox for SealevelMailbox {
#[instrument(err, ret, skip(self))]
async fn process_batch(
&self,
_messages: Vec<(&HyperlaneMessage, &[u8], Option<U256>)>,
messages: &[BatchItem<HyperlaneMessage>],
) -> ChainResult<TxOutcome> {
todo!()
}
Expand Down
6 changes: 6 additions & 0 deletions rust/hyperlane-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ pub enum ChainCommunicationError {
/// No signer is available and was required for the operation
#[error("Signer unavailable")]
SignerUnavailable,
/// Batching transaction failed
#[error("Batching transaction failed")]
BatchingFailed,
/// Cannot submit empty batch
#[error("Cannot submit empty batch")]
BatchIsEmpty,
/// Failed to parse strings or integers
#[error("Data parsing error {0:?}")]
StrOrIntParseError(#[from] StrOrIntParseError),
Expand Down
6 changes: 3 additions & 3 deletions rust/hyperlane-core/src/traits/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::num::NonZeroU64;
use async_trait::async_trait;

use crate::{
traits::TxOutcome, utils::domain_hash, ChainResult, HyperlaneContract, HyperlaneMessage,
TxCostEstimate, H256, U256,
traits::TxOutcome, utils::domain_hash, BatchItem, ChainResult, HyperlaneContract,
HyperlaneMessage, TxCostEstimate, H256, U256,
};

/// Interface for the Mailbox chain contract. Allows abstraction over different
Expand Down Expand Up @@ -43,7 +43,7 @@ pub trait Mailbox: HyperlaneContract + Send + Sync + Debug {
/// Process a message with a proof against the provided signed checkpoint
async fn process_batch(
&self,
messages: Vec<(&HyperlaneMessage, &[u8], Option<U256>)>,
messages: &[BatchItem<HyperlaneMessage>],
) -> ChainResult<TxOutcome>;

/// Estimate transaction costs to process a message.
Expand Down
Loading

0 comments on commit 59f461c

Please sign in to comment.