Skip to content

Commit

Permalink
fix(tests): test_peer_to_gw_outbound_conn_rejected flakiness (#1275)
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez authored Oct 20, 2024
1 parent 37af57a commit 1b1eb69
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 46 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
targets: wasm32-unknown-unknown

- uses: Swatinem/rust-cache@v2
if: always()

- name: Install stdlib packages
working-directory: stdlib/typescript
Expand Down
126 changes: 80 additions & 46 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
type Result<T, E = HandshakeError> = std::result::Result<T, E>;
type OutboundConnResult = Result<InternalEvent, (PeerId, HandshakeError)>;

const TIMEOUT: Duration = Duration::from_secs(60);
const TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Debug)]
pub(super) struct ForwardInfo {
Expand Down Expand Up @@ -114,13 +114,25 @@ impl OutboundMessage {
}
}

pub(super) enum ExternConnection {
Establish {
peer: PeerId,
tx: Transaction,
is_gw: bool,
},
}

/// Use for starting a new outboound connection to a peer.
pub(super) struct EstablishConnection(pub(crate) mpsc::Sender<(PeerId, Transaction, bool)>);
pub(super) struct EstablishConnection(pub(crate) mpsc::Sender<ExternConnection>);

impl EstablishConnection {
pub async fn establish_conn(&self, remote: PeerId, tx: Transaction, is_gw: bool) -> Result<()> {
self.0
.send((remote, tx, is_gw))
.send(ExternConnection::Establish {
peer: remote,
tx,
is_gw,
})
.await
.map_err(|_| HandshakeError::ChannelClosed)?;
Ok(())
Expand All @@ -129,7 +141,7 @@ impl EstablishConnection {

type OutboundMessageSender = mpsc::Sender<NetMessage>;
type OutboundMessageReceiver = mpsc::Receiver<(SocketAddr, NetMessage)>;
type EstablishConnectionReceiver = mpsc::Receiver<(PeerId, Transaction, bool)>;
type EstablishConnectionReceiver = mpsc::Receiver<ExternConnection>;

/// Manages the handshake process for establishing connections with peers.
/// Handles both inbound and outbound connection attempts, and manages
Expand Down Expand Up @@ -449,10 +461,12 @@ impl HandshakeHandler {
}
// Handle requests to establish new connections
establish_connection = self.establish_connection_rx.recv() => {
let Some((peer_id, tx, is_gw)) = establish_connection else {
return Err(HandshakeError::ChannelClosed);
};
self.start_outbound_connection(peer_id, tx, is_gw).await;
match establish_connection {
Some(ExternConnection::Establish { peer, tx, is_gw }) => {
self.start_outbound_connection(peer, tx, is_gw).await;
}
None => return Err(HandshakeError::ChannelClosed),
}
}
}
}
Expand Down Expand Up @@ -1155,7 +1169,9 @@ mod tests {
vec![],
)
.unwrap();
tracing::trace!(at=?self.my_addr, to=%addr, "Sending message to peer");
packet_sender.send(sym_msg.into_unknown()).await.unwrap();
tracing::trace!(at=?self.my_addr, from=%addr, "Message sent");
self.packet_id += 1;
}

Expand Down Expand Up @@ -1205,9 +1221,9 @@ mod tests {
addr: impl Into<SocketAddr>,
existing_connections: Option<Vec<Connection>>,
) -> (HandshakeHandler, TestVerifier) {
let (outbound_sender, outbound_recv) = mpsc::channel(5);
let (outbound_sender, outbound_recv) = mpsc::channel(100);
let outbound_conn_handler = OutboundConnectionHandler::new(outbound_sender);
let (inbound_sender, inbound_recv) = mpsc::channel(5);
let (inbound_sender, inbound_recv) = mpsc::channel(100);
let inbound_conn_handler = InboundConnectionHandler::new(inbound_recv);
let addr = addr.into();
let keypair = TransportKeypair::new();
Expand Down Expand Up @@ -1569,7 +1585,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_peer_to_gw_outbound_conn_rejected() -> anyhow::Result<()> {
// crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None);
let joiner_addr = ([127, 0, 0, 1], 10001).into();
Expand Down Expand Up @@ -1645,44 +1661,58 @@ mod tests {
target: joiner_pkloc.clone(),
msg: ConnectResponse::AcceptedBy {
accepted: i > 3,
acceptor,
acceptor: acceptor.clone(),
joiner: joiner_peer_id.clone(),
},
};
test.transport
.inbound_msg(
gw_addr,
NetMessage::V1(NetMessageV1::Connect(forward_response)),
NetMessage::V1(NetMessageV1::Connect(forward_response.clone())),
)
.await;

if i > 3 {
// Create the successful connection
let (remote, ev) = tokio::time::timeout(
Duration::from_secs(2),
test.transport.outbound_recv.recv(),
)
.await?
.ok_or(anyhow!("Failed to receive event"))?;
let ConnectionEvent::ConnectionStart {
open_connection, ..
} = ev;
let out_symm_key = Aes128Gcm::new_from_slice(&[0; 16]).unwrap();
let in_symm_key = Aes128Gcm::new_from_slice(&[1; 16]).unwrap();
let (conn, out, inb) = PeerConnection::new_remote_test(
remote,
joiner_addr,
out_symm_key,
in_symm_key.clone(),
);
test.transport
.packet_senders
.insert(remote, (in_symm_key, out));
test.transport.packet_receivers.push(inb);
open_connection
.send(Ok(conn))
.map_err(|_| anyhow!("failed to open conn"))?;
tracing::info!(conn_num = %i, %remote, "Forward response sent");
async fn establish_conn(
test: &mut TestVerifier,
i: usize,
joiner_addr: SocketAddr,
) -> Result<(), anyhow::Error> {
let (remote, ev) = tokio::time::timeout(
Duration::from_secs(10),
test.transport.outbound_recv.recv(),
)
.await
.inspect_err(|error| {
tracing::error!(%error, conn_num = %i, "failed while receiving connection events");
})
.map_err(|_| anyhow!("time out"))?
.ok_or( anyhow!("Failed to receive event"))?;
let ConnectionEvent::ConnectionStart {
open_connection, ..
} = ev;
let out_symm_key = Aes128Gcm::new_from_slice(&[0; 16]).unwrap();
let in_symm_key = Aes128Gcm::new_from_slice(&[1; 16]).unwrap();
let (conn, out, inb) = PeerConnection::new_remote_test(
remote,
joiner_addr,
out_symm_key,
in_symm_key.clone(),
);
test.transport
.packet_senders
.insert(remote, (in_symm_key, out));
test.transport.packet_receivers.push(inb);
tracing::info!(conn_num = %i, %remote, "Connection established at remote");
open_connection
.send(Ok(conn))
.map_err(|_| anyhow!("failed to open conn"))?;
tracing::info!(conn_num = %i, "Returned open conn");
Ok(())
}

establish_conn(&mut test, i, joiner_addr).await?;
}
}

Expand All @@ -1693,16 +1723,20 @@ mod tests {
let mut conn_count = 0;
let mut gw_rejected = false;
for conn_num in 3..Ring::DEFAULT_MAX_HOPS_TO_LIVE {
let event = tokio::time::timeout(Duration::from_secs(2), handler.wait_for_events())
.await??;
let conn_num = conn_num + 2;
let event =
tokio::time::timeout(Duration::from_secs(60), handler.wait_for_events())
.await
.inspect_err(|_| {
tracing::error!(%conn_num, "failed while waiting for events");
})?
.inspect_err(|error| {
tracing::error!(%error, %conn_num, "failed while receiving events");
})?;
match event {
Event::OutboundConnectionSuccessful {
peer_id,
connection,
} => {
tracing::info!(%peer_id, %conn_num, "Connection established");
Event::OutboundConnectionSuccessful { peer_id, .. } => {
tracing::info!(%peer_id, %conn_num, "Connection established at peer");
conn_count += 1;
drop(connection);
}
Event::OutboundGatewayConnectionRejected { peer_id } => {
tracing::info!(%peer_id, "Gateway connection rejected");
Expand Down

0 comments on commit 1b1eb69

Please sign in to comment.