Skip to content

Commit

Permalink
wip: fix transient connection to gw
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Sep 14, 2024
1 parent 73dc00d commit 4fc9239
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,10 @@ impl HandshakeHandler {
Some(Ok(InternalEvent::RemoteConnectionAttempt { remote, gw_conn, remaining_checks, gw_peer_id, tx })) => {
tracing::debug!(at=?gw_conn.my_address(), gw=%gw_conn.remote_addr(), "Attempting remote connection to {remote}");
self.start_outbound_connection(remote.clone(), tx, false).await;
let gw_accepted = true; // FIXME: need to track this down properly
if remaining_checks > 0 {
self.ongoing_outbound_connections.push(
check_remaining_hops(tx, gw_peer_id, gw_conn, remaining_checks).boxed()
check_remaining_hops(tx, gw_peer_id, gw_conn, remaining_checks, gw_accepted).boxed()
);
continue;
} else {
Expand Down Expand Up @@ -661,6 +662,8 @@ async fn wait_for_gw_confirmation(
tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr(), "Waiting for answer from gw");

let timeout_duration = Duration::from_secs(10);
let mut gw_accepted = false;
let mut remaining_checks = max_hops_to_live;
loop {
let msg = tokio::time::timeout(timeout_duration, conn.recv())
.await
Expand All @@ -677,12 +680,18 @@ async fn wait_for_gw_confirmation(

match deserialized {
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response {
msg: ConnectResponse::AcceptedBy { accepted, .. },
msg:
ConnectResponse::AcceptedBy {
accepted, acceptor, ..
},
..
})) => {
// the first message should always be a response to the initial connection request at the gateway
if accepted {
return Ok(InternalEvent::OutboundGwConnConfirmed(gw_peer_id, conn));
if accepted && acceptor.peer.addr == conn.remote_addr() {
// this is a message from the gw indicating they accept, but there could be an other remote address
// inbound connection,
gw_accepted = true;
} else {
remaining_checks -= 1;
}
break;
}
Expand All @@ -700,11 +709,10 @@ async fn wait_for_gw_confirmation(

// under this branch we just need to wait long enough for the gateway to reply with all the downstream
// connection attempts, and then we can drop the connection, so keep listening to it in a loop or timeout
let remaining_checks = max_hops_to_live;
let remote = conn.remote_addr();
Ok(tokio::time::timeout(
timeout_duration,
check_remaining_hops(tx, gw_peer_id.clone(), conn, remaining_checks),
check_remaining_hops(tx, gw_peer_id.clone(), conn, remaining_checks, gw_accepted),
)
.await
.map_err(|_| {
Expand All @@ -721,6 +729,7 @@ async fn check_remaining_hops(
gw_peer_id: PeerId,
mut conn: PeerConnection,
mut remaining_checks: usize,
gw_accepted: bool,
) -> OutboundConnResult {
while remaining_checks > 0 {
let msg = conn
Expand Down Expand Up @@ -753,7 +762,11 @@ async fn check_remaining_hops(
});
}
}
Ok(InternalEvent::OutboundGwConnRejected(gw_peer_id))
if gw_accepted {
Ok(InternalEvent::OutboundGwConnConfirmed(gw_peer_id, conn))
} else {
Ok(InternalEvent::OutboundGwConnRejected(gw_peer_id))
}
}

/// Handles communication with a potentially transient peer connection.
Expand Down

0 comments on commit 4fc9239

Please sign in to comment.