Skip to content

Commit

Permalink
Resend checks should be more frequent to return faster
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Oct 19, 2024
1 parent 0330c7d commit aa1e147
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 50 deletions.
88 changes: 88 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pico-args = "0.5"
statrs = "0.17"
tempfile = "3"
tracing = "0.1"
console-subscriber = { version = "0.4" }

[features]
default = ["redb", "trace", "websocket"]
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,7 @@ mod tests {
if i > 3 {
// Create the successful connection
let (remote, ev) = tokio::time::timeout(
Duration::from_secs(1),
Duration::from_secs(2),
test.transport.outbound_recv.recv(),
)
.await?
Expand Down Expand Up @@ -1693,7 +1693,7 @@ mod tests {
let mut conn_count = 0;
let mut gw_rejected = false;
for conn_num in 3..Ring::DEFAULT_MAX_HOPS_TO_LIVE {
let event = tokio::time::timeout(Duration::from_secs(1), handler.wait_for_events())
let event = tokio::time::timeout(Duration::from_secs(2), handler.wait_for_events())
.await??;
match event {
Event::OutboundConnectionSuccessful {
Expand Down
89 changes: 43 additions & 46 deletions crates/core/src/transport/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1529,10 +1529,8 @@ mod test {
.await
}

// #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn simulate_packet_dropping() -> anyhow::Result<()> {
crate::config::set_logger(Some(tracing::level_filters::LevelFilter::INFO), None);
#[derive(Clone, Copy)]
struct TestData(&'static str);

Expand All @@ -1543,7 +1541,7 @@ mod test {
}

fn gen_msg(&mut self) -> Self::Message {
self.0.repeat(500)
self.0.repeat(1000)
}

fn assert_message_ok(&self, _: usize, msg: Self::Message) -> bool {
Expand All @@ -1555,53 +1553,52 @@ mod test {
}
}

const BYTES_SENT: u64 = 3 * 1000 * 10;

let mut tests = FuturesOrdered::new();
let mut rng = rand::rngs::StdRng::seed_from_u64(3);
for (i, factor) in std::iter::repeat(())
.map(|_| rng.gen::<f64>())
.filter(|x| *x > 0.05 && *x < 0.25)
.skip(1)
.take(1)
.enumerate()
{
let wait_time = Duration::from_secs((((factor * 5.0 + 1.0) * 15.0) + 10.0) as u64);
tracing::debug!(
"test #{i}: packet loss factor: {factor} (wait time: {wait_time})",
wait_time = wait_time.as_secs()
);
tests.push_back(tokio::spawn(
run_test(
TestConfig {
packet_drop_policy: PacketDropPolicy::Factor(factor),
wait_time,
..Default::default()
},
vec![TestData("foo"), TestData("bar")],
)
.inspect(move |r| {
tracing::debug!(
"test #{i} finished {}",
if r.is_ok() {
"successfully".to_owned()
let mut test_no = 0;
for _ in 0..2 {
for factor in std::iter::repeat(())
.map(|_| rng.gen::<f64>())
.filter(|x| *x > 0.05 && *x < 0.25)
.take(3)
{
let wait_time = Duration::from_secs(((factor * 5.0 * 15.0) + 15.0) as u64);
tracing::info!(
"test #{test_no}: packet loss factor: {factor} (wait time: {wait_time})",
wait_time = wait_time.as_secs()
);

let now = std::time::Instant::now();
tests.push_back(tokio::spawn(
run_test(
TestConfig {
packet_drop_policy: PacketDropPolicy::Factor(factor),
wait_time,
..Default::default()
},
vec![TestData("foo"), TestData("bar")],
)
.inspect(move |r| {
let msg = if r.is_ok() {
format!("successfully, total time: {}s (t/o: {}s, factor: {factor:.3})", now.elapsed().as_secs(), wait_time.as_secs())
} else {
format!(
"with error, rate: {} bytes/sec",
BYTES_SENT / wait_time.as_secs()
)
format!("with error, total time: {}s (t/o: {}s, factor: {factor:.3})", now.elapsed().as_secs(), wait_time.as_secs())
};
if r.is_err() {
tracing::error!("test #{test_no} finished {}", msg);
} else {
tracing::info!("test #{test_no} finished {}", msg);
}
);
}),
));
}
let mut test_no = 0;
while let Some(result) = tests.next().await {
result?.inspect_err(|_| {
tracing::error!(%test_no, "error in test");
})?;
test_no += 1;
}),
));
test_no += 1;
}

while let Some(result) = tests.next().await {
result??;
}
}

Ok(())
}
}
4 changes: 2 additions & 2 deletions crates/core/src/transport/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl PeerConnection {
#[instrument(name = "peer_connection", skip(self))]
pub async fn recv(&mut self) -> Result<Vec<u8>> {
// listen for incoming messages or receipts or wait until is time to do anything else again
let mut resend_check = Some(tokio::time::sleep(tokio::time::Duration::from_secs(1)));
let mut resend_check = Some(tokio::time::sleep(tokio::time::Duration::from_millis(10)));

// #[cfg(debug_assertions)]
// const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(2);
Expand Down Expand Up @@ -304,7 +304,7 @@ impl PeerConnection {
tracing::trace!(remote = ?self.remote_conn.remote_addr, "sending keep-alive");
self.noop(vec![]).await?;
}
_ = resend_check.take().unwrap_or(tokio::time::sleep(Duration::from_secs(5))) => {
_ = resend_check.take().unwrap_or(tokio::time::sleep(Duration::from_millis(10))) => {
loop {
tracing::trace!(remote = ?self.remote_conn.remote_addr, "checking for resends");
let maybe_resend = self.remote_conn
Expand Down

0 comments on commit aa1e147

Please sign in to comment.