diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index 3a45c50ca..4c1479a05 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -340,7 +340,7 @@ impl UdpPacketsListener { // Handling of connection events connection_event = self.connection_handler.recv() => { let Some((remote_addr, event)) = connection_event else { - tracing::debug!("connection handler closed"); + tracing::debug!(%self.this_addr, "connection handler closed"); return Ok(()); }; if let Some(_conn) = self.remote_connections.remove(&remote_addr) { @@ -1198,7 +1198,7 @@ mod test { #[tokio::test] async fn simulate_nat_traversal_drop_packet_ranges_of_peerb_killed() -> anyhow::Result<()> { - // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE)); + // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None); let channels = Arc::new(DashMap::new()); let (peer_a_pub, mut peer_a, peer_a_addr) = set_peer_connection(Default::default(), channels.clone()).await?; @@ -1210,23 +1210,25 @@ mod test { let peer_b = tokio::spawn(async move { let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; - let mut conn = tokio::time::timeout(Duration::from_secs(60), peer_a_conn).await??; - let _ = tokio::time::timeout(Duration::from_secs(3), conn.recv()).await; - conn.send("some data").await.unwrap(); + let mut conn = tokio::time::timeout(Duration::from_secs(2), peer_a_conn).await??; + conn.send("some data").await.inspect_err(|error| { + tracing::error!(%error, "error while sending message to peer a"); + })?; + tracing::info!("Dropping peer b"); Ok::<_, anyhow::Error>(()) }); let peer_a = tokio::spawn(async move { let peer_b_conn = peer_a.connect(peer_b_pub, peer_b_addr).await; - let mut conn = tokio::time::timeout(Duration::from_secs(60), peer_b_conn).await??; - let _ = tokio::time::timeout(Duration::from_secs(3), conn.recv()).await; + let mut conn = tokio::time::timeout(Duration::from_secs(2), peer_b_conn).await??; + let b = tokio::time::timeout(Duration::from_secs(2), conn.recv()).await??; // we should receive the message - let b = conn.recv().await.unwrap(); - assert_eq!(b, b"some data"); - // as peer b will drop all packets after the third packet, the connection should be broken - tokio::time::sleep(Duration::from_secs(10)).await; + assert_eq!(&b[8..], b"some data"); + tracing::info!("Peer a received package from peer b"); + tokio::time::sleep(Duration::from_secs(3)).await; // conn should be broken as the remote peer cannot receive message and ping - conn.recv().await.unwrap_err(); + let res = conn.recv().await; + assert!(res.is_err()); Ok::<_, anyhow::Error>(()) }); @@ -1239,7 +1241,6 @@ mod test { #[tokio::test] async fn simulate_nat_traversal_drop_packet_ranges_of_peerb() -> anyhow::Result<()> { - // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None); let channels = Arc::new(DashMap::new()); let (peer_a_pub, mut peer_a, peer_a_addr) = set_peer_connection(Default::default(), channels.clone()).await?; @@ -1248,37 +1249,50 @@ mod test { let peer_b = tokio::spawn(async move { let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; - let mut conn = tokio::time::timeout(Duration::from_secs(60), peer_a_conn).await??; - let _ = tokio::time::timeout(Duration::from_secs(3), conn.recv()).await; - conn.send("some foo").await.unwrap(); - - tokio::time::sleep(Duration::from_secs(10)).await; - + let mut conn = tokio::time::timeout(Duration::from_secs(2), peer_a_conn) + .await + .inspect_err(|_| tracing::error!("peer a timed out"))? + .inspect_err(|error| tracing::error!(%error, "error while connecting to peer a"))?; + tracing::info!("Connected peer b to peer a"); + conn.send("some foo").await.inspect_err(|error| { + tracing::error!(%error, "error while sending 1st message"); + })?; + tokio::time::sleep(Duration::from_secs(2)).await; // although we drop some packets, we still alive - conn.send("some data").await.unwrap(); - Ok::<_, anyhow::Error>(()) + conn.send("some data").await.inspect_err(|error| { + tracing::error!(%error, "error while sending 2nd message"); + })?; + let _ = tokio::time::timeout(Duration::from_secs(3), conn.recv()).await; + Ok::<_, anyhow::Error>(conn) }); let peer_a = tokio::spawn(async move { let peer_b_conn = peer_a.connect(peer_b_pub, peer_b_addr).await; - let mut conn = tokio::time::timeout(Duration::from_secs(60), peer_b_conn).await??; - let _ = tokio::time::timeout(Duration::from_secs(3), conn.recv()).await; + let mut conn = tokio::time::timeout(Duration::from_secs(2), peer_b_conn) + .await + .inspect_err(|_| tracing::error!("peer b timed out"))? + .inspect_err(|error| tracing::error!(%error, "error while connecting to peer b"))?; + tracing::info!("Connected peer a to peer b"); - tokio::time::sleep(Duration::from_secs(5)).await; // we should receive the message - let b = conn.recv().await.unwrap(); + let b = conn.recv().await.inspect_err(|error| { + tracing::error!(%error, "error while receiving 1st message"); + })?; assert_eq!(&b[8..], b"some foo"); - - tokio::time::sleep(Duration::from_secs(10)).await; // conn should not be broken - let b = conn.recv().await.unwrap(); + let b = conn.recv().await.inspect_err(|error| { + tracing::error!(%error, "error while receiving 2nd message"); + })?; assert_eq!(&b[8..], b"some data"); - Ok::<_, anyhow::Error>(()) + let _ = conn.send("complete").await.inspect_err(|error| { + tracing::error!(%error, "error while sending 3rd message"); + }); + Ok::<_, anyhow::Error>(conn) }); let (a, b) = tokio::try_join!(peer_a, peer_b)?; - a?; - b?; + let _ = a?; + let _ = b?; Ok(()) } diff --git a/crates/core/src/transport/peer_connection.rs b/crates/core/src/transport/peer_connection.rs index 34dbaa01b..722316630 100644 --- a/crates/core/src/transport/peer_connection.rs +++ b/crates/core/src/transport/peer_connection.rs @@ -207,13 +207,13 @@ impl PeerConnection { // listen for incoming messages or receipts or wait until is time to do anything else again let mut resend_check = Some(tokio::time::sleep(tokio::time::Duration::from_millis(10))); - // #[cfg(debug_assertions)] - // const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(2); - // #[cfg(not(debug_assertions))] + #[cfg(debug_assertions)] + const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(2); + #[cfg(not(debug_assertions))] const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(20); - // #[cfg(debug_assertions)] - // const KILL_CONNECTION_AFTER: Duration = Duration::from_secs(6); - // #[cfg(not(debug_assertions))] + #[cfg(debug_assertions)] + const KILL_CONNECTION_AFTER: Duration = Duration::from_secs(6); + #[cfg(not(debug_assertions))] const KILL_CONNECTION_AFTER: Duration = Duration::from_secs(60); let mut keep_alive = tokio::time::interval(KEEP_ALIVE_INTERVAL); @@ -306,7 +306,7 @@ impl PeerConnection { } _ = resend_check.take().unwrap_or(tokio::time::sleep(Duration::from_millis(10))) => { loop { - tracing::trace!(remote = ?self.remote_conn.remote_addr, "checking for resends"); + // tracing::trace!(remote = ?self.remote_conn.remote_addr, "checking for resends"); let maybe_resend = self.remote_conn .sent_tracker .lock()