diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8b43d1c62..6228957f9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,6 +38,7 @@ jobs: targets: wasm32-unknown-unknown - uses: Swatinem/rust-cache@v2 + if: always() - name: Install stdlib packages working-directory: stdlib/typescript diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index 57e111aff..4fa950f95 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -29,7 +29,7 @@ use crate::{ type Result = std::result::Result; type OutboundConnResult = Result; -const TIMEOUT: Duration = Duration::from_secs(60); +const TIMEOUT: Duration = Duration::from_secs(10); #[derive(Debug)] pub(super) struct ForwardInfo { @@ -114,13 +114,25 @@ impl OutboundMessage { } } +pub(super) enum ExternConnection { + Establish { + peer: PeerId, + tx: Transaction, + is_gw: bool, + }, +} + /// Use for starting a new outboound connection to a peer. -pub(super) struct EstablishConnection(pub(crate) mpsc::Sender<(PeerId, Transaction, bool)>); +pub(super) struct EstablishConnection(pub(crate) mpsc::Sender); impl EstablishConnection { pub async fn establish_conn(&self, remote: PeerId, tx: Transaction, is_gw: bool) -> Result<()> { self.0 - .send((remote, tx, is_gw)) + .send(ExternConnection::Establish { + peer: remote, + tx, + is_gw, + }) .await .map_err(|_| HandshakeError::ChannelClosed)?; Ok(()) @@ -129,7 +141,7 @@ impl EstablishConnection { type OutboundMessageSender = mpsc::Sender; type OutboundMessageReceiver = mpsc::Receiver<(SocketAddr, NetMessage)>; -type EstablishConnectionReceiver = mpsc::Receiver<(PeerId, Transaction, bool)>; +type EstablishConnectionReceiver = mpsc::Receiver; /// Manages the handshake process for establishing connections with peers. /// Handles both inbound and outbound connection attempts, and manages @@ -449,10 +461,12 @@ impl HandshakeHandler { } // Handle requests to establish new connections establish_connection = self.establish_connection_rx.recv() => { - let Some((peer_id, tx, is_gw)) = establish_connection else { - return Err(HandshakeError::ChannelClosed); - }; - self.start_outbound_connection(peer_id, tx, is_gw).await; + match establish_connection { + Some(ExternConnection::Establish { peer, tx, is_gw }) => { + self.start_outbound_connection(peer, tx, is_gw).await; + } + None => return Err(HandshakeError::ChannelClosed), + } } } } @@ -1155,7 +1169,9 @@ mod tests { vec![], ) .unwrap(); + tracing::trace!(at=?self.my_addr, to=%addr, "Sending message to peer"); packet_sender.send(sym_msg.into_unknown()).await.unwrap(); + tracing::trace!(at=?self.my_addr, from=%addr, "Message sent"); self.packet_id += 1; } @@ -1205,9 +1221,9 @@ mod tests { addr: impl Into, existing_connections: Option>, ) -> (HandshakeHandler, TestVerifier) { - let (outbound_sender, outbound_recv) = mpsc::channel(5); + let (outbound_sender, outbound_recv) = mpsc::channel(100); let outbound_conn_handler = OutboundConnectionHandler::new(outbound_sender); - let (inbound_sender, inbound_recv) = mpsc::channel(5); + let (inbound_sender, inbound_recv) = mpsc::channel(100); let inbound_conn_handler = InboundConnectionHandler::new(inbound_recv); let addr = addr.into(); let keypair = TransportKeypair::new(); @@ -1569,7 +1585,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_peer_to_gw_outbound_conn_rejected() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None); let joiner_addr = ([127, 0, 0, 1], 10001).into(); @@ -1645,44 +1661,58 @@ mod tests { target: joiner_pkloc.clone(), msg: ConnectResponse::AcceptedBy { accepted: i > 3, - acceptor, + acceptor: acceptor.clone(), joiner: joiner_peer_id.clone(), }, }; test.transport .inbound_msg( gw_addr, - NetMessage::V1(NetMessageV1::Connect(forward_response)), + NetMessage::V1(NetMessageV1::Connect(forward_response.clone())), ) .await; if i > 3 { // Create the successful connection - let (remote, ev) = tokio::time::timeout( - Duration::from_secs(2), - test.transport.outbound_recv.recv(), - ) - .await? - .ok_or(anyhow!("Failed to receive event"))?; - let ConnectionEvent::ConnectionStart { - open_connection, .. - } = ev; - let out_symm_key = Aes128Gcm::new_from_slice(&[0; 16]).unwrap(); - let in_symm_key = Aes128Gcm::new_from_slice(&[1; 16]).unwrap(); - let (conn, out, inb) = PeerConnection::new_remote_test( - remote, - joiner_addr, - out_symm_key, - in_symm_key.clone(), - ); - test.transport - .packet_senders - .insert(remote, (in_symm_key, out)); - test.transport.packet_receivers.push(inb); - open_connection - .send(Ok(conn)) - .map_err(|_| anyhow!("failed to open conn"))?; - tracing::info!(conn_num = %i, %remote, "Forward response sent"); + async fn establish_conn( + test: &mut TestVerifier, + i: usize, + joiner_addr: SocketAddr, + ) -> Result<(), anyhow::Error> { + let (remote, ev) = tokio::time::timeout( + Duration::from_secs(10), + test.transport.outbound_recv.recv(), + ) + .await + .inspect_err(|error| { + tracing::error!(%error, conn_num = %i, "failed while receiving connection events"); + }) + .map_err(|_| anyhow!("time out"))? + .ok_or( anyhow!("Failed to receive event"))?; + let ConnectionEvent::ConnectionStart { + open_connection, .. + } = ev; + let out_symm_key = Aes128Gcm::new_from_slice(&[0; 16]).unwrap(); + let in_symm_key = Aes128Gcm::new_from_slice(&[1; 16]).unwrap(); + let (conn, out, inb) = PeerConnection::new_remote_test( + remote, + joiner_addr, + out_symm_key, + in_symm_key.clone(), + ); + test.transport + .packet_senders + .insert(remote, (in_symm_key, out)); + test.transport.packet_receivers.push(inb); + tracing::info!(conn_num = %i, %remote, "Connection established at remote"); + open_connection + .send(Ok(conn)) + .map_err(|_| anyhow!("failed to open conn"))?; + tracing::info!(conn_num = %i, "Returned open conn"); + Ok(()) + } + + establish_conn(&mut test, i, joiner_addr).await?; } } @@ -1693,16 +1723,20 @@ mod tests { let mut conn_count = 0; let mut gw_rejected = false; for conn_num in 3..Ring::DEFAULT_MAX_HOPS_TO_LIVE { - let event = tokio::time::timeout(Duration::from_secs(2), handler.wait_for_events()) - .await??; + let conn_num = conn_num + 2; + let event = + tokio::time::timeout(Duration::from_secs(60), handler.wait_for_events()) + .await + .inspect_err(|_| { + tracing::error!(%conn_num, "failed while waiting for events"); + })? + .inspect_err(|error| { + tracing::error!(%error, %conn_num, "failed while receiving events"); + })?; match event { - Event::OutboundConnectionSuccessful { - peer_id, - connection, - } => { - tracing::info!(%peer_id, %conn_num, "Connection established"); + Event::OutboundConnectionSuccessful { peer_id, .. } => { + tracing::info!(%peer_id, %conn_num, "Connection established at peer"); conn_count += 1; - drop(connection); } Event::OutboundGatewayConnectionRejected { peer_id } => { tracing::info!(%peer_id, "Gateway connection rejected");