Skip to content

Commit

Permalink
chore: auto-resubsribe when no msg arrived
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Oct 14, 2023
1 parent 39f9d5d commit 2539223
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 8 deletions.
7 changes: 6 additions & 1 deletion cli/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ async fn run_cmd(
let mut checkers = HashMap::new();
let (price_send, price_recv) = memory::channel(price_stream_throttle_period());

let unhealthy_msg_interval = price_server
.health
.unhealthy_msg_interval_price
.to_std()
.expect("Could not convert Duration to_std");
if exchanges
.okex
.as_ref()
Expand All @@ -143,7 +148,7 @@ async fn run_cmd(
let price_send = price_send.clone();
handles.push(tokio::spawn(async move {
let _ = okex_send.try_send(
okex_price::run(price_send)
okex_price::run(price_send, unhealthy_msg_interval)
.await
.context("Okex Price Feed error"),
);
Expand Down
4 changes: 4 additions & 0 deletions okex-price/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ pub enum PriceFeedError {
InitialFullLoad,
#[error("PriceFeedError: CheckSumValidation - Can't validate accuracy of depth data")]
CheckSumValidation,
#[error("PriceFeedError: StreamEnded - Stream ended unexpectedly")]
StreamEnded,
#[error("PriceFeedError: StreamStalled - Stream ended unexpectedly")]
StreamStalled,
}
34 changes: 28 additions & 6 deletions okex-price/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ pub mod price_feed;

use futures::StreamExt;
use shared::{payload::*, pubsub::*};
use tokio::join;
use tokio::{
join,
time::{timeout, Duration},
};

pub use error::*;
pub use okex_shared::*;
Expand All @@ -18,6 +21,7 @@ pub use price_feed::*;

pub async fn run(
price_stream_publisher: memory::Publisher<PriceStreamPayload>,
unhealthy_msg_interval: std::time::Duration,
) -> Result<(), PriceFeedError> {
let _ = tokio::spawn(async move {
loop {
Expand All @@ -31,7 +35,11 @@ pub async fn run(
let order_book_publisher = price_stream_publisher.clone();
let order_book_task = tokio::spawn(async move {
loop {
let _res = order_book_subscription(order_book_publisher.clone()).await;
let _res = order_book_subscription(
order_book_publisher.clone(),
unhealthy_msg_interval,
)
.await;
tokio::time::sleep(std::time::Duration::from_secs(5_u64)).await;
}
});
Expand All @@ -46,6 +54,7 @@ pub async fn run(

async fn order_book_subscription(
publisher: memory::Publisher<PriceStreamPayload>,
unhealthy_msg_interval: std::time::Duration,
) -> Result<(), PriceFeedError> {
let mut stream = subscribe_btc_usd_swap_order_book().await?;
let full_load = stream.next().await.ok_or(PriceFeedError::InitialFullLoad)?;
Expand All @@ -55,10 +64,23 @@ async fn order_book_subscription(
let (send, recv) = tokio::sync::oneshot::channel();

tokio::spawn(async move {
while let Some(book) = stream.next().await {
if let Err(e) = okex_order_book_received(&publisher, book, cache.clone()).await {
let _ = send.send(e);
break;
loop {
match timeout(unhealthy_msg_interval, stream.next()).await {
Ok(Some(book)) => {
if let Err(e) = okex_order_book_received(&publisher, book, cache.clone()).await
{
let _ = send.send(e);
break;
}
}
Ok(None) => {
let _ = send.send(PriceFeedError::StreamEnded);
break;
}
Err(_) => {
let _ = send.send(PriceFeedError::StreamEnded);
break;
}
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion okex-price/tests/price_feed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn publishes_to_price_stream() -> anyhow::Result<()> {
memory::channel(chrono::Duration::from_std(std::time::Duration::from_secs(2)).unwrap());

let _ = tokio::spawn(async move {
let _res = okex_price::run(tick_send).await;
let _res = okex_price::run(tick_send, std::time::Duration::from_secs(20)).await;
});

let recv = tick_recv.next().await.expect("expected price tick");
Expand Down

0 comments on commit 2539223

Please sign in to comment.