From 46c8bea7b801ec6c9e30a3c34c4f472f18f8ab11 Mon Sep 17 00:00:00 2001 From: David Braden Date: Wed, 14 Aug 2024 09:16:03 -0600 Subject: [PATCH] connection buffer and close fixes (#104) --- Cargo.lock | 13 +- Cargo.toml | 4 +- crates/tx5-connection/Cargo.toml | 1 + crates/tx5-connection/src/conn.rs | 287 +++++++++++++++++++++------- crates/tx5-connection/src/lib.rs | 71 +++++-- crates/tx5-connection/src/webrtc.rs | 39 +++- 6 files changed, 313 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b81a2b1..643ed4ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2166,9 +2166,9 @@ dependencies = [ [[package]] name = "sbd-client" -version = "0.0.5-alpha" +version = "0.0.6-alpha" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bff8b5e22ff3d13f41b3b8318dcfb5303eb9903399ce82d25bcbf84742a0509" +checksum = "66f0b06ca514d8666ff371c63a5913e3f1740312c13768f301d0e5578a03c7e2" dependencies = [ "base64 0.22.1", "ed25519-dalek", @@ -2185,9 +2185,9 @@ dependencies = [ [[package]] name = "sbd-e2e-crypto-client" -version = "0.0.5-alpha" +version = "0.0.6-alpha" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7f9e9ed5e1e817c0b57cf9625b0a823109267d3bfb06d5feec69e626bde59e8" +checksum = "d257a338fa0fca74d013b69a9e49f3683a4766bcadd4fe583779783bec48dede" dependencies = [ "sbd-client", "sodoken", @@ -2197,9 +2197,9 @@ dependencies = [ [[package]] name = "sbd-server" -version = "0.0.5-alpha" +version = "0.0.6-alpha" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39c6ba716e669f658476cf3c0924b8970f23937601e8c5bb8d03f1a7a8b638ea" +checksum = "fb12c0cb502dd2998fb3560e3999f58ac819719fd6513763029608dcf77f4a88" dependencies = [ "anstyle", "base64 0.22.1", @@ -2846,6 +2846,7 @@ name = "tx5-connection" version = "0.1.1-beta" dependencies = [ "bit_field", + "futures", "rand", "sbd-server", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 963263f2..031a74da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,8 +45,8 @@ ring = "0.16.20" rustls = { version = "0.20.0", features = [ "dangerous_configuration" ] } rustls-native-certs = "0.6.2" rustls-pemfile = "1.0.2" -sbd-e2e-crypto-client = "0.0.5-alpha" -sbd-server = "0.0.5-alpha" +sbd-e2e-crypto-client = "0.0.6-alpha" +sbd-server = "0.0.6-alpha" serde = { version = "1.0.160", features = [ "derive", "rc" ] } serde_json = { version = "1.0.96", features = [ "preserve_order" ] } sha2 = "0.10.6" diff --git a/crates/tx5-connection/Cargo.toml b/crates/tx5-connection/Cargo.toml index 9a9288e2..19af3251 100644 --- a/crates/tx5-connection/Cargo.toml +++ b/crates/tx5-connection/Cargo.toml @@ -21,6 +21,7 @@ backend-webrtc-rs = [ ] [dependencies] bit_field = { workspace = true } +futures = { workspace = true } tokio = { workspace = true, features = [ "full" ] } tracing = { workspace = true } tx5-core = { workspace = true } diff --git a/crates/tx5-connection/src/conn.rs b/crates/tx5-connection/src/conn.rs index ddf93760..81ca31c3 100644 --- a/crates/tx5-connection/src/conn.rs +++ b/crates/tx5-connection/src/conn.rs @@ -1,15 +1,21 @@ use super::*; use std::sync::atomic::Ordering; +/// Message count allowed to accumulate on the boundary switching to webrtc. +/// This prevents potentially faster to transfer webrtc messages from being +/// delivered out of order before trailing slower sbd messages are received. +const MAX_WEBRTC_BUF: usize = 512; + pub(crate) enum ConnCmd { SigRecv(tx5_signal::SignalMessage), SendMessage(Vec), WebrtcMessage(Vec), WebrtcReady, + WebrtcClosed, } /// Receive messages from a tx5 connection. -pub struct ConnRecv(tokio::sync::mpsc::Receiver>); +pub struct ConnRecv(CloseRecv>); impl ConnRecv { /// Receive up to 16KiB of message data. @@ -33,14 +39,20 @@ pub struct Conn { hub_cmd_send: tokio::sync::mpsc::Sender, } -impl Drop for Conn { - fn drop(&mut self) { - tracing::debug!( +macro_rules! netaudit { + ($lvl:ident, $($all:tt)*) => { + ::tracing::event!( target: "NETAUDIT", - pub_key = ?self.pub_key, + ::tracing::Level::$lvl, m = "tx5-connection", - a = "drop", + $($all)* ); + }; +} + +impl Drop for Conn { + fn drop(&mut self) { + netaudit!(DEBUG, pub_key = ?self.pub_key, a = "drop"); self.conn_task.abort(); self.keepalive_task.abort(); @@ -67,12 +79,11 @@ impl Conn { config: Arc, hub_cmd_send: tokio::sync::mpsc::Sender, ) -> (Arc, ConnRecv, CloseSend) { - tracing::debug!( - target: "NETAUDIT", + netaudit!( + DEBUG, webrtc_config = String::from_utf8_lossy(&webrtc_config).to_string(), ?pub_key, ?is_polite, - m = "tx5-connection", a = "open", ); @@ -113,7 +124,7 @@ impl Conn { let ready2 = ready.clone(); let client2 = client.clone(); let pub_key2 = pub_key.clone(); - let cmd_send3 = cmd_send.clone(); + let mut cmd_send3 = cmd_send.clone(); let conn_task = tokio::task::spawn(async move { let client = match client2.upgrade() { Some(client) => client, @@ -163,6 +174,11 @@ impl Conn { ConnCmd::WebrtcReady => { webrtc_ready2.close(); } + ConnCmd::WebrtcClosed => { + // only emitted by the webrtc module + // which at this point hasn't yet been initialized + unreachable!() + } } if got_peer_res && sent_our_res { break; @@ -196,22 +212,26 @@ impl Conn { let pub_key3 = pub_key2.clone(); let _webrtc_task = AbortTask(tokio::task::spawn(async move { use webrtc::WebrtcEvt::*; + cmd_send3.set_close_on_drop(true); while let Some(evt) = webrtc_recv.recv().await { match evt { GeneratedOffer(offer) => { - tracing::trace!( - target: "NETAUDIT", + netaudit!( + TRACE, pub_key = ?pub_key3, offer = String::from_utf8_lossy(&offer).to_string(), - m = "tx5-connection", a = "send_offer", ); if let Some(client) = client3.upgrade() { - if client - .send_offer(&pub_key3, offer) - .await - .is_err() + if let Err(err) = + client.send_offer(&pub_key3, offer).await { + netaudit!( + DEBUG, + pub_key = ?pub_key3, + ?err, + a = "webrtc send_offer error", + ); break; } } else { @@ -219,19 +239,22 @@ impl Conn { } } GeneratedAnswer(answer) => { - tracing::trace!( - target: "NETAUDIT", + netaudit!( + TRACE, pub_key = ?pub_key3, offer = String::from_utf8_lossy(&answer).to_string(), - m = "tx5-connection", a = "send_answer", ); if let Some(client) = client3.upgrade() { - if client - .send_answer(&pub_key3, answer) - .await - .is_err() + if let Err(err) = + client.send_answer(&pub_key3, answer).await { + netaudit!( + DEBUG, + pub_key = ?pub_key3, + ?err, + a = "webrtc send_answer error", + ); break; } } else { @@ -239,19 +262,22 @@ impl Conn { } } GeneratedIce(ice) => { - tracing::trace!( - target: "NETAUDIT", + netaudit!( + TRACE, pub_key = ?pub_key3, offer = String::from_utf8_lossy(&ice).to_string(), - m = "tx5-connection", a = "send_ice", ); if let Some(client) = client3.upgrade() { - if client - .send_ice(&pub_key3, ice) - .await - .is_err() + if let Err(err) = + client.send_ice(&pub_key3, ice).await { + netaudit!( + DEBUG, + pub_key = ?pub_key3, + ?err, + a = "webrtc send_ice error", + ); break; } } else { @@ -264,6 +290,11 @@ impl Conn { .await .is_err() { + netaudit!( + DEBUG, + pub_key = ?pub_key3, + a = "webrtc cmd closed", + ); break; } } @@ -273,11 +304,18 @@ impl Conn { .await .is_err() { + netaudit!( + DEBUG, + pub_key = ?pub_key3, + a = "webrtc cmd closed", + ); break; } } } } + + let _ = cmd_send3.send(ConnCmd::WebrtcClosed).await; })); msg_send.set_close_on_drop(true); @@ -285,19 +323,59 @@ impl Conn { let mut recv_over_webrtc = false; let mut send_over_webrtc = false; - while let Ok(Some(cmd)) = - tokio::time::timeout(config.max_idle, cmd_recv.recv()).await - { - match cmd { + loop { + let cmd = + tokio::time::timeout(config.max_idle, cmd_recv.recv()) + .await; + + let cmd = match cmd { + Err(_) => { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: connection idle", + ); + break; + } + Ok(cmd) => cmd, + }; + + let cmd = match cmd { + None => { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: cmd_recv stream complete", + ); + break; + } + Some(cmd) => cmd, + }; + + let mut slow_task = "unknown"; + + if breakable_timeout!(match cmd { ConnCmd::SigRecv(sig) => { use tx5_signal::SignalMessage::*; match sig { // invalid - HandshakeReq(_) | HandshakeRes(_) => break, + HandshakeReq(_) | HandshakeRes(_) => { + slow_task = "sig-handshake"; + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: unexpected handshake msg", + ); + break; + } Message(msg) => { + slow_task = "sig-message"; if recv_over_webrtc { - // invalid signal message - // after webrtc ready received + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: unexpected sbd msg after recv_over_webrtc", + ); break; } else { recv_msg_count2 @@ -307,58 +385,87 @@ impl Conn { Ordering::Relaxed, ); if msg_send.send(msg).await.is_err() { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: msg_send closed", + ); break; } } } Offer(offer) => { - tracing::trace!( - target: "NETAUDIT", + slow_task = "sig-offer"; + netaudit!( + TRACE, pub_key = ?pub_key2, offer = String::from_utf8_lossy(&offer).to_string(), - m = "tx5-connection", a = "recv_offer", ); - if webrtc.in_offer(offer).await.is_err() { + if let Err(err) = webrtc.in_offer(offer).await { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + ?err, + a = "close: webrtc in_offer error", + ); break; } } Answer(answer) => { - tracing::trace!( - target: "NETAUDIT", + slow_task = "sig-answer"; + netaudit!( + TRACE, pub_key = ?pub_key2, offer = String::from_utf8_lossy(&answer).to_string(), - m = "tx5-connection", a = "recv_answer", ); - if webrtc.in_answer(answer).await.is_err() { + if let Err(err) = webrtc.in_answer(answer).await + { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + ?err, + a = "close: webrtc in_answer error", + ); break; } } Ice(ice) => { - tracing::trace!( - target: "NETAUDIT", + slow_task = "sig-ice"; + netaudit!( + TRACE, pub_key = ?pub_key2, offer = String::from_utf8_lossy(&ice).to_string(), - m = "tx5-connection", a = "recv_ice", ); - if webrtc.in_ice(ice).await.is_err() { + if let Err(err) = webrtc.in_ice(ice).await { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + ?err, + a = "close: webrtc in_ice error", + ); break; } } WebrtcReady => { + slow_task = "sig-webrtc-ready"; recv_over_webrtc = true; - tracing::debug!( - target: "NETAUDIT", + netaudit!( + DEBUG, pub_key = ?pub_key2, - m = "tx5-connection", a = "recv_over_webrtc", ); for msg in webrtc_message_buffer.drain(..) { // don't bump send metrics here, // we bumped them on receive if msg_send.send(msg).await.is_err() { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: msg_send closed", + ); break; } } @@ -367,62 +474,110 @@ impl Conn { } } ConnCmd::SendMessage(msg) => { + slow_task = "send-message"; send_msg_count2.fetch_add(1, Ordering::Relaxed); send_byte_count2 .fetch_add(msg.len() as u64, Ordering::Relaxed); if send_over_webrtc { - if webrtc.message(msg).await.is_err() { + if let Err(err) = webrtc.message(msg).await { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + ?err, + a = "close: webrtc message error", + ); break; } } else if let Some(client) = client2.upgrade() { - if client - .send_message(&pub_key2, msg) - .await - .is_err() + if let Err(err) = + client.send_message(&pub_key2, msg).await { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + ?err, + a = "close: sbd client send error", + ); break; } } else { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: sbd client closed", + ); break; } } ConnCmd::WebrtcMessage(msg) => { + slow_task = "webrtc-message"; recv_msg_count2.fetch_add(1, Ordering::Relaxed); recv_byte_count2 .fetch_add(msg.len() as u64, Ordering::Relaxed); if recv_over_webrtc { if msg_send.send(msg).await.is_err() { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: msg_send closed", + ); break; } } else { webrtc_message_buffer.push(msg); - if webrtc_message_buffer.len() > 32 { + if webrtc_message_buffer.len() > MAX_WEBRTC_BUF { // prevent memory fillup + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: webrtc buffer overflow", + ); break; } } } ConnCmd::WebrtcReady => { + slow_task = "webrtc-ready"; if let Some(client) = client2.upgrade() { - if client - .send_webrtc_ready(&pub_key2) - .await - .is_err() + if let Err(err) = + client.send_webrtc_ready(&pub_key2).await { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + ?err, + a = "close: sbd client ready error", + ); break; } } else { + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: sbd client closed", + ); break; } send_over_webrtc = true; - tracing::debug!( - target: "NETAUDIT", + netaudit!( + DEBUG, pub_key = ?pub_key2, - m = "tx5-connection", a = "send_over_webrtc", ); webrtc_ready2.close(); } + ConnCmd::WebrtcClosed => { + slow_task = "webrtc-closed"; + netaudit!( + DEBUG, + pub_key = ?pub_key2, + a = "close: webrtc closed", + ); + break; + } + }).is_err() { + tracing::warn!(slow_task, "slow app on conn loop task"); + break; } } diff --git a/crates/tx5-connection/src/lib.rs b/crates/tx5-connection/src/lib.rs index f6685cc5..41da361b 100644 --- a/crates/tx5-connection/src/lib.rs +++ b/crates/tx5-connection/src/lib.rs @@ -33,6 +33,26 @@ compile_error!("Must specify exactly 1 webrtc backend"); #[cfg(feature = "backend-go-pion")] pub use tx5_go_pion::Tx5InitConfig; +/// Grace time to allow a slow app to catch up before we close a +/// connection to prevent our memory from filling up with backlogged +/// message data. +const SLOW_APP_TO: std::time::Duration = std::time::Duration::from_millis(99); + +macro_rules! breakable_timeout { + ($($t:tt)*) => { + tokio::time::timeout( + $crate::SLOW_APP_TO, + async { + loop { + {$($t)*} + break; + } + std::io::Result::Ok(()) + } + ).await + }; +} + use std::collections::HashMap; use std::future::Future; use std::io::{Error, ErrorKind, Result}; @@ -49,8 +69,17 @@ impl Drop for AbortTask { } } +struct CloseRecv(futures::channel::mpsc::Receiver); + +impl CloseRecv { + pub async fn recv(&mut self) -> Option { + use futures::stream::StreamExt; + self.0.next().await + } +} + struct CloseSend { - sender: Arc>>>, + sender: Arc>>>, close_on_drop: bool, } @@ -66,20 +95,23 @@ impl Clone for CloseSend { impl Drop for CloseSend { fn drop(&mut self) { if self.close_on_drop { - self.sender.lock().unwrap().take(); + let s = self.sender.lock().unwrap().take(); + if let Some(mut s) = s { + s.close_channel(); + } } } } impl CloseSend { - pub fn channel() -> (Self, tokio::sync::mpsc::Receiver) { - let (s, r) = tokio::sync::mpsc::channel(32); + pub fn channel() -> (Self, CloseRecv) { + let (s, r) = futures::channel::mpsc::channel(32); ( Self { sender: Arc::new(Mutex::new(Some(s))), close_on_drop: false, }, - r, + CloseRecv(r), ) } @@ -91,10 +123,11 @@ impl CloseSend { &self, t: T, ) -> impl Future> + 'static + Send { + use futures::sink::SinkExt; let s = self.sender.lock().unwrap().clone(); async move { match s { - Some(s) => { + Some(mut s) => { s.send(t).await.map_err(|_| ErrorKind::BrokenPipe.into()) } None => Err(ErrorKind::BrokenPipe.into()), @@ -106,25 +139,23 @@ impl CloseSend { &self, t: T, ) -> impl Future> + 'static + Send { - // Grace time to allow a slow app to catch up before we close a - // connection to prevent our memory from filling up with backlogged - // message data. - const SLOW_APP_TO: std::time::Duration = - std::time::Duration::from_millis(99); + use futures::sink::SinkExt; let s = self.sender.lock().unwrap().clone(); async move { match s { - Some(s) => match s.send_timeout(t, SLOW_APP_TO).await { - Err( - tokio::sync::mpsc::error::SendTimeoutError::Timeout(_), - ) => { - tracing::warn!("Closing connection due to slow app"); - Err(ErrorKind::TimedOut.into()) + Some(mut s) => { + match tokio::time::timeout(SLOW_APP_TO, s.send(t)).await { + Err(_) => { + tracing::warn!( + "Closing connection due to slow app" + ); + Err(ErrorKind::TimedOut.into()) + } + Ok(Err(_)) => Err(ErrorKind::BrokenPipe.into()), + Ok(Ok(_)) => Ok(()), } - Err(_) => Err(ErrorKind::BrokenPipe.into()), - Ok(_) => Ok(()), - }, + } None => Err(ErrorKind::BrokenPipe.into()), } } diff --git a/crates/tx5-connection/src/webrtc.rs b/crates/tx5-connection/src/webrtc.rs index a113dae9..f0528807 100644 --- a/crates/tx5-connection/src/webrtc.rs +++ b/crates/tx5-connection/src/webrtc.rs @@ -1,4 +1,4 @@ -use crate::{AbortTask, CloseSend}; +use crate::{AbortTask, CloseRecv, CloseSend}; use std::io::{Error, Result}; pub enum WebrtcEvt { @@ -35,7 +35,7 @@ impl Webrtc { is_polite: bool, config: Vec, send_buffer: usize, - ) -> (Self, tokio::sync::mpsc::Receiver) { + ) -> (Self, CloseRecv) { let (mut cmd_send, cmd_recv) = CloseSend::channel(); let (mut evt_send, evt_recv) = CloseSend::channel(); @@ -99,7 +99,7 @@ async fn task( send_buffer: usize, mut evt_send: CloseSend, cmd_send: CloseSend, - mut cmd_recv: tokio::sync::mpsc::Receiver, + mut cmd_recv: CloseRecv, ) -> Result<()> { evt_send.set_close_on_drop(true); @@ -148,9 +148,17 @@ async fn task( offer = Some(o); } - while let Some(cmd) = cmd_recv.recv().await { - match cmd { + loop { + let cmd = match cmd_recv.recv().await { + None => break, + Some(cmd) => cmd, + }; + + let mut slow_task = "unknown"; + + match breakable_timeout!(match cmd { Cmd::InOffer(o) => { + slow_task = "in-offer"; if is_polite && !did_handshake { peer.set_remote_description(o).await?; let mut a = peer.create_answer(b"{}".to_vec()).await?; @@ -162,6 +170,7 @@ async fn task( } } Cmd::InAnswer(a) => { + slow_task = "in-answer"; if !is_polite && !did_handshake { if let Some(o) = offer.take() { peer.set_local_description(o).await?; @@ -171,12 +180,15 @@ async fn task( } } Cmd::InIce(i) => { + slow_task = "in-ice"; let _ = peer.add_ice_candidate(i).await; } Cmd::GeneratedIce(ice) => { + slow_task = "gen-ice"; evt_send.send(WebrtcEvt::GeneratedIce(ice)).await?; } Cmd::DataChan(d, dr) => { + slow_task = "data-chan"; if data.is_none() { d.set_buffered_amount_low_threshold(send_buffer)?; data = Some(d); @@ -184,6 +196,7 @@ async fn task( } } Cmd::SendMessage(msg, resp) => { + slow_task = "send-msg"; if let Some(d) = &data { let amt = match d.send(msg).await { Ok(amt) => amt, @@ -200,15 +213,25 @@ async fn task( } } Cmd::RecvMessage(msg) => { + slow_task = "recv-msg"; evt_send.send(WebrtcEvt::Message(msg)).await?; } Cmd::DataChanOpen => { + slow_task = "chan-open"; evt_send.send(WebrtcEvt::Ready).await?; } Cmd::BufferedAmountLow => { + slow_task = "buf-low"; pend_buffer.clear(); } - } + }) { + Err(_) => { + let err = format!("slow app on webrtc loop task: {slow_task}"); + tracing::warn!("{err}"); + Err(Error::other(err)) + } + Ok(r) => r, + }?; } Ok(()) @@ -220,10 +243,10 @@ fn spawn_data_chan( tx5_go_pion::DataChannelEvent, >, ) -> Option>> { - cmd_send.set_close_on_drop(true); - use tx5_go_pion::DataChannelEvent as Evt; Some(AbortTask(tokio::task::spawn(async move { + cmd_send.set_close_on_drop(true); + // Receiving on the unbounded data channel receiver has a real // chance to fill our memory with message data. // We give a small chance for the app to catch up, otherwise