From a424c2aa1782c2fc9a43073712403924f413ef25 Mon Sep 17 00:00:00 2001 From: "nacho.d.g" Date: Sat, 19 Oct 2024 20:03:02 +0200 Subject: [PATCH] fix: resend checks should be faster to make transport more efficient (#1271) --- Cargo.lock | 88 ++++++ crates/core/Cargo.toml | 1 + .../core/src/node/network_bridge/handshake.rs | 4 +- .../core/src/transport/connection_handler.rs | 276 +++++++++--------- .../src/{transport.rs => transport/mod.rs} | 0 crates/core/src/transport/peer_connection.rs | 4 +- 6 files changed, 236 insertions(+), 137 deletions(-) rename crates/core/src/{transport.rs => transport/mod.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index e439b3ee0..4f26cd8c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -668,6 +668,45 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "console-api" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ed14aa9c9f927213c6e4f3ef75faaad3406134efe84ba2cb7983431d5f0931" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e3a111a37f3333946ebf9da370ba5c5577b18eb342ec683eb488dd21980302" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "hyper-util", + "prost", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -826,6 +865,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam" version = "0.8.4" @@ -1294,6 +1342,16 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "flate2" +version = "1.0.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.11.0" @@ -1354,6 +1412,7 @@ dependencies = [ "chacha20poly1305", "chrono", "clap", + "console-subscriber", "cookie", "crossbeam", "ctrlc", @@ -1705,6 +1764,19 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "headers" version = "0.4.0" @@ -1904,6 +1976,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "1.4.1" @@ -3148,6 +3226,15 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "prost-types" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" +dependencies = [ + "prost", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -4428,6 +4515,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index bf95ee7c6..49c61f4cf 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -88,6 +88,7 @@ pico-args = "0.5" statrs = "0.17" tempfile = "3" tracing = "0.1" +console-subscriber = { version = "0.4" } [features] default = ["redb", "trace", "websocket"] diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index 70a11e347..57e111aff 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -1659,7 +1659,7 @@ mod tests { if i > 3 { // Create the successful connection let (remote, ev) = tokio::time::timeout( - Duration::from_secs(1), + Duration::from_secs(2), test.transport.outbound_recv.recv(), ) .await? @@ -1693,7 +1693,7 @@ mod tests { let mut conn_count = 0; let mut gw_rejected = false; for conn_num in 3..Ring::DEFAULT_MAX_HOPS_TO_LIVE { - let event = tokio::time::timeout(Duration::from_secs(1), handler.wait_for_events()) + let event = tokio::time::timeout(Duration::from_secs(2), handler.wait_for_events()) .await??; match event { Event::OutboundConnectionSuccessful { diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index 46b375eda..6daca0419 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -803,29 +803,25 @@ struct InboundRemoteConnection { #[cfg(test)] mod test { + #![allow(clippy::single_range_in_vec_init)] + use std::{ - collections::HashMap, net::Ipv4Addr, ops::Range, - sync::{ - atomic::{AtomicU16, AtomicU64, AtomicUsize, Ordering}, - OnceLock, - }, + sync::atomic::{AtomicU16, AtomicU64, AtomicUsize, Ordering}, }; + use dashmap::DashMap; use futures::{stream::FuturesOrdered, TryStreamExt}; use rand::{Rng, SeedableRng}; use serde::{de::DeserializeOwned, Serialize}; - use tokio::sync::{Mutex, RwLock}; + use tokio::sync::Mutex; use tracing::info; use super::*; use crate::transport::packet_data::MAX_DATA_SIZE; - #[allow(clippy::type_complexity)] - static CHANNELS: OnceLock< - Arc)>>>>, - > = OnceLock::new(); + type Channels = Arc)>>>; #[derive(Default, Clone)] enum PacketDropPolicy { @@ -834,8 +830,6 @@ mod test { ReceiveAll, /// Drop the packets randomly based on the factor Factor(f64), - /// Drop packets fall in the given range - Range(Range), /// Drop packets with the given ranges Ranges(Vec>), } @@ -845,25 +839,28 @@ mod test { this: SocketAddr, packet_drop_policy: PacketDropPolicy, num_packets_sent: AtomicUsize, - rng: Mutex, + rng: std::sync::Mutex, + channels: Channels, } impl MockSocket { - async fn test_config(packet_drop_policy: PacketDropPolicy, addr: SocketAddr) -> Self { - let channels = CHANNELS - .get_or_init(|| Arc::new(RwLock::new(HashMap::new()))) - .clone(); + async fn test_config( + packet_drop_policy: PacketDropPolicy, + addr: SocketAddr, + channels: Channels, + ) -> Self { let (outbound, inbound) = mpsc::unbounded_channel(); - channels.write().await.insert(addr, outbound); + channels.insert(addr, outbound); static SEED: AtomicU64 = AtomicU64::new(0xfeedbeef); MockSocket { inbound: Mutex::new(inbound), this: addr, packet_drop_policy, num_packets_sent: AtomicUsize::new(0), - rng: Mutex::new(rand::rngs::SmallRng::seed_from_u64( + rng: std::sync::Mutex::new(rand::rngs::SmallRng::seed_from_u64( SEED.fetch_add(1, std::sync::atomic::Ordering::SeqCst), )), + channels, } } } @@ -894,12 +891,6 @@ mod test { return Ok(buf.len()); } } - PacketDropPolicy::Range(r) => { - if r.contains(&packet_idx) { - tracing::trace!(id=%packet_idx, %self.this, "drop packet"); - return Ok(buf.len()); - } - } PacketDropPolicy::Ranges(ranges) => { if ranges.iter().any(|r| r.contains(&packet_idx)) { tracing::trace!(id=%packet_idx, %self.this, "drop packet"); @@ -909,11 +900,7 @@ mod test { } assert!(self.this != target, "cannot send to self"); - let channels = CHANNELS - .get_or_init(|| Arc::new(RwLock::new(HashMap::new()))) - .clone(); - let channels = channels.read().await; - let Some(sender) = channels.get(&target) else { + let Some(sender) = self.channels.get(&target).map(|v| v.value().clone()) else { return Ok(0); }; // tracing::trace!(?target, ?self.this, "sending packet to remote"); @@ -927,30 +914,22 @@ mod test { impl Drop for MockSocket { fn drop(&mut self) { - let channels = CHANNELS - .get_or_init(|| Arc::new(RwLock::new(HashMap::new()))) - .clone(); - loop { - if let Ok(mut channels) = channels.try_write() { - channels.remove(&self.this); - break; - } - // unorthodox blocking here but shouldn't be a problem for testing - std::thread::sleep(Duration::from_millis(1)); - } + self.channels.remove(&self.this); } } async fn set_peer_connection( packet_drop_policy: PacketDropPolicy, + channels: Channels, ) -> anyhow::Result<(TransportPublicKey, OutboundConnectionHandler, SocketAddr)> { - set_peer_connection_in(packet_drop_policy, false) + set_peer_connection_in(packet_drop_policy, false, channels) .await .map(|(pk, (o, _), s)| (pk, o, s)) } async fn set_gateway_connection( packet_drop_policy: PacketDropPolicy, + channels: Channels, ) -> Result< ( TransportPublicKey, @@ -959,12 +938,13 @@ mod test { ), anyhow::Error, > { - set_peer_connection_in(packet_drop_policy, true).await + set_peer_connection_in(packet_drop_policy, true, channels).await } async fn set_peer_connection_in( packet_drop_policy: PacketDropPolicy, gateway: bool, + channels: Channels, ) -> Result< ( TransportPublicKey, @@ -979,7 +959,12 @@ mod test { let peer_pub = peer_keypair.public.clone(); let port = PORT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); let socket = Arc::new( - MockSocket::test_config(packet_drop_policy, (Ipv4Addr::LOCALHOST, port).into()).await, + MockSocket::test_config( + packet_drop_policy, + (Ipv4Addr::LOCALHOST, port).into(), + channels, + ) + .await, ); let (peer_conn, inbound_conn) = OutboundConnectionHandler::new_test( (Ipv4Addr::LOCALHOST, port).into(), @@ -1025,9 +1010,10 @@ mod test { assert_eq!(generators.len(), config.peers); let mut peer_keys_and_addr = vec![]; let mut peer_conns = vec![]; + let channels = Arc::new(DashMap::new()); for _ in 0..config.peers { let (peer_pub, peer, peer_addr) = - set_peer_connection(config.packet_drop_policy.clone()).await?; + set_peer_connection(config.packet_drop_policy.clone(), channels.clone()).await?; peer_keys_and_addr.push((peer_pub, peer_addr)); peer_conns.push(peer); } @@ -1130,8 +1116,11 @@ mod test { #[tokio::test] async fn simulate_nat_traversal() -> anyhow::Result<()> { // crate::config::set_logger(); - let (peer_a_pub, mut peer_a, peer_a_addr) = set_peer_connection(Default::default()).await?; - let (peer_b_pub, mut peer_b, peer_b_addr) = set_peer_connection(Default::default()).await?; + let channels = Arc::new(DashMap::new()); + let (peer_a_pub, mut peer_a, peer_a_addr) = + set_peer_connection(Default::default(), channels.clone()).await?; + let (peer_b_pub, mut peer_b, peer_b_addr) = + set_peer_connection(Default::default(), channels).await?; let peer_b = tokio::spawn(async move { let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; @@ -1154,10 +1143,11 @@ mod test { #[tokio::test] async fn simulate_nat_traversal_drop_first_packets_for_all() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE)); + let channels = Arc::new(DashMap::new()); let (peer_a_pub, mut peer_a, peer_a_addr) = - set_peer_connection(PacketDropPolicy::Range(0..1)).await?; + set_peer_connection(PacketDropPolicy::Ranges(vec![0..1]), channels.clone()).await?; let (peer_b_pub, mut peer_b, peer_b_addr) = - set_peer_connection(PacketDropPolicy::Range(0..1)).await?; + set_peer_connection(PacketDropPolicy::Ranges(vec![0..1]), channels).await?; let peer_b = tokio::spawn(async move { let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; @@ -1180,9 +1170,11 @@ mod test { #[tokio::test] async fn simulate_nat_traversal_drop_first_packets_of_peerb() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE)); - let (peer_a_pub, mut peer_a, peer_a_addr) = set_peer_connection(Default::default()).await?; + let channels = Arc::new(DashMap::new()); + let (peer_a_pub, mut peer_a, peer_a_addr) = + set_peer_connection(Default::default(), channels.clone()).await?; let (peer_b_pub, mut peer_b, peer_b_addr) = - set_peer_connection(PacketDropPolicy::Range(0..1)).await?; + set_peer_connection(PacketDropPolicy::Ranges(vec![0..1]), channels).await?; let peer_b = tokio::spawn(async move { let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; @@ -1207,9 +1199,14 @@ mod test { #[tokio::test] async fn simulate_nat_traversal_drop_packet_ranges_of_peerb_killed() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE)); - let (peer_a_pub, mut peer_a, peer_a_addr) = set_peer_connection(Default::default()).await?; - let (peer_b_pub, mut peer_b, peer_b_addr) = - set_peer_connection(PacketDropPolicy::Ranges(vec![0..1, 3..usize::MAX])).await?; + let channels = Arc::new(DashMap::new()); + let (peer_a_pub, mut peer_a, peer_a_addr) = + set_peer_connection(Default::default(), channels.clone()).await?; + let (peer_b_pub, mut peer_b, peer_b_addr) = set_peer_connection( + PacketDropPolicy::Ranges(vec![0..1, 3..usize::MAX]), + channels, + ) + .await?; let peer_b = tokio::spawn(async move { let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; @@ -1242,10 +1239,12 @@ mod test { #[tokio::test] async fn simulate_nat_traversal_drop_packet_ranges_of_peerb() -> anyhow::Result<()> { - // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE)); - let (peer_a_pub, mut peer_a, peer_a_addr) = set_peer_connection(Default::default()).await?; + // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None); + let channels = Arc::new(DashMap::new()); + let (peer_a_pub, mut peer_a, peer_a_addr) = + set_peer_connection(Default::default(), channels.clone()).await?; let (peer_b_pub, mut peer_b, peer_b_addr) = - set_peer_connection(PacketDropPolicy::Ranges(vec![0..1, 3..9])).await?; + set_peer_connection(PacketDropPolicy::Ranges(vec![0..1, 3..9]), channels).await?; let peer_b = tokio::spawn(async move { let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; @@ -1287,10 +1286,11 @@ mod test { #[tokio::test] async fn simulate_gateway_connection() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None); + let channels = Arc::new(DashMap::new()); let (_peer_a_pub, mut peer_a, _peer_a_addr) = - set_peer_connection(Default::default()).await?; + set_peer_connection(Default::default(), channels.clone()).await?; let (gw_pub, (_oc, mut gw_conn), gw_addr) = - set_gateway_connection(Default::default()).await?; + set_gateway_connection(Default::default(), channels).await?; let gw = tokio::spawn(async move { let gw_conn = gw_conn.recv(); @@ -1315,10 +1315,11 @@ mod test { #[tokio::test] async fn simulate_gateway_connection_drop_first_packets_of_gateway() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE)); + let channels = Arc::new(DashMap::new()); let (_peer_a_pub, mut peer_a, _peer_a_addr) = - set_peer_connection(Default::default()).await?; + set_peer_connection(Default::default(), channels.clone()).await?; let (gw_pub, (_oc, mut gw_conn), gw_addr) = - set_gateway_connection(PacketDropPolicy::Range(0..1)).await?; + set_gateway_connection(PacketDropPolicy::Ranges(vec![0..1]), channels.clone()).await?; let gw = tokio::spawn(async move { let gw_conn = gw_conn.recv(); @@ -1343,10 +1344,11 @@ mod test { #[tokio::test] async fn simulate_gateway_connection_drop_first_packets_for_all() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE)); + let channels = Arc::new(DashMap::new()); let (_peer_a_pub, mut peer_a, _peer_a_addr) = - set_peer_connection(PacketDropPolicy::Range(0..1)).await?; + set_peer_connection(PacketDropPolicy::Ranges(vec![0..1]), channels.clone()).await?; let (gw_pub, (_oc, mut gw_conn), gw_addr) = - set_gateway_connection(PacketDropPolicy::Range(0..1)).await?; + set_gateway_connection(PacketDropPolicy::Ranges(vec![0..1]), channels).await?; let gw = tokio::spawn(async move { let gw_conn = gw_conn.recv(); @@ -1371,10 +1373,11 @@ mod test { #[tokio::test] async fn simulate_gateway_connection_drop_first_packets_of_peer() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE)); + let channels = Arc::new(DashMap::new()); let (_peer_a_pub, mut peer_a, _peer_a_addr) = - set_peer_connection(PacketDropPolicy::Range(0..1)).await?; + set_peer_connection(PacketDropPolicy::Ranges(vec![0..1]), channels.clone()).await?; let (gw_pub, (_oc, mut gw_conn), gw_addr) = - set_gateway_connection(Default::default()).await?; + set_gateway_connection(Default::default(), channels).await?; let gw = tokio::spawn(async move { let gw_conn = gw_conn.recv(); @@ -1432,8 +1435,11 @@ mod test { #[tokio::test] async fn simulate_send_max_short_message() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::ERROR)); - let (peer_a_pub, mut peer_a, peer_a_addr) = set_peer_connection(Default::default()).await?; - let (peer_b_pub, mut peer_b, peer_b_addr) = set_peer_connection(Default::default()).await?; + let channels = Arc::new(DashMap::new()); + let (peer_a_pub, mut peer_a, peer_a_addr) = + set_peer_connection(Default::default(), channels.clone()).await?; + let (peer_b_pub, mut peer_b, peer_b_addr) = + set_peer_connection(Default::default(), channels).await?; let peer_b = tokio::spawn(async move { let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; @@ -1460,46 +1466,35 @@ mod test { Ok(()) } - #[test] - #[should_panic] - fn simulate_send_max_short_message_plus_1() { - // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::ERROR)); - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async move { - let (peer_a_pub, mut peer_a, peer_a_addr) = - set_peer_connection(Default::default()).await?; - let (peer_b_pub, mut peer_b, peer_b_addr) = - set_peer_connection(Default::default()).await?; - - let peer_b = tokio::spawn(async move { - let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; - let mut conn = - tokio::time::timeout(Duration::from_secs(500), peer_a_conn).await??; - let data = vec![0u8; 1433]; - let data = - tokio::task::spawn_blocking(move || bincode::serialize(&data).unwrap()) - .await - .unwrap(); - conn.outbound_short_message(data).await?; - Ok::<_, anyhow::Error>(()) - }); + #[tokio::test] + async fn simulate_send_max_short_message_plus_1() -> anyhow::Result<()> { + let channels = Arc::new(DashMap::new()); + let (peer_a_pub, mut peer_a, peer_a_addr) = + set_peer_connection(Default::default(), channels.clone()).await?; + let (peer_b_pub, mut peer_b, peer_b_addr) = + set_peer_connection(Default::default(), channels).await?; - let peer_a = tokio::spawn(async move { - let peer_b_conn = peer_a.connect(peer_b_pub, peer_b_addr).await; - let mut conn = - tokio::time::timeout(Duration::from_secs(500), peer_b_conn).await??; - let msg = conn.recv().await?; - assert!(msg.len() <= MAX_DATA_SIZE); - Ok::<_, anyhow::Error>(()) - }); + let peer_a = tokio::spawn(async move { + let peer_b_conn = peer_a.connect(peer_b_pub, peer_b_addr).await; + let mut conn = tokio::time::timeout(Duration::from_secs(1), peer_b_conn).await??; + let _ = tokio::time::timeout(Duration::from_secs(1), conn.recv()).await??; + Ok::<_, anyhow::Error>(()) + }); - let (a, b) = tokio::try_join!(peer_a, peer_b)?; - a?; - b?; - Result::<(), anyhow::Error>::Ok(()) - }) - .unwrap(); + let peer_b = tokio::spawn(async move { + let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; + let mut conn = tokio::time::timeout(Duration::from_secs(1), peer_a_conn).await??; + let data = vec![0u8; MAX_DATA_SIZE + 1]; + let data = + tokio::task::spawn_blocking(move || bincode::serialize(&data).unwrap()).await?; + conn.outbound_short_message(data).await?; + Ok::<_, anyhow::Error>(()) + }); + + let (a, b) = tokio::join!(peer_a, peer_b); + assert!(a?.is_err()); + assert!(b.is_err()); + Ok(()) } #[tokio::test] @@ -1534,11 +1529,8 @@ mod test { .await } - // #[ignore] - #[tokio::test(flavor = "multi_thread", worker_threads = 5)] - // #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn simulate_packet_dropping() -> anyhow::Result<()> { - // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::INFO)); #[derive(Clone, Copy)] struct TestData(&'static str); @@ -1563,32 +1555,50 @@ mod test { let mut tests = FuturesOrdered::new(); let mut rng = rand::rngs::StdRng::seed_from_u64(3); - for factor in std::iter::repeat(()) - .map(|_| rng.gen::()) - .filter(|x| *x > 0.05 && *x < 0.25) - .take(5) - { - let wait_time = Duration::from_secs((((factor * 5.0 + 1.0) * 15.0) + 10.0) as u64); - println!( - "packet loss factor: {factor} (wait time: {wait_time})", - wait_time = wait_time.as_secs() - ); - tests.push_back(tokio::spawn(run_test( - TestConfig { - packet_drop_policy: PacketDropPolicy::Factor(factor), - wait_time, - ..Default::default() - }, - vec![TestData("foo"), TestData("bar")], - ))); - } let mut test_no = 0; - while let Some(result) = tests.next().await { - result?.inspect_err(|_| { - tracing::error!(%test_no, "error in test"); - })?; - test_no += 1; + for _ in 0..2 { + for factor in std::iter::repeat(()) + .map(|_| rng.gen::()) + .filter(|x| *x > 0.05 && *x < 0.25) + .take(3) + { + let wait_time = Duration::from_secs(((factor * 5.0 * 15.0) + 15.0) as u64); + tracing::info!( + "test #{test_no}: packet loss factor: {factor} (wait time: {wait_time})", + wait_time = wait_time.as_secs() + ); + + let now = std::time::Instant::now(); + tests.push_back(tokio::spawn( + run_test( + TestConfig { + packet_drop_policy: PacketDropPolicy::Factor(factor), + wait_time, + ..Default::default() + }, + vec![TestData("foo"), TestData("bar")], + ) + .inspect(move |r| { + let msg = if r.is_ok() { + format!("successfully, total time: {}s (t/o: {}s, factor: {factor:.3})", now.elapsed().as_secs(), wait_time.as_secs()) + } else { + format!("with error, total time: {}s (t/o: {}s, factor: {factor:.3})", now.elapsed().as_secs(), wait_time.as_secs()) + }; + if r.is_err() { + tracing::error!("test #{test_no} finished {}", msg); + } else { + tracing::info!("test #{test_no} finished {}", msg); + } + }), + )); + test_no += 1; + } + + while let Some(result) = tests.next().await { + result??; + } } + Ok(()) } } diff --git a/crates/core/src/transport.rs b/crates/core/src/transport/mod.rs similarity index 100% rename from crates/core/src/transport.rs rename to crates/core/src/transport/mod.rs diff --git a/crates/core/src/transport/peer_connection.rs b/crates/core/src/transport/peer_connection.rs index 06eefcaf4..34dbaa01b 100644 --- a/crates/core/src/transport/peer_connection.rs +++ b/crates/core/src/transport/peer_connection.rs @@ -205,7 +205,7 @@ impl PeerConnection { #[instrument(name = "peer_connection", skip(self))] pub async fn recv(&mut self) -> Result> { // listen for incoming messages or receipts or wait until is time to do anything else again - let mut resend_check = Some(tokio::time::sleep(tokio::time::Duration::from_secs(1))); + let mut resend_check = Some(tokio::time::sleep(tokio::time::Duration::from_millis(10))); // #[cfg(debug_assertions)] // const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(2); @@ -304,7 +304,7 @@ impl PeerConnection { tracing::trace!(remote = ?self.remote_conn.remote_addr, "sending keep-alive"); self.noop(vec![]).await?; } - _ = resend_check.take().unwrap_or(tokio::time::sleep(Duration::from_secs(5))) => { + _ = resend_check.take().unwrap_or(tokio::time::sleep(Duration::from_millis(10))) => { loop { tracing::trace!(remote = ?self.remote_conn.remote_addr, "checking for resends"); let maybe_resend = self.remote_conn