Skip to content

Commit

Permalink
Revert "Fix #47: Handle WsManager Drop and close the ping and reader …
Browse files Browse the repository at this point in the history
…tasks (#51)"

This reverts commit 8c65509.
  • Loading branch information
lmlmt authored Sep 23, 2024
1 parent 741d541 commit bec4a66
Showing 1 changed file with 14 additions and 55 deletions.
69 changes: 14 additions & 55 deletions src/ws/ws_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,13 @@ use crate::{
Error, Notification, UserFills, UserFundings, UserNonFundingLedgerUpdates,
};
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use log::{error, warn};
use log::error;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{
net::TcpStream,
runtime::Runtime,
spawn,
sync::{mpsc::UnboundedSender, Mutex},
task::JoinHandle,
time,
};
use tokio_tungstenite::{
Expand All @@ -36,9 +27,6 @@ struct SubscriptionData {
subscription_id: u32,
}
pub(crate) struct WsManager {
stop_flag: Arc<AtomicBool>,
reader_handle: Option<JoinHandle<()>>,
ping_handle: Option<JoinHandle<()>>,
writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, protocol::Message>>>,
subscriptions: Arc<Mutex<HashMap<String, Vec<SubscriptionData>>>>,
subscription_id: u32,
Expand Down Expand Up @@ -96,8 +84,6 @@ impl WsManager {
const SEND_PING_INTERVAL: u64 = 50;

pub(crate) async fn new(url: String) -> Result<WsManager> {
let stop_flag = Arc::new(AtomicBool::new(false));

let (ws_stream, _) = connect_async(url.clone())
.await
.map_err(|e| Error::Websocket(e.to_string()))?;
Expand All @@ -109,28 +95,21 @@ impl WsManager {
let subscriptions = Arc::new(Mutex::new(subscriptions_map));
let subscriptions_copy = Arc::clone(&subscriptions);

let reader_handle = {
let stop_flag = Arc::clone(&stop_flag);
let reader_fut = async move {
// TODO: reconnect
while !stop_flag.load(Ordering::Relaxed) {
let data = reader.next().await;
if let Err(err) =
WsManager::parse_and_send_data(data, &subscriptions_copy).await
{
error!("Error processing data received by WS manager reader: {err}");
}
let reader_fut = async move {
// TODO: reconnect
loop {
let data = reader.next().await;
if let Err(err) = WsManager::parse_and_send_data(data, &subscriptions_copy).await {
error!("Error processing data received by WS manager reader: {err}");
}
warn!("ws message reader task stopped");
};
spawn(reader_fut)
}
};
spawn(reader_fut);

let ping_handle = {
let stop_flag = Arc::clone(&stop_flag);
{
let writer = Arc::clone(&writer);
let ping_fut = async move {
while !stop_flag.load(Ordering::Relaxed) {
loop {
match serde_json::to_string(&Ping { method: "ping" }) {
Ok(payload) => {
let mut writer = writer.lock().await;
Expand All @@ -142,15 +121,11 @@ impl WsManager {
}
time::sleep(Duration::from_secs(Self::SEND_PING_INTERVAL)).await;
}
warn!("ws ping task stopped");
};
spawn(ping_fut)
};
spawn(ping_fut);
}

Ok(WsManager {
stop_flag,
reader_handle: Some(reader_handle),
ping_handle: Some(ping_handle),
writer,
subscriptions,
subscription_id: 0,
Expand Down Expand Up @@ -380,19 +355,3 @@ impl WsManager {
Ok(())
}
}

impl Drop for WsManager {
fn drop(&mut self) {
self.stop_flag.store(true, Ordering::Relaxed);

let rt = Runtime::new().unwrap();

if let Some(task) = self.reader_handle.take() {
rt.block_on(task).unwrap();
}

if let Some(task) = self.ping_handle.take() {
rt.block_on(task).unwrap();
}
}
}

0 comments on commit bec4a66

Please sign in to comment.