diff --git a/rust/agents/relayer/src/msg/op_queue.rs b/rust/agents/relayer/src/msg/op_queue.rs index bb96910994..7f79a79eab 100644 --- a/rust/agents/relayer/src/msg/op_queue.rs +++ b/rust/agents/relayer/src/msg/op_queue.rs @@ -88,7 +88,10 @@ impl OpQueue { mod test { use super::*; use crate::msg::pending_operation::PendingOperationResult; - use hyperlane_core::{HyperlaneDomain, KnownHyperlaneDomain, MpmcChannel, TxOutcome, H256}; + use hyperlane_core::{ + HyperlaneDomain, HyperlaneMessage, KnownHyperlaneDomain, MpmcChannel, TryBatchAs, + TxOutcome, H256, + }; use std::{ collections::VecDeque, time::{Duration, Instant}, @@ -111,6 +114,8 @@ mod test { } } + impl TryBatchAs for MockPendingOperation {} + #[async_trait::async_trait] impl PendingOperation for MockPendingOperation { fn id(&self) -> H256 { diff --git a/rust/agents/relayer/src/msg/pending_message.rs b/rust/agents/relayer/src/msg/pending_message.rs index d9e65b4b0a..6ee7961ef5 100644 --- a/rust/agents/relayer/src/msg/pending_message.rs +++ b/rust/agents/relayer/src/msg/pending_message.rs @@ -10,7 +10,8 @@ use ethers::utils::hex; use eyre::Result; use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics}; use hyperlane_core::{ - HyperlaneChain, HyperlaneDomain, HyperlaneMessage, Mailbox, TxOutcome, H256, U256, + BatchItem, ChainCommunicationError, ChainResult, HyperlaneChain, HyperlaneDomain, + HyperlaneMessage, Mailbox, TryBatchAs, TxOutcome, TxSubmissionData, H256, U256, }; use prometheus::{IntCounter, IntGauge}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -59,7 +60,7 @@ pub struct PendingMessage { submitted: bool, #[new(default)] // needs to be vectorized - submission_data: Option>, + submission_data: Option>, #[new(default)] num_retries: u32, #[new(value = "Instant::now()")] @@ -70,12 +71,6 @@ pub struct PendingMessage { submission_outcome: Option, } -/// State for the next submission attempt generated by a prepare call. -pub struct SubmissionData { - metadata: Vec, - gas_limit: U256, -} - impl Debug for PendingMessage { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { // intentionally leaves out ctx @@ -106,6 +101,19 @@ impl PartialEq for PendingMessage { impl Eq for PendingMessage {} +impl TryBatchAs for PendingMessage { + fn try_batch(&self) -> ChainResult> { + match self.submission_data.as_ref() { + None => Err(ChainCommunicationError::BatchingFailed), + Some(data) => Ok(BatchItem::new( + self.message.clone(), + data.as_ref().clone(), + self.ctx.destination_mailbox.clone(), + )), + } + } +} + #[async_trait] impl PendingOperation for PendingMessage { fn id(&self) -> H256 { @@ -236,7 +244,7 @@ impl PendingOperation for PendingMessage { } } - self.submission_data = Some(Box::new(SubmissionData { + self.submission_data = Some(Box::new(TxSubmissionData { metadata, gas_limit, })); @@ -331,7 +339,7 @@ impl PendingOperation for PendingMessage { } fn set_next_attempt_after(&mut self, delay: Duration) { - self.next_attempt_after = Some(Instant::now() + CONFIRM_DELAY); + self.next_attempt_after = Some(Instant::now() + delay); } fn reset_attempts(&mut self) { diff --git a/rust/agents/relayer/src/msg/pending_operation.rs b/rust/agents/relayer/src/msg/pending_operation.rs index 16fbbfdeee..fe43bac4a8 100644 --- a/rust/agents/relayer/src/msg/pending_operation.rs +++ b/rust/agents/relayer/src/msg/pending_operation.rs @@ -5,9 +5,9 @@ use std::{ }; use async_trait::async_trait; -use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, TxOutcome, H256}; +use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, TryBatchAs, TxOutcome, H256}; -use super::{op_queue::QueueOperation, pending_message::SubmissionData}; +use super::op_queue::QueueOperation; /// A pending operation that will be run by the submitter and cause a /// transaction to be sent. @@ -29,7 +29,7 @@ use super::{op_queue::QueueOperation, pending_message::SubmissionData}; /// responsible for checking if the operation has reached a point at which we /// consider it safe from reorgs. #[async_trait] -pub trait PendingOperation: Send + Sync + Debug { +pub trait PendingOperation: Send + Sync + Debug + TryBatchAs { /// Get the unique identifier for this operation. fn id(&self) -> H256; diff --git a/rust/agents/relayer/src/msg/serial_submitter.rs b/rust/agents/relayer/src/msg/serial_submitter.rs index 7ce3a7cd50..0da1461e90 100644 --- a/rust/agents/relayer/src/msg/serial_submitter.rs +++ b/rust/agents/relayer/src/msg/serial_submitter.rs @@ -2,7 +2,6 @@ use std::cmp::Reverse; use std::time::Duration; use derive_new::new; -use ethers::utils::hex; use futures_util::future::try_join_all; use prometheus::{IntCounter, IntGaugeVec}; use tokio::spawn; @@ -10,9 +9,13 @@ use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio::time::sleep; use tracing::{debug, info_span, instrument, instrument::Instrumented, trace, Instrument}; +use tracing::{error, info, warn}; use hyperlane_base::CoreMetrics; -use hyperlane_core::{HyperlaneDomain, MpmcReceiver}; +use hyperlane_core::{ + BatchItem, ChainCommunicationError, ChainResult, HyperlaneDomain, HyperlaneMessage, + MpmcReceiver, TxOutcome, +}; use crate::server::MessageRetryRequest; @@ -127,7 +130,6 @@ impl SerialSubmitter { spawn(submit_task( domain.clone(), rx_submit, - prepare_queue.clone(), confirm_queue.clone(), metrics.clone(), batch_size, @@ -174,7 +176,6 @@ async fn prepare_task( metrics: SerialSubmitterMetrics, batch_size: Option, ) { - let batch_size = batch_size.unwrap_or(1); loop { // Pick the next message to try preparing. let next = prepare_queue.pop().await; @@ -217,7 +218,6 @@ async fn prepare_task( async fn submit_task( domain: HyperlaneDomain, mut rx_submit: mpsc::Receiver, - mut prepare_queue: OpQueue, mut confirm_queue: OpQueue, metrics: SerialSubmitterMetrics, batch_size: Option, @@ -241,14 +241,12 @@ async fn submit_task( Some(batch_size) => { batch.add(op); if batch.operations.len() == batch_size as usize { - batch - .submit(&mut prepare_queue, &mut confirm_queue, &metrics) - .await; + batch.submit(&mut confirm_queue, &metrics).await; batch = OperationBatch::new(); } } None => { - submit_and_confirm_op(op, &mut prepare_queue, &mut confirm_queue, &metrics).await; + submit_and_confirm_op(op, &mut confirm_queue, &metrics).await; } } } @@ -256,7 +254,6 @@ async fn submit_task( async fn submit_and_confirm_op( mut op: QueueOperation, - prepare_queue: &mut OpQueue, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics, ) { @@ -277,40 +274,49 @@ impl OperationBatch { self.operations.push(op); } - async fn submit( - self, - prepare_queue: &mut OpQueue, - confirm_queue: &mut OpQueue, - metrics: &SerialSubmitterMetrics, - ) { - // without checking the concrete type, could have - // a TryInto<(&HyperlaneMessage, &SubmissionData)> supertrait on `PendingOperation`, which will only work for PendingMessage. - // later this may be convertible into a TryIntoBytes so it can be used universally - // - // Then we can call `mailbox.process_batch` with these (returns an error on non-ethereum chains, so we fall back to individual submits). - // We will then get a `tx_outcome` with the total gas expenditure - // We'll need to proportionally set `used_gas` based on the tx_outcome, so it can be updated in the confirm step - // which means we need to add a `set_transaction_outcome` fn to `PendingOperation`, and also `set_next_attempt_after(CONFIRM_DELAY);` - - // Then we increment `metrics.ops_submitted` by the number of operations in the batch and push them to the confirm queue - - // if self.domain == KnownHyperlaneDomain::Ethereum.into() { - // self.submit_ethereum().await - // } else { - // self.submit_serially().await - // } - self.submit_serially(prepare_queue, confirm_queue, metrics) - .await; + async fn submit(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) { + match self.try_submit_as_batch(metrics).await { + Ok(outcome) => { + // TODO: use the `tx_outcome` with the total gas expenditure + // We'll need to proportionally set `used_gas` based on the tx_outcome, so it can be updated in the confirm step + // which means we need to add a `set_transaction_outcome` fn to `PendingOperation`, and also `set_next_attempt_after(CONFIRM_DELAY);` + info!(outcome=?outcome, batch=?self.operations, "Submitted transaction batch"); + return; + } + Err(e) => { + warn!(error=?e, batch=?self.operations, "Error when submitting batch. Falling back to serial submission."); + } + } + self.submit_serially(confirm_queue, metrics).await; } - async fn submit_serially( - self, - prepare_queue: &mut OpQueue, - confirm_queue: &mut OpQueue, + async fn try_submit_as_batch( + &self, metrics: &SerialSubmitterMetrics, - ) { + ) -> ChainResult { + let batch = self + .operations + .iter() + .map(|op| op.try_batch()) + .collect::>>>()?; + // We already assume that the relayer submits to a single mailbox per destination. + // So it's fine to use the first item in the batch to get the mailbox. + + let Some(first_item) = batch.first() else { + return Err(ChainCommunicationError::BatchIsEmpty); + }; + let mailbox = first_item.mailbox.clone(); + + // We use the estimated gas limit from the prior call to + // `process_estimate_costs` to avoid a second gas estimation. + let outcome = mailbox.process_batch(&batch).await?; + metrics.ops_submitted.inc_by(self.operations.len() as u64); + Ok(outcome) + } + + async fn submit_serially(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) { for op in self.operations.into_iter() { - submit_and_confirm_op(op, prepare_queue, confirm_queue, metrics).await; + submit_and_confirm_op(op, confirm_queue, metrics).await; } } } diff --git a/rust/chains/hyperlane-cosmos/src/mailbox.rs b/rust/chains/hyperlane-cosmos/src/mailbox.rs index 034ff4f9f8..fbd47f2301 100644 --- a/rust/chains/hyperlane-cosmos/src/mailbox.rs +++ b/rust/chains/hyperlane-cosmos/src/mailbox.rs @@ -24,9 +24,9 @@ use tendermint::abci::EventAttribute; use crate::utils::{CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64}; use hyperlane_core::{ - utils::bytes_to_hex, ChainResult, HyperlaneChain, HyperlaneContract, HyperlaneDomain, - HyperlaneMessage, HyperlaneProvider, Indexer, LogMeta, Mailbox, TxCostEstimate, TxOutcome, - H256, U256, + utils::bytes_to_hex, BatchItem, ChainResult, HyperlaneChain, HyperlaneContract, + HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, Indexer, LogMeta, Mailbox, + TxCostEstimate, TxOutcome, H256, U256, }; use hyperlane_core::{ ChainCommunicationError, ContractLocator, Decode, RawHyperlaneMessage, SequenceAwareIndexer, @@ -207,7 +207,7 @@ impl Mailbox for CosmosMailbox { #[instrument(err, ret, skip(self))] async fn process_batch( &self, - _messages: Vec<(&HyperlaneMessage, &[u8], Option)>, + messages: &[BatchItem], ) -> ChainResult { todo!() } diff --git a/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs b/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs index fc6f8038a1..9ccc3df23a 100644 --- a/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs +++ b/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs @@ -13,6 +13,7 @@ use ethers::prelude::Middleware; use ethers_contract::builders::ContractCall; use ethers_contract::Multicall; use futures_util::future::join_all; +use hyperlane_core::BatchItem; use tracing::instrument; use hyperlane_core::{ @@ -366,7 +367,7 @@ where #[instrument(skip(self, messages))] async fn process_batch( &self, - messages: Vec<(&HyperlaneMessage, &[u8], Option)>, + messages: &[BatchItem], ) -> ChainResult { let multicall = build_multicall(self.provider.clone(), &self.conn).await; let Some(mut multicall) = multicall else { @@ -377,8 +378,15 @@ where }; let contract_call_futures = messages .into_iter() - .map(|(message, metadata, tx_gas_limit)| { - self.process_contract_call(message, metadata, tx_gas_limit) + .map(|batch| async { + // move ownership of the batch inside the closure + let batch = batch.clone(); + self.process_contract_call( + &batch.data, + &batch.submission_data.metadata, + Some(batch.submission_data.gas_limit), + ) + .await }) .collect::>(); let contract_calls = join_all(contract_call_futures) diff --git a/rust/chains/hyperlane-fuel/src/mailbox.rs b/rust/chains/hyperlane-fuel/src/mailbox.rs index f652314a8a..aa90b48684 100644 --- a/rust/chains/hyperlane-fuel/src/mailbox.rs +++ b/rust/chains/hyperlane-fuel/src/mailbox.rs @@ -5,6 +5,7 @@ use std::ops::RangeInclusive; use async_trait::async_trait; use fuels::prelude::{Bech32ContractId, WalletUnlocked}; +use hyperlane_core::BatchItem; use tracing::instrument; use hyperlane_core::{ @@ -108,7 +109,7 @@ impl Mailbox for FuelMailbox { #[instrument(err, ret, skip(self))] async fn process_batch( &self, - _messages: Vec<(&HyperlaneMessage, &[u8], Option)>, + messages: &[BatchItem], ) -> ChainResult { todo!() } diff --git a/rust/chains/hyperlane-sealevel/src/mailbox.rs b/rust/chains/hyperlane-sealevel/src/mailbox.rs index 4b9248582f..1440f1ca38 100644 --- a/rust/chains/hyperlane-sealevel/src/mailbox.rs +++ b/rust/chains/hyperlane-sealevel/src/mailbox.rs @@ -8,10 +8,11 @@ use jsonrpc_core::futures_util::TryFutureExt; use tracing::{debug, info, instrument, warn}; use hyperlane_core::{ - accumulator::incremental::IncrementalMerkle, ChainCommunicationError, ChainResult, Checkpoint, - ContractLocator, Decode as _, Encode as _, FixedPointNumber, HyperlaneAbi, HyperlaneChain, - HyperlaneContract, HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, Indexer, LogMeta, - Mailbox, MerkleTreeHook, SequenceAwareIndexer, TxCostEstimate, TxOutcome, H256, H512, U256, + accumulator::incremental::IncrementalMerkle, BatchItem, ChainCommunicationError, ChainResult, + Checkpoint, ContractLocator, Decode as _, Encode as _, FixedPointNumber, HyperlaneAbi, + HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, + Indexer, LogMeta, Mailbox, MerkleTreeHook, SequenceAwareIndexer, TxCostEstimate, TxOutcome, + H256, H512, U256, }; use hyperlane_sealevel_interchain_security_module_interface::{ InterchainSecurityModuleInstruction, VerifyInstruction, @@ -478,7 +479,7 @@ impl Mailbox for SealevelMailbox { #[instrument(err, ret, skip(self))] async fn process_batch( &self, - _messages: Vec<(&HyperlaneMessage, &[u8], Option)>, + messages: &[BatchItem], ) -> ChainResult { todo!() } diff --git a/rust/hyperlane-core/src/error.rs b/rust/hyperlane-core/src/error.rs index 3eb1ea5b58..ff4693163f 100644 --- a/rust/hyperlane-core/src/error.rs +++ b/rust/hyperlane-core/src/error.rs @@ -82,6 +82,12 @@ pub enum ChainCommunicationError { /// No signer is available and was required for the operation #[error("Signer unavailable")] SignerUnavailable, + /// Batching transaction failed + #[error("Batching transaction failed")] + BatchingFailed, + /// Cannot submit empty batch + #[error("Cannot submit empty batch")] + BatchIsEmpty, /// Failed to parse strings or integers #[error("Data parsing error {0:?}")] StrOrIntParseError(#[from] StrOrIntParseError), diff --git a/rust/hyperlane-core/src/traits/mailbox.rs b/rust/hyperlane-core/src/traits/mailbox.rs index a453ffd070..db359d2111 100644 --- a/rust/hyperlane-core/src/traits/mailbox.rs +++ b/rust/hyperlane-core/src/traits/mailbox.rs @@ -4,8 +4,8 @@ use std::num::NonZeroU64; use async_trait::async_trait; use crate::{ - traits::TxOutcome, utils::domain_hash, ChainResult, HyperlaneContract, HyperlaneMessage, - TxCostEstimate, H256, U256, + traits::TxOutcome, utils::domain_hash, BatchItem, ChainResult, HyperlaneContract, + HyperlaneMessage, TxCostEstimate, H256, U256, }; /// Interface for the Mailbox chain contract. Allows abstraction over different @@ -43,7 +43,7 @@ pub trait Mailbox: HyperlaneContract + Send + Sync + Debug { /// Process a message with a proof against the provided signed checkpoint async fn process_batch( &self, - messages: Vec<(&HyperlaneMessage, &[u8], Option)>, + messages: &[BatchItem], ) -> ChainResult; /// Estimate transaction costs to process a message. diff --git a/rust/hyperlane-core/src/types/mod.rs b/rust/hyperlane-core/src/types/mod.rs index 2f348c7224..b54a47cd21 100644 --- a/rust/hyperlane-core/src/types/mod.rs +++ b/rust/hyperlane-core/src/types/mod.rs @@ -14,6 +14,7 @@ pub use checkpoint::*; pub use log_metadata::*; pub use merkle_tree::*; pub use message::*; +pub use transaction::*; use crate::{Decode, Encode, HyperlaneProtocolError, Sequenced}; @@ -26,6 +27,7 @@ mod log_metadata; mod merkle_tree; mod message; mod serialize; +mod transaction; /// Unified 32-byte identifier with convenience tooling for handling /// 20-byte ids (e.g ethereum addresses) diff --git a/rust/hyperlane-core/src/types/transaction.rs b/rust/hyperlane-core/src/types/transaction.rs new file mode 100644 index 0000000000..4f87d0c6de --- /dev/null +++ b/rust/hyperlane-core/src/types/transaction.rs @@ -0,0 +1,26 @@ +use std::sync::Arc; + +use crate::{ChainResult, Mailbox, U256}; +use derive_new::new; + +/// State for the next submission attempt generated by a prepare call. +#[derive(Clone, Debug)] +pub struct TxSubmissionData { + pub metadata: Vec, + pub gas_limit: U256, +} + +#[derive(new, Clone, Debug)] +pub struct BatchItem { + pub data: T, + pub submission_data: TxSubmissionData, + pub mailbox: Arc, +} + +// Need to define a trait instead of using TryInto because the latter is not +// object safe +pub trait TryBatchAs { + fn try_batch(&self) -> ChainResult> { + Err(crate::ChainCommunicationError::BatchingFailed) + } +} diff --git a/rust/hyperlane-test/src/mocks/mailbox.rs b/rust/hyperlane-test/src/mocks/mailbox.rs index f543857d36..52b0b249f5 100644 --- a/rust/hyperlane-test/src/mocks/mailbox.rs +++ b/rust/hyperlane-test/src/mocks/mailbox.rs @@ -46,9 +46,9 @@ mock! { tx_gas_limit: Option, ) -> ChainResult {} - pub fn process_batch<'a, 'b>( + pub fn process_batch( &mut self, - messages: Vec<(&'a HyperlaneMessage, &'b [u8], Option)>, + messages: &[BatchItem], ) -> ChainResult {} pub fn process_estimate_costs( @@ -100,7 +100,7 @@ impl Mailbox for MockMailboxContract { async fn process_batch( &self, - messages: Vec<(&HyperlaneMessage, &[u8], Option)>, + messages: &[BatchItem], ) -> ChainResult { self.process_batch(messages).await }