Skip to content

Commit

Permalink
fix: remaining transport tests
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Oct 19, 2024
1 parent af262f8 commit bc57b41
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 38 deletions.
76 changes: 45 additions & 31 deletions crates/core/src/transport/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl<S: Socket> UdpPacketsListener<S> {
// Handling of connection events
connection_event = self.connection_handler.recv() => {
let Some((remote_addr, event)) = connection_event else {
tracing::debug!("connection handler closed");
tracing::debug!(%self.this_addr, "connection handler closed");
return Ok(());
};
if let Some(_conn) = self.remote_connections.remove(&remote_addr) {
Expand Down Expand Up @@ -1198,7 +1198,7 @@ mod test {

#[tokio::test]
async fn simulate_nat_traversal_drop_packet_ranges_of_peerb_killed() -> anyhow::Result<()> {
// crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE));
// crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None);
let channels = Arc::new(DashMap::new());
let (peer_a_pub, mut peer_a, peer_a_addr) =
set_peer_connection(Default::default(), channels.clone()).await?;
Expand All @@ -1210,23 +1210,25 @@ mod test {

let peer_b = tokio::spawn(async move {
let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await;
let mut conn = tokio::time::timeout(Duration::from_secs(60), peer_a_conn).await??;
let _ = tokio::time::timeout(Duration::from_secs(3), conn.recv()).await;
conn.send("some data").await.unwrap();
let mut conn = tokio::time::timeout(Duration::from_secs(2), peer_a_conn).await??;
conn.send("some data").await.inspect_err(|error| {
tracing::error!(%error, "error while sending message to peer a");
})?;
tracing::info!("Dropping peer b");
Ok::<_, anyhow::Error>(())
});

let peer_a = tokio::spawn(async move {
let peer_b_conn = peer_a.connect(peer_b_pub, peer_b_addr).await;
let mut conn = tokio::time::timeout(Duration::from_secs(60), peer_b_conn).await??;
let _ = tokio::time::timeout(Duration::from_secs(3), conn.recv()).await;
let mut conn = tokio::time::timeout(Duration::from_secs(2), peer_b_conn).await??;
let b = tokio::time::timeout(Duration::from_secs(2), conn.recv()).await??;
// we should receive the message
let b = conn.recv().await.unwrap();
assert_eq!(b, b"some data");
// as peer b will drop all packets after the third packet, the connection should be broken
tokio::time::sleep(Duration::from_secs(10)).await;
assert_eq!(&b[8..], b"some data");
tracing::info!("Peer a received package from peer b");
tokio::time::sleep(Duration::from_secs(3)).await;
// conn should be broken as the remote peer cannot receive message and ping
conn.recv().await.unwrap_err();
let res = conn.recv().await;
assert!(res.is_err());
Ok::<_, anyhow::Error>(())
});

Expand All @@ -1239,7 +1241,6 @@ mod test {

#[tokio::test]
async fn simulate_nat_traversal_drop_packet_ranges_of_peerb() -> anyhow::Result<()> {
// crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None);
let channels = Arc::new(DashMap::new());
let (peer_a_pub, mut peer_a, peer_a_addr) =
set_peer_connection(Default::default(), channels.clone()).await?;
Expand All @@ -1248,37 +1249,50 @@ mod test {

let peer_b = tokio::spawn(async move {
let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await;
let mut conn = tokio::time::timeout(Duration::from_secs(60), peer_a_conn).await??;
let _ = tokio::time::timeout(Duration::from_secs(3), conn.recv()).await;
conn.send("some foo").await.unwrap();

tokio::time::sleep(Duration::from_secs(10)).await;

let mut conn = tokio::time::timeout(Duration::from_secs(2), peer_a_conn)
.await
.inspect_err(|_| tracing::error!("peer a timed out"))?
.inspect_err(|error| tracing::error!(%error, "error while connecting to peer a"))?;
tracing::info!("Connected peer b to peer a");
conn.send("some foo").await.inspect_err(|error| {
tracing::error!(%error, "error while sending 1st message");
})?;
tokio::time::sleep(Duration::from_secs(2)).await;
// although we drop some packets, we still alive
conn.send("some data").await.unwrap();
Ok::<_, anyhow::Error>(())
conn.send("some data").await.inspect_err(|error| {
tracing::error!(%error, "error while sending 2nd message");
})?;
let _ = tokio::time::timeout(Duration::from_secs(3), conn.recv()).await;
Ok::<_, anyhow::Error>(conn)
});

let peer_a = tokio::spawn(async move {
let peer_b_conn = peer_a.connect(peer_b_pub, peer_b_addr).await;
let mut conn = tokio::time::timeout(Duration::from_secs(60), peer_b_conn).await??;
let _ = tokio::time::timeout(Duration::from_secs(3), conn.recv()).await;
let mut conn = tokio::time::timeout(Duration::from_secs(2), peer_b_conn)
.await
.inspect_err(|_| tracing::error!("peer b timed out"))?
.inspect_err(|error| tracing::error!(%error, "error while connecting to peer b"))?;
tracing::info!("Connected peer a to peer b");

tokio::time::sleep(Duration::from_secs(5)).await;
// we should receive the message
let b = conn.recv().await.unwrap();
let b = conn.recv().await.inspect_err(|error| {
tracing::error!(%error, "error while receiving 1st message");
})?;
assert_eq!(&b[8..], b"some foo");

tokio::time::sleep(Duration::from_secs(10)).await;
// conn should not be broken
let b = conn.recv().await.unwrap();
let b = conn.recv().await.inspect_err(|error| {
tracing::error!(%error, "error while receiving 2nd message");
})?;
assert_eq!(&b[8..], b"some data");
Ok::<_, anyhow::Error>(())
let _ = conn.send("complete").await.inspect_err(|error| {
tracing::error!(%error, "error while sending 3rd message");
});
Ok::<_, anyhow::Error>(conn)
});

let (a, b) = tokio::try_join!(peer_a, peer_b)?;
a?;
b?;
let _ = a?;
let _ = b?;

Ok(())
}
Expand Down
14 changes: 7 additions & 7 deletions crates/core/src/transport/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,13 @@ impl PeerConnection {
// listen for incoming messages or receipts or wait until is time to do anything else again
let mut resend_check = Some(tokio::time::sleep(tokio::time::Duration::from_millis(10)));

// #[cfg(debug_assertions)]
// const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(2);
// #[cfg(not(debug_assertions))]
#[cfg(debug_assertions)]
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(2);
#[cfg(not(debug_assertions))]
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(20);
// #[cfg(debug_assertions)]
// const KILL_CONNECTION_AFTER: Duration = Duration::from_secs(6);
// #[cfg(not(debug_assertions))]
#[cfg(debug_assertions)]
const KILL_CONNECTION_AFTER: Duration = Duration::from_secs(6);
#[cfg(not(debug_assertions))]
const KILL_CONNECTION_AFTER: Duration = Duration::from_secs(60);

let mut keep_alive = tokio::time::interval(KEEP_ALIVE_INTERVAL);
Expand Down Expand Up @@ -306,7 +306,7 @@ impl PeerConnection {
}
_ = resend_check.take().unwrap_or(tokio::time::sleep(Duration::from_millis(10))) => {
loop {
tracing::trace!(remote = ?self.remote_conn.remote_addr, "checking for resends");
// tracing::trace!(remote = ?self.remote_conn.remote_addr, "checking for resends");
let maybe_resend = self.remote_conn
.sent_tracker
.lock()
Expand Down

0 comments on commit bc57b41

Please sign in to comment.