From 2539223857c31490821ee3afc7689b8b83901174 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Sat, 14 Oct 2023 22:55:08 +0200 Subject: [PATCH] chore: auto-resubsribe when no msg arrived --- cli/src/app.rs | 7 ++++++- okex-price/src/error.rs | 4 ++++ okex-price/src/lib.rs | 34 ++++++++++++++++++++++++++++------ okex-price/tests/price_feed.rs | 2 +- 4 files changed, 39 insertions(+), 8 deletions(-) diff --git a/cli/src/app.rs b/cli/src/app.rs index 6ce7f04bd..a4f897a3f 100644 --- a/cli/src/app.rs +++ b/cli/src/app.rs @@ -131,6 +131,11 @@ async fn run_cmd( let mut checkers = HashMap::new(); let (price_send, price_recv) = memory::channel(price_stream_throttle_period()); + let unhealthy_msg_interval = price_server + .health + .unhealthy_msg_interval_price + .to_std() + .expect("Could not convert Duration to_std"); if exchanges .okex .as_ref() @@ -143,7 +148,7 @@ async fn run_cmd( let price_send = price_send.clone(); handles.push(tokio::spawn(async move { let _ = okex_send.try_send( - okex_price::run(price_send) + okex_price::run(price_send, unhealthy_msg_interval) .await .context("Okex Price Feed error"), ); diff --git a/okex-price/src/error.rs b/okex-price/src/error.rs index d0c275d71..ff6493ad3 100644 --- a/okex-price/src/error.rs +++ b/okex-price/src/error.rs @@ -35,4 +35,8 @@ pub enum PriceFeedError { InitialFullLoad, #[error("PriceFeedError: CheckSumValidation - Can't validate accuracy of depth data")] CheckSumValidation, + #[error("PriceFeedError: StreamEnded - Stream ended unexpectedly")] + StreamEnded, + #[error("PriceFeedError: StreamStalled - Stream ended unexpectedly")] + StreamStalled, } diff --git a/okex-price/src/lib.rs b/okex-price/src/lib.rs index 062beafbd..0d5a98623 100644 --- a/okex-price/src/lib.rs +++ b/okex-price/src/lib.rs @@ -9,7 +9,10 @@ pub mod price_feed; use futures::StreamExt; use shared::{payload::*, pubsub::*}; -use tokio::join; +use tokio::{ + join, + time::{timeout, Duration}, +}; pub use error::*; pub use okex_shared::*; @@ -18,6 +21,7 @@ pub use price_feed::*; pub async fn run( price_stream_publisher: memory::Publisher, + unhealthy_msg_interval: std::time::Duration, ) -> Result<(), PriceFeedError> { let _ = tokio::spawn(async move { loop { @@ -31,7 +35,11 @@ pub async fn run( let order_book_publisher = price_stream_publisher.clone(); let order_book_task = tokio::spawn(async move { loop { - let _res = order_book_subscription(order_book_publisher.clone()).await; + let _res = order_book_subscription( + order_book_publisher.clone(), + unhealthy_msg_interval, + ) + .await; tokio::time::sleep(std::time::Duration::from_secs(5_u64)).await; } }); @@ -46,6 +54,7 @@ pub async fn run( async fn order_book_subscription( publisher: memory::Publisher, + unhealthy_msg_interval: std::time::Duration, ) -> Result<(), PriceFeedError> { let mut stream = subscribe_btc_usd_swap_order_book().await?; let full_load = stream.next().await.ok_or(PriceFeedError::InitialFullLoad)?; @@ -55,10 +64,23 @@ async fn order_book_subscription( let (send, recv) = tokio::sync::oneshot::channel(); tokio::spawn(async move { - while let Some(book) = stream.next().await { - if let Err(e) = okex_order_book_received(&publisher, book, cache.clone()).await { - let _ = send.send(e); - break; + loop { + match timeout(unhealthy_msg_interval, stream.next()).await { + Ok(Some(book)) => { + if let Err(e) = okex_order_book_received(&publisher, book, cache.clone()).await + { + let _ = send.send(e); + break; + } + } + Ok(None) => { + let _ = send.send(PriceFeedError::StreamEnded); + break; + } + Err(_) => { + let _ = send.send(PriceFeedError::StreamEnded); + break; + } } } }); diff --git a/okex-price/tests/price_feed.rs b/okex-price/tests/price_feed.rs index 1352f5f1b..777a30a9a 100644 --- a/okex-price/tests/price_feed.rs +++ b/okex-price/tests/price_feed.rs @@ -56,7 +56,7 @@ async fn publishes_to_price_stream() -> anyhow::Result<()> { memory::channel(chrono::Duration::from_std(std::time::Duration::from_secs(2)).unwrap()); let _ = tokio::spawn(async move { - let _res = okex_price::run(tick_send).await; + let _res = okex_price::run(tick_send, std::time::Duration::from_secs(20)).await; }); let recv = tick_recv.next().await.expect("expected price tick");