Skip to content

Commit

Permalink
fix: e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Apr 23, 2024
1 parent de61dd7 commit 648bc6c
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 63 deletions.
22 changes: 10 additions & 12 deletions rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use async_trait::async_trait;
use derive_new::new;
use ethers::utils::hex;
use eyre::Result;
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{
Expand Down Expand Up @@ -283,11 +284,6 @@ impl PendingOperation for PendingMessage {
self.on_reprepare()
});

let submission_outcome = self
.submission_outcome
.take()
.expect("Pending message must be submitted before it can be confirmed");

if !self.is_ready() {
return PendingOperationResult::NotReady;
}
Expand All @@ -312,16 +308,18 @@ impl PendingOperation for PendingMessage {
self.reset_attempts();
PendingOperationResult::Success
} else {
if let Err(e) = self
.ctx
.origin_gas_payment_enforcer
.record_tx_outcome(&self.message, submission_outcome.clone())
{
error!(error=?e, "Error when recording tx outcome");
if let Some(outcome) = &self.submission_outcome {
if let Err(e) = self
.ctx
.origin_gas_payment_enforcer
.record_tx_outcome(&self.message, outcome.clone())
{
error!(error=?e, "Error when recording tx outcome");
}
}
warn!(
tx_outcome=?self.submission_outcome,
message_id=self.message.id().to_string(),
message_id=hex::encode(self.message.id()),
"Transaction attempting to process message either reverted or was reorged"
);
self.on_reprepare()
Expand Down
99 changes: 48 additions & 51 deletions rust/agents/relayer/src/msg/serial_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::cmp::Reverse;
use std::time::Duration;

use derive_new::new;
use futures::future::join_all;
use ethers::utils::hex;
use futures_util::future::try_join_all;
use prometheus::{IntCounter, IntGaugeVec};
use tokio::spawn;
Expand All @@ -12,7 +12,7 @@ use tokio::time::sleep;
use tracing::{debug, info_span, instrument, instrument::Instrumented, trace, Instrument};

use hyperlane_base::CoreMetrics;
use hyperlane_core::{HyperlaneDomain, KnownHyperlaneDomain, MpmcReceiver};
use hyperlane_core::{HyperlaneDomain, MpmcReceiver};

use crate::server::MessageRetryRequest;

Expand Down Expand Up @@ -176,56 +176,39 @@ async fn prepare_task(
) {
let batch_size = batch_size.unwrap_or(1);
loop {
// Pop messages here according to the configured batch.
let mut batch = vec![];
for _ in 0..batch_size {
let next = prepare_queue.pop().await;
if let Some(Reverse(op)) = next {
batch.push(op);
} else {
break;
}
}
// Pick the next message to try preparing.
let next = prepare_queue.pop().await;

if batch.is_empty() {
let Some(Reverse(mut op)) = next else {
// queue is empty so give some time before checking again to prevent burning CPU
sleep(Duration::from_millis(200)).await;
continue;
}

let mut task_prep_futures = vec![];
let op_refs = batch.iter_mut().map(|op| op.as_mut()).collect::<Vec<_>>();
for op in op_refs {
trace!(?op, "Preparing operation");
debug_assert_eq!(*op.destination_domain(), domain);
task_prep_futures.push(op.prepare());
}
};

let res = join_all(task_prep_futures).await;
trace!(?op, "Preparing operation");
debug_assert_eq!(*op.destination_domain(), domain);

for (op, prepare_result) in batch.into_iter().zip(res.into_iter()) {
match prepare_result {
PendingOperationResult::Success => {
debug!(?op, "Operation prepared");
metrics.ops_prepared.inc();
// this send will pause this task if the submitter is not ready to accept yet
if let Err(err) = tx_submit.send(op).await {
tracing::error!(error=?err, "Failed to send prepared operation to submitter");
}
}
PendingOperationResult::NotReady => {
// none of the operations are ready yet, so wait for a little bit
prepare_queue.push(op).await;
sleep(Duration::from_millis(200)).await;
}
PendingOperationResult::Reprepare => {
metrics.ops_failed.inc();
prepare_queue.push(op).await;
}
PendingOperationResult::Drop => {
metrics.ops_dropped.inc();
match op.prepare().await {
PendingOperationResult::Success => {
debug!(?op, "Operation prepared");
metrics.ops_prepared.inc();
// this send will pause this task if the submitter is not ready to accept yet
if let Err(err) = tx_submit.send(op).await {
tracing::error!(error=?err, "Failed to send prepared operation to submitter");
}
}
PendingOperationResult::NotReady => {
// none of the operations are ready yet, so wait for a little bit
prepare_queue.push(op).await;
sleep(Duration::from_millis(200)).await;
}
PendingOperationResult::Reprepare => {
metrics.ops_failed.inc();
prepare_queue.push(op).await;
}
PendingOperationResult::Drop => {
metrics.ops_dropped.inc();
}
}
}
}
Expand All @@ -234,7 +217,7 @@ async fn prepare_task(
async fn submit_task(
domain: HyperlaneDomain,
mut rx_submit: mpsc::Receiver<QueueOperation>,
prepare_queue: OpQueue,
mut prepare_queue: OpQueue,
mut confirm_queue: OpQueue,
metrics: SerialSubmitterMetrics,
batch_size: Option<u32>,
Expand All @@ -258,19 +241,22 @@ async fn submit_task(
Some(batch_size) => {
batch.add(op);
if batch.operations.len() == batch_size as usize {
batch.submit(&mut confirm_queue, &metrics).await;
batch
.submit(&mut prepare_queue, &mut confirm_queue, &metrics)
.await;
batch = OperationBatch::new();
}
}
None => {
submit_and_confirm_op(op, &mut confirm_queue, &metrics).await;
submit_and_confirm_op(op, &mut prepare_queue, &mut confirm_queue, &metrics).await;
}
}
}
}

async fn submit_and_confirm_op(
mut op: QueueOperation,
prepare_queue: &mut OpQueue,
confirm_queue: &mut OpQueue,
metrics: &SerialSubmitterMetrics,
) {
Expand All @@ -291,7 +277,12 @@ impl OperationBatch {
self.operations.push(op);
}

async fn submit(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) {
async fn submit(
self,
prepare_queue: &mut OpQueue,
confirm_queue: &mut OpQueue,
metrics: &SerialSubmitterMetrics,
) {
// without checking the concrete type, could have
// a TryInto<(&HyperlaneMessage, &SubmissionData)> supertrait on `PendingOperation`, which will only work for PendingMessage.
// later this may be convertible into a TryIntoBytes so it can be used universally
Expand All @@ -308,12 +299,18 @@ impl OperationBatch {
// } else {
// self.submit_serially().await
// }
self.submit_serially(confirm_queue, metrics).await;
self.submit_serially(prepare_queue, confirm_queue, metrics)
.await;
}

async fn submit_serially(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) {
async fn submit_serially(
self,
prepare_queue: &mut OpQueue,
confirm_queue: &mut OpQueue,
metrics: &SerialSubmitterMetrics,
) {
for op in self.operations.into_iter() {
submit_and_confirm_op(op, confirm_queue, metrics).await;
submit_and_confirm_op(op, prepare_queue, confirm_queue, metrics).await;
}
}
}
Expand Down

0 comments on commit 648bc6c

Please sign in to comment.