diff --git a/hyperspace/core/src/lib.rs b/hyperspace/core/src/lib.rs index 0fdf9c9d5..87ab948e6 100644 --- a/hyperspace/core/src/lib.rs +++ b/hyperspace/core/src/lib.rs @@ -16,6 +16,7 @@ use futures::{future::ready, StreamExt}; use primitives::Chain; +use std::convert::Infallible; pub mod chain; pub mod command; @@ -30,6 +31,7 @@ use events::{has_packet_events, parse_events}; use futures::TryFutureExt; use ibc::events::IbcEvent; use metrics::handler::MetricsHandler; +use tokio::task::JoinHandle; #[derive(Copy, Debug, Clone)] pub enum Mode { @@ -53,19 +55,40 @@ where let (mut chain_a_finality, mut chain_b_finality) = (chain_a.finality_notifications().await?, chain_b.finality_notifications().await?); - // loop forever - loop { - tokio::select! { - // new finality event from chain A - result = chain_a_finality.next() => { - process_finality_event!(chain_a, chain_b, chain_a_metrics, mode, result, chain_a_finality, chain_b_finality) - } - // new finality event from chain B - result = chain_b_finality.next() => { - process_finality_event!(chain_b, chain_a, chain_b_metrics, mode, result, chain_b_finality, chain_a_finality) - } + let mut chain_a_cl = chain_a.clone(); + let mut chain_b_cl = chain_b.clone(); + + let jh1: JoinHandle> = tokio::spawn(async move { + loop { + let result = chain_a_finality.next().await; + process_finality_event!( + chain_a_cl, + chain_b_cl, + chain_a_metrics, + mode, + result, + chain_a_finality + ) } - } + }); + + let jh2: JoinHandle> = tokio::spawn(async move { + loop { + let result = chain_b_finality.next().await; + process_finality_event!( + chain_b, + chain_a, + chain_b_metrics, + mode, + result, + chain_b_finality + ) + } + }); + + jh1.await??; + jh2.await??; + Ok(()) } pub async fn fish(chain_a: A, chain_b: B) -> Result<(), anyhow::Error> diff --git a/hyperspace/core/src/macros.rs b/hyperspace/core/src/macros.rs index 97098de35..8a082c258 100644 --- a/hyperspace/core/src/macros.rs +++ b/hyperspace/core/src/macros.rs @@ -14,12 +14,12 @@ #[macro_export] macro_rules! process_finality_event { - ($source:ident, $sink:ident, $metrics:expr, $mode:ident, $result:ident, $stream_source:ident, $stream_sink:ident) => { + ($source:ident, $sink:ident, $metrics:expr, $mode:ident, $result:ident, $stream:ident) => { match $result { // stream closed None => { log::warn!("Stream closed for {}", $source.name()); - $stream_source = loop { + $stream = loop { match $source.finality_notifications().await { Ok(stream) => break stream, Err(e) => { @@ -28,15 +28,6 @@ macro_rules! process_finality_event { }, }; }; - $stream_sink = loop { - match $sink.finality_notifications().await { - Ok(stream) => break stream, - Err(e) => { - log::error!("Failed to get finality notifications for {} {:?}. Trying again in 30 seconds...", $sink.name(), e); - tokio::time::sleep(std::time::Duration::from_secs(30)).await; - }, - }; - }; }, Some(finality_event) => { log::info!("======================================================="); diff --git a/hyperspace/core/src/queue.rs b/hyperspace/core/src/queue.rs index c0ff36afd..073a5375e 100644 --- a/hyperspace/core/src/queue.rs +++ b/hyperspace/core/src/queue.rs @@ -15,6 +15,7 @@ use ibc_proto::google::protobuf::Any; use metrics::handler::MetricsHandler; use primitives::Chain; +use tokio::task::JoinHandle; /// This sends messages to the sink chain in a gas-aware manner. pub async fn flush_message_batch( @@ -32,7 +33,11 @@ pub async fn flush_message_batch( log::debug!(target: "hyperspace", "Outgoing messages weight: {} block max weight: {}", batch_weight, block_max_weight); let ratio = (batch_weight / block_max_weight) as usize; if ratio == 0 { - sink.submit(msgs).await?; + let sink = sink.clone(); + let _join_handler: JoinHandle> = tokio::spawn(async move { + sink.submit(msgs).await?; + Ok(()) + }); return Ok(()) } @@ -51,10 +56,14 @@ pub async fn flush_message_batch( ); let chunk_size = (msgs.len() / chunk).max(1); // TODO: return number of failed messages and record it to metrics - for batch in msgs.chunks(chunk_size) { - // send out batches. - sink.submit(batch.to_vec()).await?; - } + let sink = sink.clone(); + let _join_handler: JoinHandle> = tokio::spawn(async move { + for batch in msgs.chunks(chunk_size) { + // send out batches. + sink.submit(batch.to_vec()).await?; + } + Ok(()) + }); Ok(()) } diff --git a/hyperspace/primitives/src/lib.rs b/hyperspace/primitives/src/lib.rs index 42188f3ab..ad7fa470d 100644 --- a/hyperspace/primitives/src/lib.rs +++ b/hyperspace/primitives/src/lib.rs @@ -98,7 +98,7 @@ pub fn apply_prefix(mut commitment_prefix: Vec, path: impl Into>) -> #[async_trait::async_trait] pub trait IbcProvider { /// Finality event type, passed on to [`Chain::query_latest_ibc_events`] - type FinalityEvent: Debug; + type FinalityEvent: Debug + Send; /// A representation of the transaction id for the chain type TransactionId: Debug; /// Asset Id