From 6e49211c306004b5877f3781526b2f0344863698 Mon Sep 17 00:00:00 2001 From: Hector Santos Date: Sat, 12 Oct 2024 16:55:32 +0200 Subject: [PATCH] Fix handshake tests (#1262) Co-authored-by: Ignacio Duart --- .../core/src/node/network_bridge/handshake.rs | 332 ++++++++++-------- crates/core/src/ring.rs | 61 +--- crates/core/src/ring/connection_manager.rs | 57 ++- crates/core/src/transport/peer_connection.rs | 2 +- 4 files changed, 243 insertions(+), 209 deletions(-) diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index 5cb9ebd88..f8ab6eb5c 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -6,7 +6,7 @@ use std::{ sync::Arc, }; use tokio::time::{timeout, Duration}; -use tracing::instrument; +use tracing::{instrument, Instrument}; use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt}; use tokio::sync::mpsc::{self}; @@ -29,6 +29,8 @@ use crate::{ type Result = std::result::Result; type OutboundConnResult = Result; +const TIMEOUT: Duration = Duration::from_secs(60); + #[derive(Debug)] pub(super) struct ForwardInfo { pub target: PeerId, @@ -259,15 +261,17 @@ impl HandshakeHandler { Some(Ok(InternalEvent::RemoteConnectionAttempt { remote, tracker })) => { // this shouldn't happen as the tx would exit this module // see: OutboundGwConnConfirmed - debug_assert!(tracker.gw_accepted_processed && tracker.gw_accepted); + debug_assert!(!tracker.gw_accepted); tracing::debug!( at=?tracker.gw_conn.my_address(), gw=%tracker.gw_conn.remote_addr(), "Attempting remote connection to {remote}" ); self.start_outbound_connection(remote.clone(), tracker.tx, false).await; + let current_span = tracing::Span::current(); + let checking_hops_span = tracing::info_span!(parent: current_span, "checking_hops"); self.ongoing_outbound_connections.push( - check_remaining_hops(tracker).boxed() + check_remaining_hops(tracker).instrument(checking_hops_span).boxed() ); continue; } @@ -610,8 +614,14 @@ impl HandshakeHandler { .connect(remote.pub_key.clone(), remote.addr) .await .map(move |c| match c { - Ok(conn) if is_gw => Ok(InternalEvent::OutboundGwConnEstablished(remote, conn)), - Ok(conn) => Ok(InternalEvent::OutboundConnEstablished(remote, conn)), + Ok(conn) if is_gw => { + tracing::debug!(%remote, "established outbound gw connection"); + Ok(InternalEvent::OutboundGwConnEstablished(remote, conn)) + } + Ok(conn) => { + tracing::debug!(%remote, "established outbound connection"); + Ok(InternalEvent::OutboundConnEstablished(remote, conn)) + } Err(e) => Err((remote, e.into())), }) .boxed(); @@ -758,13 +768,11 @@ async fn wait_for_gw_confirmation( "Waiting for answer from gw" ); - let timeout_duration = Duration::from_secs(10); - // under this branch we just need to wait long enough for the gateway to reply with all the downstream // connection attempts, and then we can drop the connection, so keep listening to it in a loop or timeout let remote = tracker.gw_conn.remote_addr(); tokio::time::timeout( - timeout_duration, + TIMEOUT, check_remaining_hops(tracker), ) .await @@ -780,9 +788,14 @@ async fn wait_for_gw_confirmation( async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResult { let remote_addr = tracker.gw_conn.remote_addr(); let gw_peer_id = tracker.gw_peer.peer.clone(); + tracing::debug!( + at=?tracker.gw_conn.my_address(), + from=%tracker.gw_conn.remote_addr(), + "Checking for remaining hops, left: {}", tracker.remaining_checks + ); while tracker.remaining_checks > 0 { let msg = tokio::time::timeout( - Duration::from_secs(10), + TIMEOUT, tracker .gw_conn .recv() @@ -822,13 +835,16 @@ async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResul if accepted { return Ok(InternalEvent::OutboundGwConnConfirmed(tracker)); } else { + tracing::debug!("Rejected by gateway, waiting for forward replies"); return Ok(InternalEvent::NextCheck(tracker)); } - } else { + } else if accepted { return Ok(InternalEvent::RemoteConnectionAttempt { remote: acceptor.peer, tracker, }); + } else { + continue; } } NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { @@ -916,11 +932,9 @@ async fn gw_transient_peer_conn( mut info: ConnectivityInfo, ) -> Result<(InternalEvent, PeerOutboundMessage), HandshakeError> { // TODO: should be the same timeout as the one used for any other tx - let timeout_duration = Duration::from_secs(10); - loop { tokio::select! { - incoming_result = timeout(timeout_duration, conn.recv()) => { + incoming_result = timeout(TIMEOUT, conn.recv()) => { match incoming_result { Ok(Ok(msg)) => { let net_msg = decode_msg(&msg).unwrap(); @@ -947,7 +961,7 @@ async fn gw_transient_peer_conn( } } } - outbound_msg = timeout(timeout_duration, outbound.0.recv()) => { + outbound_msg = timeout(TIMEOUT, outbound.0.recv()) => { match outbound_msg { Ok(Some(msg)) => { if matches!( @@ -1101,10 +1115,16 @@ mod tests { /// This would happen when a new unsolicited connection is established with a gateway or /// when after initialising a connection with a peer via `outbound_recv`, a connection /// is successfully established. - async fn establish_inbound_conn(&mut self, addr: SocketAddr, pub_key: TransportPublicKey) { + async fn establish_inbound_conn( + &mut self, + addr: SocketAddr, + pub_key: TransportPublicKey, + hops_to_live: Option, + ) { let id = Transaction::new::(); let target_peer_id = PeerId::new(addr, pub_key.clone()); let target_peer = PeerKeyLocation::from(target_peer_id); + let hops_to_live = hops_to_live.unwrap_or(10); // let joiner_key = TransportKeypair::new(); // let pub_key = joiner_key.public().clone(); let initial_join_req = ConnectMsg::Request { @@ -1113,8 +1133,8 @@ mod tests { msg: ConnectRequest::StartJoinReq { joiner: None, joiner_key: pub_key, - hops_to_live: 10, - max_hops_to_live: 10, + hops_to_live, + max_hops_to_live: hops_to_live, skip_list: vec![], }, }; @@ -1181,7 +1201,10 @@ mod tests { node: NodeMock, } - fn config_handler(addr: impl Into) -> (HandshakeHandler, TestVerifier) { + fn config_handler( + addr: impl Into, + existing_connections: Option>, + ) -> (HandshakeHandler, TestVerifier) { let (outbound_sender, outbound_recv) = mpsc::channel(5); let outbound_conn_handler = OutboundConnectionHandler::new(outbound_sender); let (inbound_sender, inbound_recv) = mpsc::channel(5); @@ -1191,6 +1214,15 @@ mod tests { let mngr = ConnectionManager::default_with_key(keypair.public().clone()); mngr.try_set_peer_key(addr); let router = Router::new(&[]); + + if let Some(connections) = existing_connections { + for conn in connections { + let location = conn.get_location().location.unwrap(); + let peer_id = conn.get_location().peer.clone(); + mngr.add_connection(location, peer_id, false); + } + } + let (handler, establish_conn, _outbound_msg) = HandshakeHandler::new( inbound_conn_handler, outbound_conn_handler, @@ -1249,14 +1281,14 @@ mod tests { #[tokio::test] async fn test_gateway_inbound_conn_success() -> anyhow::Result<()> { let addr: SocketAddr = ([127, 0, 0, 1], 10000).into(); - let (mut handler, mut test) = config_handler(addr); + let (mut handler, mut test) = config_handler(addr, None); let remote_addr = ([127, 0, 0, 1], 10001).into(); let test_controller = async { let pub_key = TransportKeypair::new().public().clone(); test.transport.new_conn(remote_addr).await; test.transport - .establish_inbound_conn(remote_addr, pub_key) + .establish_inbound_conn(remote_addr, pub_key, None) .await; Ok::<_, anyhow::Error>(()) }; @@ -1279,18 +1311,31 @@ mod tests { #[tokio::test] async fn test_gateway_inbound_conn_rejected() -> anyhow::Result<()> { let addr: SocketAddr = ([127, 0, 0, 1], 10000).into(); - let (mut handler, mut test) = config_handler(addr); + let existing_remote_addr = ([127, 0, 0, 1], 10001).into(); + let remote_peer_loc = PeerKeyLocation { + peer: PeerId::new( + existing_remote_addr, + TransportKeypair::new().public().clone(), + ), + location: Some(Location::from_address(&existing_remote_addr)), + }; + let existing_conn = + Connection::new(remote_peer_loc.peer, remote_peer_loc.location.unwrap()); - // Configure the handler to reject connections by setting max_connections to 0 - handler.connection_manager.max_connections = 0; - handler.connection_manager.min_connections = 0; + let (mut handler, mut test) = config_handler(addr, Some(vec![existing_conn])); + + // Configure the handler to reject connections by setting max_connections to 1 + handler.connection_manager.max_connections = 1; + handler.connection_manager.min_connections = 1; + + let remote_addr = ([127, 0, 0, 1], 10002).into(); - let remote_addr = ([127, 0, 0, 1], 10001).into(); let test_controller = async { let pub_key = TransportKeypair::new().public().clone(); test.transport.new_conn(remote_addr).await; + // Put hops_to_live to 0 to avoid forwarding test.transport - .establish_inbound_conn(remote_addr, pub_key) + .establish_inbound_conn(remote_addr, pub_key, Some(0)) .await; let msg = test.transport.recv_outbound_msg().await?; tracing::debug!("Received outbound message: {:?}", msg); @@ -1305,7 +1350,7 @@ mod tests { let gw_inbound = async { let event = - tokio::time::timeout(Duration::from_secs(2), handler.wait_for_events()).await??; + tokio::time::timeout(Duration::from_secs(1), handler.wait_for_events()).await??; match event { Event::InboundConnectionRejected { peer_id } => { assert_eq!(peer_id.addr, remote_addr); @@ -1322,7 +1367,7 @@ mod tests { #[tokio::test] async fn test_peer_to_gw_outbound_conn() -> anyhow::Result<()> { let addr = ([127, 0, 0, 1], 10000).into(); - let (mut handler, mut test) = config_handler(addr); + let (mut handler, mut test) = config_handler(addr, None); let joiner_key = TransportKeypair::new(); let pub_key = joiner_key.public().clone(); @@ -1340,14 +1385,10 @@ mod tests { let msg = match msg { NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { id: inbound_id, - msg: - ConnectRequest::StartJoinReq { - joiner, joiner_key, .. - }, + msg: ConnectRequest::StartJoinReq { joiner_key, .. }, .. })) => { assert_eq!(id, inbound_id); - assert!(joiner.is_none()); let sender = PeerKeyLocation { peer: PeerId::new(remote_addr, pub_key.clone()), location: Some(Location::from_address(&remote_addr)), @@ -1393,7 +1434,7 @@ mod tests { #[tokio::test] async fn test_peer_to_gw_outbound_conn_failed() -> anyhow::Result<()> { let addr = ([127, 0, 0, 1], 10000).into(); - let (mut handler, mut test) = config_handler(addr); + let (mut handler, mut test) = config_handler(addr, None); let joiner_key = TransportKeypair::new(); let pub_key = joiner_key.public().clone(); @@ -1439,7 +1480,7 @@ mod tests { let peer_addr: SocketAddr = ([127, 0, 0, 1], 10001).into(); let joiner_addr: SocketAddr = ([127, 0, 0, 1], 10002).into(); - let (mut gw_handler, mut gw_test) = config_handler(gw_addr); + let (mut gw_handler, mut gw_test) = config_handler(gw_addr, None); // the gw only will accept one connection gw_handler.connection_manager.max_connections = 1; @@ -1458,7 +1499,7 @@ mod tests { gw_test.transport.new_conn(peer_addr).await; gw_test .transport - .establish_inbound_conn(peer_addr, peer_pub_key.clone()) + .establish_inbound_conn(peer_addr, peer_pub_key.clone(), None) .await; // the joiner attempts to connect to the gw, but since it's out of connections @@ -1466,7 +1507,7 @@ mod tests { gw_test.transport.new_conn(joiner_addr).await; gw_test .transport - .establish_inbound_conn(joiner_addr, joiner_pub_key) + .establish_inbound_conn(joiner_addr, joiner_pub_key, None) .await; // TODO: maybe simulate forwarding back all expected responses @@ -1478,7 +1519,7 @@ mod tests { let mut third_party = None; loop { let event = - tokio::time::timeout(Duration::from_secs(5), gw_handler.wait_for_events()) + tokio::time::timeout(Duration::from_secs(1), gw_handler.wait_for_events()) .await??; match event { Event::InboundConnection { @@ -1490,12 +1531,11 @@ mod tests { assert_eq!(third_party_peer.pub_key, peer_pub_key); assert_eq!(first_peer_conn.remote_addr(), peer_addr); third_party = Some(third_party_peer); - gw_handler - .connection_manager - .add_connection(Connection::new( - peer_peer_id.clone(), - Location::from_address(&peer_addr), - )); + gw_handler.connection_manager.add_connection( + Location::from_address(&peer_addr), + peer_peer_id.clone(), + false, + ); } Event::TransientForwardTransaction { target, @@ -1531,9 +1571,9 @@ mod tests { #[tokio::test] async fn test_peer_to_gw_outbound_conn_rejected() -> anyhow::Result<()> { - // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::DEBUG)); + // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None); let joiner_addr = ([127, 0, 0, 1], 10001).into(); - let (mut handler, mut test) = config_handler(joiner_addr); + let (mut handler, mut test) = config_handler(joiner_addr, None); let gw_key = TransportKeypair::new(); let gw_pub_key = gw_key.public().clone(); @@ -1589,15 +1629,16 @@ mod tests { NetMessage::V1(NetMessageV1::Connect(initial_join_req)), ) .await; + tracing::debug!("Sent initial gw rejected reply"); - for i in 0..Ring::DEFAULT_MAX_HOPS_TO_LIVE { + for i in 1..Ring::DEFAULT_MAX_HOPS_TO_LIVE { let port = i + 10; let addr = ([127, 0, port as u8, 1], port as u16).into(); let acceptor = PeerKeyLocation { location: Some(Location::from_address(&addr)), peer: PeerId::new(addr, TransportKeypair::new().public().clone()), }; - tracing::info!(%acceptor, "Sending forward reply"); + tracing::info!(%acceptor, "Sending forward reply number {i} with status `{}`", i > 3); let forward_response = ConnectMsg::Response { id: tx, sender: gw_pkloc.clone(), @@ -1614,34 +1655,35 @@ mod tests { NetMessage::V1(NetMessageV1::Connect(forward_response)), ) .await; - } - for _ in 0..5 { - let (remote, ev) = tokio::time::timeout( - Duration::from_secs(1), - test.transport.outbound_recv.recv(), - ) - .await? - .ok_or(anyhow!("Failed to receive event"))?; - let ConnectionEvent::ConnectionStart { - open_connection, .. - } = ev; - let out_symm_key = Aes128Gcm::new_from_slice(&[0; 16]).unwrap(); - let in_symm_key = Aes128Gcm::new_from_slice(&[1; 16]).unwrap(); - let (conn, out, inb) = PeerConnection::new_remote_test( - remote, - joiner_addr, - out_symm_key, - in_symm_key.clone(), - ); - test.transport - .packet_senders - .insert(remote, (in_symm_key, out)); - test.transport.packet_receivers.push(inb); - tracing::info!("Received open conn to {}", remote); - open_connection - .send(Ok(conn)) - .map_err(|_| anyhow!("failed to open conn"))?; + if i > 3 { + // Create the successful connection + let (remote, ev) = tokio::time::timeout( + Duration::from_secs(1), + test.transport.outbound_recv.recv(), + ) + .await? + .ok_or(anyhow!("Failed to receive event"))?; + let ConnectionEvent::ConnectionStart { + open_connection, .. + } = ev; + let out_symm_key = Aes128Gcm::new_from_slice(&[0; 16]).unwrap(); + let in_symm_key = Aes128Gcm::new_from_slice(&[1; 16]).unwrap(); + let (conn, out, inb) = PeerConnection::new_remote_test( + remote, + joiner_addr, + out_symm_key, + in_symm_key.clone(), + ); + test.transport + .packet_senders + .insert(remote, (in_symm_key, out)); + test.transport.packet_receivers.push(inb); + open_connection + .send(Ok(conn)) + .map_err(|_| anyhow!("failed to open conn"))?; + tracing::info!(conn_num = %i, %remote, "Forward response sent"); + } } Ok::<_, anyhow::Error>(()) @@ -1649,33 +1691,30 @@ mod tests { let peer_inbound = async { let mut conn_count = 0; - for _ in 0..5 { - let event = tokio::time::timeout(Duration::from_secs(5), handler.wait_for_events()) + let mut gw_rejected = false; + for conn_num in 3..Ring::DEFAULT_MAX_HOPS_TO_LIVE { + let event = tokio::time::timeout(Duration::from_secs(1), handler.wait_for_events()) .await??; match event { - // Event::OutboundGatewayConnectionRejected { peer_id } => { - // tracing::info!(%peer_id, "Connection rejected"); - // } Event::OutboundConnectionSuccessful { peer_id, connection, } => { - tracing::info!(%peer_id, "Connection established"); + tracing::info!(%peer_id, %conn_num, "Connection established"); conn_count += 1; drop(connection); } + Event::OutboundGatewayConnectionRejected { peer_id } => { + tracing::info!(%peer_id, "Gateway connection rejected"); + assert_eq!(peer_id.addr, gw_addr); + gw_rejected = true; + } other => bail!("Unexpected event: {:?}", other), } } - let event = - tokio::time::timeout(Duration::from_secs(5), handler.wait_for_events()).await??; - match event { - Event::OutboundGatewayConnectionRejected { peer_id } => { - tracing::info!(%peer_id, "Connection rejected"); - } - _ => panic!("Unexpected event: {:?}", event), - } - assert_eq!(conn_count, 5); + tracing::debug!("Completed all checks, connection count: {conn_count}"); + assert!(gw_rejected); + assert_eq!(conn_count, 6); Ok(()) }; futures::try_join!(test_controller, peer_inbound)?; @@ -1684,88 +1723,85 @@ mod tests { #[tokio::test] async fn test_peer_to_gw_outbound_conn_forwarded() -> anyhow::Result<()> { + // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::DEBUG), None); let joiner_addr = ([127, 0, 0, 1], 10001).into(); - let (mut handler, mut test) = config_handler(joiner_addr); + let (mut handler, mut test) = config_handler(joiner_addr, None); let gw_key = TransportKeypair::new(); let gw_pub_key = gw_key.public().clone(); let gw_addr = ([127, 0, 0, 1], 10000).into(); let gw_peer_id = PeerId::new(gw_addr, gw_pub_key.clone()); + let gw_pkloc = PeerKeyLocation { + location: Some(Location::from_address(&gw_peer_id.addr)), + peer: gw_peer_id.clone(), + }; let joiner_key = TransportKeypair::new(); let joiner_pub_key = joiner_key.public().clone(); let joiner_peer_id = PeerId::new(joiner_addr, joiner_pub_key.clone()); - - let peer_key = TransportKeypair::new(); - let peer_pub_key = peer_key.public().clone(); - let peer_addr = ([127, 0, 0, 2], 10002).into(); - let peer_peer_id = PeerId::new(peer_addr, peer_pub_key.clone()); - - handler.connection_manager.max_connections = 1; - handler.connection_manager.min_connections = 1; + let joiner_pkloc = PeerKeyLocation { + peer: joiner_peer_id.clone(), + location: Some(Location::from_address(&joiner_peer_id.addr)), + }; let tx = Transaction::new::(); let test_controller = async { let open_connection_peer = - start_conn(&mut test, peer_addr, peer_pub_key.clone(), tx, false).await; + start_conn(&mut test, gw_addr, gw_pub_key.clone(), tx, true).await; test.transport - .new_outbound_conn(peer_addr, open_connection_peer) + .new_outbound_conn(gw_addr, open_connection_peer) .await; - test.transport.new_conn(joiner_addr).await; + let msg = test.transport.recv_outbound_msg().await?; + tracing::info!("Received connec request: {:?}", msg); + let NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { + id, + msg: ConnectRequest::StartJoinReq { .. }, + .. + })) = msg + else { + panic!("unexpected message"); + }; + assert_eq!(id, tx); + + let initial_join_req = ConnectMsg::Response { + id: tx, + sender: gw_pkloc.clone(), + target: joiner_pkloc.clone(), + msg: ConnectResponse::AcceptedBy { + accepted: true, + acceptor: gw_pkloc.clone(), + joiner: joiner_peer_id.clone(), + }, + }; test.transport - .establish_inbound_conn(joiner_addr, joiner_pub_key) + .inbound_msg( + gw_addr, + NetMessage::V1(NetMessageV1::Connect(initial_join_req)), + ) .await; - + tracing::debug!("Sent initial gw rejected reply"); Ok::<_, anyhow::Error>(()) }; let peer_inbound = async { - let mut received_outbound_successful = false; - let mut received_forward_transaction = false; - - while !received_outbound_successful || !received_forward_transaction { - let event = tokio::time::timeout(Duration::from_secs(5), handler.wait_for_events()) - .await??; + let mut conn_count = 0; - match event { - Event::OutboundConnectionSuccessful { peer_id, .. } => { - assert_eq!(peer_id.addr, peer_addr); - tracing::info!("Outbound connection to peer successful: {:?}", peer_id); - received_outbound_successful = true; - } - Event::TransientForwardTransaction { - target, - tx, - forward_to, - msg, - } => { - assert_eq!(target, peer_addr); - assert_eq!(tx, tx); - assert_eq!(forward_to, peer_peer_id); - if let NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { - msg: ConnectRequest::CheckConnectivity { sender, joiner, .. }, - .. - })) = &*msg - { - assert_eq!(sender.peer, gw_peer_id); - assert_eq!(joiner.peer, joiner_peer_id); - } else { - panic!("Unexpected message type"); - } - received_forward_transaction = true; - } - Event::InboundConnection { conn, .. } => { - tracing::info!( - "Inbound connection request received: {:?}", - conn.remote_addr() - ); - } - other => bail!("Unexpected event: {:?}", other), + let event = + tokio::time::timeout(Duration::from_secs(1), handler.wait_for_events()).await??; + let _conn = match event { + Event::OutboundGatewayConnectionSuccessful { + peer_id, + connection, + .. + } => { + tracing::info!(%peer_id, "Gateway connection accepted"); + assert_eq!(peer_id.addr, gw_addr); + connection } - } - + other => bail!("Unexpected event: {:?}", other), + }; Ok(()) }; @@ -1776,7 +1812,7 @@ mod tests { #[tokio::test] async fn test_peer_to_peer_outbound_conn_failed() -> anyhow::Result<()> { let addr: SocketAddr = ([127, 0, 0, 1], 10001).into(); - let (mut handler, mut test) = config_handler(addr); + let (mut handler, mut test) = config_handler(addr, None); let peer_key = TransportKeypair::new(); let peer_pub_key = peer_key.public().clone(); @@ -1821,7 +1857,7 @@ mod tests { #[tokio::test] async fn test_peer_to_peer_outbound_conn_succeeded() -> anyhow::Result<()> { let addr: SocketAddr = ([127, 0, 0, 1], 10001).into(); - let (mut handler, mut test) = config_handler(addr); + let (mut handler, mut test) = config_handler(addr, None); let peer_key = TransportKeypair::new(); let peer_pub_key = peer_key.public().clone(); diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index ddfcb26c3..b4e62e057 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -107,6 +107,10 @@ impl Connection { open_at: Instant::now(), } } + + pub fn get_location(&self) -> &PeerKeyLocation { + &self.location + } } #[derive(Clone)] @@ -312,9 +316,7 @@ impl Ring { } pub fn open_connections(&self) -> usize { - self.connection_manager - .open_connections - .load(std::sync::atomic::Ordering::SeqCst) + self.connection_manager.get_open_connections() } async fn refresh_router(router: Arc>, register: ER) { @@ -420,54 +422,16 @@ impl Ring { pub async fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) { tracing::info!(%peer, this = ?self.connection_manager.get_peer_key(), %was_reserved, "Adding connection to peer"); - debug_assert!( - self.connection_manager - .get_peer_key() - .expect("should be set") - != peer - ); - if was_reserved { - let old = self - .connection_manager - .reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - #[cfg(debug_assertions)] - { - tracing::debug!(old, "Decremented reserved connections"); - if old == 0 { - panic!("Underflow of reserved connections"); - } - } - let _ = old; - } self.connection_manager - .open_connections - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + .add_connection(loc, peer.clone(), was_reserved); self.event_register - .register_events(Either::Left(NetEventLog::connected( - self, - peer.clone(), - loc, - ))) + .register_events(Either::Left(NetEventLog::connected(self, peer, loc))) .await; - let mut cbl = self.connection_manager.connections_by_location.write(); - cbl.entry(loc).or_default().push(Connection { - location: PeerKeyLocation { - peer: peer.clone(), - location: Some(loc), - }, - open_at: Instant::now(), - }); - self.connection_manager - .location_for_peer - .write() - .insert(peer.clone(), loc); - std::mem::drop(cbl); self.refresh_density_request_cache() } fn refresh_density_request_cache(&self) { - let cbl = self.connection_manager.connections_by_location.read(); + let cbl = self.connection_manager.get_connections_by_location(); let topology_manager = &mut self.connection_manager.topology_manager.write(); let _ = topology_manager.refresh_cache(&cbl); } @@ -573,8 +537,7 @@ impl Ring { ) -> Option { use rand::seq::SliceRandom; self.connection_manager - .connections_by_location - .read() + .get_connections_by_location() .iter() .sorted_by(|(loc_a, _), (loc_b, _)| { loc_a.distance(location).cmp(&loc_b.distance(location)) @@ -670,7 +633,7 @@ impl Ring { } let neighbor_locations = { - let peers = self.connection_manager.connections_by_location.read(); + let peers = self.connection_manager.get_connections_by_location(); peers .iter() .map(|(loc, conns)| { @@ -784,7 +747,7 @@ impl Ring { pub struct Location(f64); impl Location { - #[cfg(not(feature = "local-simulation"))] + #[cfg(all(not(feature = "local-simulation"), not(test)))] pub fn from_address(addr: &SocketAddr) -> Self { match addr.ip() { std::net::IpAddr::V4(ipv4) => { @@ -804,7 +767,7 @@ impl Location { } } - #[cfg(feature = "local-simulation")] + #[cfg(any(feature = "local-simulation", test))] pub fn from_address(_addr: &SocketAddr) -> Self { let random_component: f64 = rand::random(); Location(random_component) diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index ea8028416..9e315da61 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -6,15 +6,15 @@ use super::*; #[derive(Clone)] pub(crate) struct ConnectionManager { - pub(super) open_connections: Arc, - pub(super) reserved_connections: Arc, + open_connections: Arc, + reserved_connections: Arc, pub(super) location_for_peer: Arc>>, pub(super) topology_manager: Arc>, - pub(super) connections_by_location: Arc>>>, + connections_by_location: Arc>>>, /// Interim connections ongoing handshake or successfully open connections /// Is important to keep track of this so no more connections are accepted prematurely. - pub(super) own_location: Arc, - pub(super) peer_key: Arc>>, + own_location: Arc, + peer_key: Arc>>, pub min_connections: usize, pub max_connections: usize, pub rnd_if_htl_above: usize, @@ -40,12 +40,6 @@ impl ConnectionManager { None, ) } - - pub fn add_connection(&self, conn: Connection) { - let loc = conn.location.location.unwrap_or_else(Location::random); - let mut conns = self.connections_by_location.write(); - conns.entry(loc).or_default().push(conn); - } } impl ConnectionManager { @@ -148,6 +142,9 @@ impl ConnectionManager { if open == 0 { // if this is the first connection, then accept it + self.location_for_peer + .write() + .insert(peer_id.clone(), location); return true; } @@ -247,6 +244,35 @@ impl ConnectionManager { self.prune_connection(peer, false) } + pub fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) { + debug_assert!(self.get_peer_key().expect("should be set") != peer); + if was_reserved { + let old = self + .reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + #[cfg(debug_assertions)] + { + tracing::debug!(old, "Decremented reserved connections"); + if old == 0 { + panic!("Underflow of reserved connections"); + } + } + let _ = old; + } + self.open_connections + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let mut cbl = self.connections_by_location.write(); + cbl.entry(loc).or_default().push(Connection { + location: PeerKeyLocation { + peer: peer.clone(), + location: Some(loc), + }, + open_at: Instant::now(), + }); + self.location_for_peer.write().insert(peer.clone(), loc); + std::mem::drop(cbl); + } + fn prune_connection(&self, peer: &PeerId, is_alive: bool) -> Option { let connection_type = if is_alive { "active" } else { "in transit" }; tracing::debug!(%peer, "Pruning {} connection", connection_type); @@ -280,6 +306,15 @@ impl ConnectionManager { Some(loc) } + pub(super) fn get_open_connections(&self) -> usize { + self.open_connections + .load(std::sync::atomic::Ordering::SeqCst) + } + + pub(super) fn get_connections_by_location(&self) -> BTreeMap> { + self.connections_by_location.read().clone() + } + /// Get a random peer from the known ring connections. pub fn random_peer(&self, filter_fn: F) -> Option where diff --git a/crates/core/src/transport/peer_connection.rs b/crates/core/src/transport/peer_connection.rs index 2665a3504..4678affea 100644 --- a/crates/core/src/transport/peer_connection.rs +++ b/crates/core/src/transport/peer_connection.rs @@ -273,7 +273,7 @@ impl PeerConnection { tracing::error!(%error, %packet_id, remote = %self.remote_conn.remote_addr, "error processing inbound packet"); error })? { - tracing::debug!(%packet_id, "returning full stream message"); + tracing::trace!(%packet_id, "returning full stream message"); return Ok(msg); } }