From 71cf889e7cbc354a3c35b11689eb1de2eb0e6752 Mon Sep 17 00:00:00 2001 From: Hector Santos Date: Mon, 5 Aug 2024 00:13:01 +0200 Subject: [PATCH] Transport and hanshake handler connection fixes --- crates/core/Cargo.toml | 1 + crates/core/src/client_events.rs | 36 +- crates/core/src/lib.rs | 2 +- crates/core/src/message.rs | 2 +- crates/core/src/node.rs | 20 +- crates/core/src/node/network_bridge.rs | 5 +- .../core/src/node/network_bridge/handshake.rs | 391 ++++++++++++------ .../src/node/network_bridge/p2p_protoc.rs | 187 ++++++--- crates/core/src/node/testing_impl.rs | 309 +++----------- .../core/src/node/testing_impl/in_memory.rs | 22 +- crates/core/src/node/testing_impl/network.rs | 15 +- crates/core/src/operations/connect.rs | 277 +++++++------ crates/core/src/operations/get.rs | 167 -------- crates/core/src/operations/put.rs | 93 ----- crates/core/src/operations/subscribe.rs | 66 --- crates/core/src/ring.rs | 62 ++- crates/core/src/ring/connection_manager.rs | 93 +++-- crates/core/src/tracing.rs | 113 +---- crates/core/src/transport.rs | 3 +- .../core/src/transport/connection_handler.rs | 125 +++--- crates/core/src/transport/crypto.rs | 32 +- crates/core/src/transport/peer_connection.rs | 28 +- crates/fdev/Cargo.toml | 2 +- crates/fdev/src/testing.rs | 35 ++ crates/fdev/src/testing/network.rs | 36 +- 25 files changed, 928 insertions(+), 1194 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 339fb2f23..8644dacfd 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -92,6 +92,7 @@ tracing = "0.1" [features] default = ["redb", "trace", "websocket"] local-mode = [] +local-simulation = [] network-mode = [] sqlite = ["sqlx"] trace = ["tracing-subscriber"] diff --git a/crates/core/src/client_events.rs b/crates/core/src/client_events.rs index 2e36f35aa..dfb9757eb 100644 --- a/crates/core/src/client_events.rs +++ b/crates/core/src/client_events.rs @@ -170,13 +170,13 @@ pub(crate) mod test { use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; - use crate::node::{testing_impl::EventId, PeerId}; + use crate::{node::testing_impl::EventId, transport::TransportPublicKey}; use super::*; pub struct MemoryEventsGen { - id: PeerId, - signal: Receiver<(EventId, PeerId)>, + key: TransportPublicKey, + signal: Receiver<(EventId, TransportPublicKey)>, events_to_gen: HashMap>, rng: Option, internal_state: Option, @@ -186,10 +186,14 @@ pub(crate) mod test { where R: RandomEventGenerator, { - pub fn new_with_seed(signal: Receiver<(EventId, PeerId)>, id: PeerId, seed: u64) -> Self { + pub fn new_with_seed( + signal: Receiver<(EventId, TransportPublicKey)>, + key: TransportPublicKey, + seed: u64, + ) -> Self { Self { signal, - id, + key, events_to_gen: HashMap::new(), rng: Some(R::seed_from_u64(seed)), internal_state: None, @@ -233,10 +237,13 @@ pub(crate) mod test { impl MemoryEventsGen { #[cfg(test)] - pub fn new(signal: Receiver<(EventId, PeerId)>, id: PeerId) -> Self { + pub fn new( + signal: Receiver<(EventId, TransportPublicKey)>, + key: TransportPublicKey, + ) -> Self { Self { signal, - id, + key, events_to_gen: HashMap::new(), rng: None, internal_state: None, @@ -267,7 +274,7 @@ pub(crate) mod test { loop { if self.signal.changed().await.is_ok() { let (ev_id, pk) = self.signal.borrow().clone(); - if self.rng.is_some() && pk == self.id { + if self.rng.is_some() && pk == self.key { let res = OpenRequest { client_id: ClientId::FIRST, request: self @@ -279,7 +286,7 @@ pub(crate) mod test { token: None, }; return Ok(res.into_owned()); - } else if pk == self.id { + } else if pk == self.key { let res = OpenRequest { client_id: ClientId::FIRST, request: self @@ -321,7 +328,7 @@ pub(crate) mod test { } pub struct NetworkEventGenerator { - id: PeerId, + id: TransportPublicKey, memory_event_generator: MemoryEventsGen, ws_client: Arc>>>, } @@ -331,7 +338,7 @@ pub(crate) mod test { R: RandomEventGenerator, { pub fn new( - id: PeerId, + id: TransportPublicKey, memory_event_generator: MemoryEventsGen, ws_client: Arc>>>, ) -> Self { @@ -359,8 +366,11 @@ pub(crate) mod test { match message { Some(Ok(Message::Binary(data))) => { - if let Ok((_, pk)) = bincode::deserialize::<(EventId, PeerId)>(&data) { - if pk == self.id { + if let Ok((id, pub_key)) = + bincode::deserialize::<(EventId, TransportPublicKey)>(&data) + { + tracing::debug!(peer = %self.id, %id, "Received event from the supervisor"); + if &pub_key == &self.id { let res = OpenRequest { client_id: ClientId::FIRST, request: self diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 73beee628..e38a8e969 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -53,7 +53,7 @@ pub mod dev_tool { InitPeerNode, NodeConfig, PeerId, }; pub use ring::Location; - pub use transport::TransportKeypair; + pub use transport::{TransportKeypair, TransportPublicKey}; pub use wasm_runtime::{ContractStore, DelegateStore, Runtime, SecretsStore, StateStore}; } diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs index cea2cf679..860e6955c 100644 --- a/crates/core/src/message.rs +++ b/crates/core/src/message.rs @@ -304,7 +304,7 @@ pub(crate) enum NodeEvent { ConnectPeer { peer: PeerId, tx: Transaction, - callback: tokio::sync::mpsc::Sender>, + callback: tokio::sync::mpsc::Sender>, is_gw: bool, }, Disconnect { diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index b09742cc7..62ff5f78a 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -12,6 +12,7 @@ use std::{ borrow::Cow, fmt::Display, fs::File, + hash::Hash, io::Read, net::{IpAddr, SocketAddr, ToSocketAddrs}, sync::Arc, @@ -704,7 +705,6 @@ async fn process_message_v1( let span = tracing::info_span!( parent: parent_span, "handle_connect_op_request", - peer = ?op_manager.ring.connection_manager.get_peer_key(), transaction = %msg.id(), tx_type = %msg.id().transaction_type() ); @@ -919,12 +919,24 @@ where /// /// A gateway will have its `PeerId` set when it is created since it will know its own address /// from the start. -#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone)] +#[derive(Serialize, Deserialize, Eq, Clone)] pub struct PeerId { pub addr: SocketAddr, pub pub_key: TransportPublicKey, } +impl Hash for PeerId { + fn hash(&self, state: &mut H) { + self.addr.hash(state); + } +} + +impl PartialEq for PeerId { + fn eq(&self, other: &PeerId) -> bool { + self.addr == other.addr + } +} + impl Ord for PeerId { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.addr.cmp(&other.addr) @@ -1010,7 +1022,7 @@ impl std::fmt::Debug for PeerId { impl Display for PeerId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.addr) + write!(f, "{:?}", self.pub_key) } } @@ -1132,7 +1144,7 @@ pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> { node.0 .peer_id .clone() - .map(|id| Location::from_address(&id.addr())) + .map(|id| Location::from_address(&id.addr)) }) .flatten(); diff --git a/crates/core/src/node/network_bridge.rs b/crates/core/src/node/network_bridge.rs index cf452cb52..724b1d918 100644 --- a/crates/core/src/node/network_bridge.rs +++ b/crates/core/src/node/network_bridge.rs @@ -42,8 +42,10 @@ pub(crate) enum ConnectionError { Serialization(#[from] Option>), #[error("{0}")] TransportError(String), - #[error("unwanted connection")] + #[error("failed connect")] FailedConnectOp, + #[error("unwanted connection")] + UnwantedConnection, // errors produced while handling the connection: #[error("IO error: {0}")] @@ -75,6 +77,7 @@ impl Clone for ConnectionError { Self::UnexpectedReq => Self::UnexpectedReq, Self::TransportError(err) => Self::TransportError(err.clone()), Self::FailedConnectOp => Self::FailedConnectOp, + Self::UnwantedConnection => Self::UnwantedConnection, } } } diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index 96ddf5b16..a6e3da943 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -6,8 +6,9 @@ use std::{ sync::Arc, }; use tokio::time::{timeout, Duration}; +use tracing::instrument; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt}; use tokio::sync::mpsc::{self}; use crate::{ @@ -17,7 +18,7 @@ use crate::{ operations::connect::{ forward_conn, ConnectMsg, ConnectRequest, ConnectResponse, ConnectState, ConnectivityInfo, }, - ring::{ConnectionManager, PeerKeyLocation}, + ring::{ConnectionManager, PeerKeyLocation, Ring}, router::Router, transport::{ InboundConnectionHandler, OutboundConnectionHandler, PeerConnection, TransportError, @@ -46,7 +47,7 @@ pub(super) enum Event { // todo: instead of returning InboundJoinReq which is an internal event // return a proper well formed ConnectOp and any other types needed (PeerConnection etc.) /// An inbound connection to a peer was successfully established at a gateway. - InboundConnection(InboundJoinRequest), + InboundConnection(InboundGwJoinRequest), /// An outbound connection to a peer was successfully established. OutboundConnectionSuccessful { peer_id: PeerId, @@ -189,6 +190,7 @@ impl HandshakeHandler { /// Processes events related to connection establishment and management. /// This is the main event loop for the HandshakeHandler. + #[instrument(skip(self))] pub async fn wait_for_events(&mut self) -> Result { loop { tokio::select! { @@ -197,7 +199,7 @@ impl HandshakeHandler { let Some(conn) = new_conn else { return Err(HandshakeError::ChannelClosed); }; - tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr(), "New inbound connection"); + tracing::debug!(from=%conn.remote_addr(), "New inbound connection"); self.track_inbound_connection(conn); } // Process outbound connection attempts @@ -209,45 +211,59 @@ impl HandshakeHandler { } Some(Ok(InternalEvent::OutboundGwConnEstablished(id, connection))) => { if let Some(addr) = connection.my_address() { + tracing::debug!(%addr, "Attempting setting own peer key"); self.connection_manager.try_set_peer_key(addr); } tracing::debug!(at=?connection.my_address(), from=%connection.remote_addr(), "Outbound connection to gw successful"); - self.wait_for_gw_confirmation(id, connection).await?; + self.wait_for_gw_confirmation(id, connection, Ring::DEFAULT_MAX_HOPS_TO_LIVE).await?; continue; } - Some(Ok(InternalEvent::OutboundGwConnConfirmed(id, connection))) => { - tracing::debug!(at=?connection.my_address(), from=%connection.remote_addr(), "Outbound connection to gw confirmed"); - self.connected.insert(connection.remote_addr()); - self.connecting.remove(&connection.remote_addr()); - return Ok(Event::OutboundGatewayConnectionSuccessful { peer_id: id, connection }); + Some(Ok(InternalEvent::FinishedOutboundConnProcess(tracker))) => { + self.connecting.remove(&tracker.gw_peer.peer.addr); + // at this point we are done checking all the accepts inbound from a transient gw conn + tracing::debug!(at=?tracker.gw_conn.my_address(), gw=%tracker.gw_conn.remote_addr(), "Done checking, connection not accepted by gw, dropping connection"); + Ok(Event::OutboundGatewayConnectionRejected { peer_id: tracker.gw_peer.peer }) + } + // Some(Ok(InternalEvent::OutboundGwConnRejected(peer_id))) => { + // tracing::debug!(from=%peer_id.addr, "Outbound connection to gw rejected"); + // return Ok(Event::OutboundGatewayConnectionRejected { peer_id }); + // } + Some(Ok(InternalEvent::OutboundGwConnConfirmed(tracker))) => { + tracing::debug!(at=?tracker.gw_conn.my_address(), from=%tracker.gw_conn.remote_addr(), "Outbound connection to gw confirmed"); + self.connected.insert(tracker.gw_conn.remote_addr()); + self.connecting.remove(&tracker.gw_conn.remote_addr()); + // FIXME: at p2p_protoc we need to continue this transaction keeping track of the remaining checks at this point + return Ok(Event::OutboundGatewayConnectionSuccessful { peer_id: tracker.gw_peer.peer, connection: tracker.gw_conn }); + } + Some(Ok(InternalEvent::NextCheck(tracker))) => { + self.ongoing_outbound_connections.push( + check_remaining_hops(tracker).boxed() + ); + continue; + } + Some(Ok(InternalEvent::RemoteConnectionAttempt { remote, tracker })) => { + // this shouldn't happen as the tx would exit this module + // see: OutboundGwConnConfirmed + debug_assert!(tracker.gw_accepted_processed && tracker.gw_accepted); + tracing::debug!( + at=?tracker.gw_conn.my_address(), + gw=%tracker.gw_conn.remote_addr(), + "Attempting remote connection to {remote}" + ); + self.start_outbound_connection(remote.clone(), tracker.tx, false).await; + self.ongoing_outbound_connections.push( + check_remaining_hops(tracker).boxed() + ); + continue; } Some(Ok(InternalEvent::DropInboundConnection(addr))) => { self.connecting.remove(&addr); self.outbound_messages.remove(&addr); continue; } - Some(Ok(InternalEvent::InboundJoinRequest(req))) => { + Some(Ok(InternalEvent::InboundGwJoinRequest(req))) => { return Ok(Event::InboundConnection(req)); } - Some(Ok(InternalEvent::OutboundGwConnRejected(peer_id))) => { - tracing::debug!(from=%peer_id.addr, "Outbound connection to gw rejected"); - self.connected.insert(peer_id.addr); - self.connecting.remove(&peer_id.addr); - return Ok(Event::OutboundGatewayConnectionRejected { peer_id }); - } - Some(Ok(InternalEvent::RemoteConnectionAttempt { remote, gw_conn, remaining_checks, gw_peer_id, tx })) => { - tracing::debug!(at=?gw_conn.my_address(), gw=%gw_conn.remote_addr(), "Attempting remote connection to {remote}"); - self.start_outbound_connection(remote.clone(), tx, false).await; - if remaining_checks > 0 { - self.ongoing_outbound_connections.push( - check_remaining_hops(tx, gw_peer_id, gw_conn, remaining_checks).boxed() - ); - continue; - } else { - tracing::debug!(at=?gw_conn.my_address(), gw=%gw_conn.remote_addr(), "No more checks left, dropping connection"); - Ok(Event::OutboundGatewayConnectionRejected { peer_id: gw_peer_id }) - } - } Some(Err((peer_id, error))) => { tracing::debug!(from=%peer_id.addr, "Outbound connection failed: {error}"); self.connecting.remove(&peer_id.addr); @@ -265,15 +281,36 @@ impl HandshakeHandler { }; let (event, outbound_sender) = res?; match event { - InternalEvent::InboundJoinRequest(req) => { + InternalEvent::InboundGwJoinRequest(mut req) => { let remote = req.conn.remote_addr(); let location = Location::from_address(&remote); let should_accept = self.connection_manager.should_accept(location, Some(&req.joiner)); if should_accept { + let accepted_msg = NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response { + id: req.id, + sender: self.connection_manager.own_location(), + target: PeerKeyLocation { + peer: req.joiner.clone(), + location: Some(location), + }, + msg: ConnectResponse::AcceptedBy { + accepted: true, + acceptor: self.connection_manager.own_location(), + joiner: req.joiner.clone(), + }, + })); + tracing::debug!(at=?req.conn.my_address(), from=%req.conn.remote_addr(), "Accepting connection"); + + if let Err(e) = req.conn.send(accepted_msg).await { + tracing::error!(%e, "Failed to send accepted message from gw, pruning reserved connection"); + self.connection_manager.prune_in_transit_connection(&req.joiner); + return Err(e.into()); + } + return Ok(Event::InboundConnection(req)); } else { - let InboundJoinRequest { mut conn, id, hops_to_live, max_hops_to_live, skip_list, .. } = req; + let InboundGwJoinRequest { mut conn, id, hops_to_live, max_hops_to_live, skip_list, .. } = req; let remote = conn.remote_addr(); tracing::debug!(at=?conn.my_address(), from=%remote, "Transient connection"); let mut tx = TransientConnection { @@ -518,7 +555,7 @@ impl HandshakeHandler { { // we don't want to crash the node in case of a bug here tracing::error!("No outbound message sender for {addr}", addr = addr); - Err(HandshakeError::UnexpectedMessage(op)) + None } } } @@ -555,24 +592,39 @@ impl HandshakeHandler { /// Waits for confirmation from a gateway after establishing a connection. async fn wait_for_gw_confirmation( &mut self, - peer_id: PeerId, + gw_peer_id: PeerId, conn: PeerConnection, + max_hops_to_live: usize, ) -> Result<()> { let tx = self .connecting - .get(&peer_id.addr) + .get(&gw_peer_id.addr) .ok_or_else(|| HandshakeError::ConnectionClosed(conn.remote_addr()))? .clone(); + let this_peer = self.connection_manager.own_location().peer; + tracing::debug!(at=?conn.my_address(), %this_peer.addr, from=%conn.remote_addr(), remote_addr = %gw_peer_id, "Waiting for confirmation from gw"); self.ongoing_outbound_connections.push( - wait_for_gw_confirmation(peer_id, conn, self.connection_manager.max_hops_to_live, tx) - .boxed(), + wait_for_gw_confirmation( + this_peer, + AcceptedTracker { + gw_peer: gw_peer_id.into(), + gw_conn: conn, + gw_accepted: false, + gw_accepted_processed: false, + remaining_checks: max_hops_to_live, + accepted: 0, + total_checks: max_hops_to_live, + tx, + }, + ) + .boxed(), ); Ok(()) } } #[derive(Debug)] -pub(super) struct InboundJoinRequest { +pub(super) struct InboundGwJoinRequest { pub conn: PeerConnection, pub id: Transaction, pub joiner: PeerId, @@ -583,114 +635,199 @@ pub(super) struct InboundJoinRequest { #[derive(Debug)] enum InternalEvent { - InboundJoinRequest(InboundJoinRequest), + InboundGwJoinRequest(InboundGwJoinRequest), /// Regular connection established OutboundConnEstablished(PeerId, PeerConnection), OutboundGwConnEstablished(PeerId, PeerConnection), - OutboundGwConnConfirmed(PeerId, PeerConnection), - OutboundGwConnRejected(PeerId), + OutboundGwConnConfirmed(AcceptedTracker), + // OutboundGwConnRejected(PeerId), DropInboundConnection(SocketAddr), RemoteConnectionAttempt { - tx: Transaction, remote: PeerId, - gw_conn: PeerConnection, - gw_peer_id: PeerId, - remaining_checks: usize, + tracker: AcceptedTracker, }, + NextCheck(AcceptedTracker), + FinishedOutboundConnProcess(AcceptedTracker), } #[repr(transparent)] struct PeerOutboundMessage(mpsc::Receiver); +#[derive(Debug)] +struct AcceptedTracker { + gw_peer: PeerKeyLocation, + gw_conn: PeerConnection, + gw_accepted_processed: bool, + gw_accepted: bool, + /// Remaining checks to be made, at max total_checks + remaining_checks: usize, + /// At max this will be total_checks + accepted: usize, + /// Equivalent to max_hops_to_live + total_checks: usize, + tx: Transaction, +} + /// Waits for confirmation from a gateway after initiating a connection. async fn wait_for_gw_confirmation( - gw_peer_id: PeerId, - mut conn: PeerConnection, - max_hops_to_live: usize, - tx: Transaction, + this_peer: PeerId, + mut tracker: AcceptedTracker, ) -> OutboundConnResult { + let gw_peer_id = tracker.gw_peer.peer.clone(); let msg = NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { - id: tx, + id: tracker.tx, + target: tracker.gw_peer.clone(), msg: ConnectRequest::StartJoinReq { - joiner: None, - joiner_key: gw_peer_id.pub_key.clone(), - hops_to_live: max_hops_to_live, - max_hops_to_live, + joiner: Some(this_peer.clone()), + joiner_key: this_peer.pub_key.clone(), + hops_to_live: tracker.total_checks, + max_hops_to_live: tracker.total_checks, skip_list: vec![], }, })); - tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr(), "Sending initial connection message to gw"); - conn.send(msg) - .await - .map_err(|err| (gw_peer_id.clone(), HandshakeError::TransportError(err)))?; - tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr(), "Waiting for answer from gw"); - let msg = conn - .recv() + tracing::debug!( + at=?tracker.gw_conn.my_address(), + from=%tracker.gw_conn.remote_addr(), + "Sending initial connection message to gw" + ); + tracker + .gw_conn + .send(msg) .await .map_err(|err| (gw_peer_id.clone(), HandshakeError::TransportError(err)))?; - let deserialized = decode_msg(&msg).map_err(|e| (gw_peer_id.clone(), e.into()))?; - tracing::debug!(%deserialized, at=?conn.my_address(), from=%conn.remote_addr(), "Received answer from gw"); + tracing::debug!( + at=?tracker.gw_conn.my_address(), + from=%tracker.gw_conn.remote_addr(), + "Waiting for answer from gw" + ); - match deserialized { - NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response { - msg: ConnectResponse::AcceptedBy { - accepted, acceptor, .. - }, - .. - })) => { - // the first message should always be a response to the initial connection request at the gateway - debug_assert_eq!(acceptor.peer.addr, conn.remote_addr()); - if accepted { - return Ok(InternalEvent::OutboundGwConnConfirmed(gw_peer_id, conn)); - } - } - other => return Err((gw_peer_id, HandshakeError::UnexpectedMessage(other))), - } + let timeout_duration = Duration::from_secs(10); // under this branch we just need to wait long enough for the gateway to reply with all the downstream // connection attempts, and then we can drop the connection, so keep listening to it in a loop or timeout - let remaining_checks = max_hops_to_live; - check_remaining_hops(tx, gw_peer_id, conn, remaining_checks).await + let remote = tracker.gw_conn.remote_addr(); + Ok(tokio::time::timeout( + timeout_duration, + check_remaining_hops(tracker), + ) + .await + .map_err(|_| { + tracing::debug!(from=%gw_peer_id, "Timed out waiting for acknowledgement from downstream requests"); + ( + gw_peer_id, + HandshakeError::ConnectionClosed(remote), + ) + })??) } -async fn check_remaining_hops( - tx: Transaction, - gw_peer_id: PeerId, - mut conn: PeerConnection, - mut remaining_checks: usize, -) -> OutboundConnResult { - while remaining_checks > 0 { - let msg = conn - .recv() - .await - .map_err(|err| (gw_peer_id.clone(), HandshakeError::TransportError(err)))?; +async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResult { + // let mut gw_accepted = false; + // loop { + // let msg = tokio::time::timeout(timeout_duration, conn.recv()) + // .await + // .map_err(|_| { + // tracing::debug!(from = %gw_peer_id, "Timed out waiting for response from gw"); + // ( + // gw_peer_id.clone(), + // HandshakeError::ConnectionClosed(conn.remote_addr()), + // ) + // })? + // .map_err(|e| (gw_peer_id.clone(), HandshakeError::TransportError(e)))?; + // let deserialized = decode_msg(&msg).map_err(|e| (gw_peer_id.clone(), e.into()))?; + // tracing::debug!(%deserialized, at=?conn.my_address(), from=%conn.remote_addr(), "Received answer from gw"); + // match deserialized { + // NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response { + // msg: + // ConnectResponse::AcceptedBy { + // accepted, acceptor, .. + // }, + // .. + // })) => { + // if accepted && acceptor.peer.addr == conn.remote_addr() { + // // this is a message from the gw indicating they accept, but there could be an other remote address + // // inbound connection, + // gw_accepted = true; + // } else { + // remaining_checks -= 1; + // } + // break; + // } + // NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { + // msg: ConnectRequest::FindOptimalPeer { query_target, .. }, + // .. + // })) => { + // debug_assert_eq!(query_target.peer, this_peer); + // tracing::warn!(from=%conn.remote_addr(), "Received FindOptimalPeer request, ignoring"); + // continue; + // } + // other => return Err((gw_peer_id, HandshakeError::UnexpectedMessage(other))), + // } + // } + + let remote_addr = tracker.gw_conn.remote_addr(); + let gw_peer_id = tracker.gw_peer.peer.clone(); + while tracker.remaining_checks > 0 { + let msg = tokio::time::timeout( + Duration::from_secs(10), + tracker + .gw_conn + .recv() + .map_err(|err| (gw_peer_id.clone(), HandshakeError::TransportError(err))), + ) + .map_err(|_| { + tracing::debug!(from = %gw_peer_id, "Timed out waiting for response from gw"); + ( + gw_peer_id.clone(), + HandshakeError::ConnectionClosed(remote_addr), + ) + }) + .await??; let msg = decode_msg(&msg).map_err(|e| (gw_peer_id.clone(), e.into()))?; - remaining_checks -= 1; - tracing::debug!(%msg, at=?conn.my_address(), from=%conn.remote_addr(), %remaining_checks, "Received message from gw"); - let NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response { - msg: - ConnectResponse::AcceptedBy { - accepted, - acceptor, - joiner, - }, - .. - })) = msg - else { - return Err((gw_peer_id, HandshakeError::UnexpectedMessage(msg))); - }; - debug_assert_eq!(joiner.addr, conn.my_address().unwrap()); - if accepted { - return Ok(InternalEvent::RemoteConnectionAttempt { - tx, - remote: acceptor.peer, - gw_peer_id, - gw_conn: conn, - remaining_checks, - }); + match msg { + NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response { + msg: + ConnectResponse::AcceptedBy { + accepted, acceptor, .. + }, + .. + })) => { + tracker.remaining_checks -= 1; + if acceptor.peer.addr == tracker.gw_conn.remote_addr() { + // this is a message from the gw indicating if they accepted or not + tracker.gw_accepted_processed = true; + if accepted { + tracker.gw_accepted = true; + tracker.accepted += 1; + } + tracing::debug!( + at = ?tracker.gw_conn.my_address(), + from = %tracker.gw_conn.remote_addr(), + %accepted, + "Received answer from gw" + ); + if accepted { + return Ok(InternalEvent::OutboundGwConnConfirmed(tracker)); + } else { + return Ok(InternalEvent::NextCheck(tracker)); + } + } else { + return Ok(InternalEvent::RemoteConnectionAttempt { + remote: acceptor.peer, + tracker, + }); + } + } + NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { + msg: ConnectRequest::FindOptimalPeer { .. }, + .. + })) => { + tracing::warn!(from=%tracker.gw_conn.remote_addr(), "Received FindOptimalPeer request, ignoring"); + continue; + } + other => return Err((gw_peer_id, HandshakeError::UnexpectedMessage(other))), } } - Ok(InternalEvent::OutboundGwConnRejected(gw_peer_id)) + Ok(InternalEvent::FinishedOutboundConnProcess(tracker)) } /// Handles communication with a potentially transient peer connection. @@ -720,12 +857,16 @@ async fn gw_peer_connection_listener( match net_message { NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { id, - msg: ConnectRequest::StartJoinReq { joiner, joiner_key, hops_to_live, max_hops_to_live, skip_list } + msg: ConnectRequest::StartJoinReq { joiner, joiner_key, hops_to_live, max_hops_to_live, skip_list }, + .. })) => { - let joiner = joiner.unwrap_or_else(|| PeerId::new(conn.remote_addr(), joiner_key)); + let joiner = joiner.unwrap_or_else(|| { + tracing::debug!(%joiner_key, "Joiner not provided, using joiner key"); + PeerId::new(conn.remote_addr(), joiner_key) + }); break Ok(( - InternalEvent::InboundJoinRequest( - InboundJoinRequest { + InternalEvent::InboundGwJoinRequest( + InboundGwJoinRequest { conn, id, joiner, hops_to_live, max_hops_to_live, skip_list } ), @@ -850,6 +991,7 @@ impl TransientConnection { if let NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { id, msg: ConnectRequest::CleanConnection { joiner }, + .. })) = net_message { // this peer should never be receiving messages for other transactions or other peers at this point @@ -942,10 +1084,13 @@ mod tests { /// is successfully established. async fn establish_inbound_conn(&mut self, addr: SocketAddr, pub_key: TransportPublicKey) { let id = Transaction::new::(); + let target_peer_id = PeerId::new(addr, pub_key.clone()); + let target_peer = PeerKeyLocation::from(target_peer_id); // let joiner_key = TransportKeypair::new(); // let pub_key = joiner_key.public().clone(); let initial_join_req = ConnectMsg::Request { id, + target: target_peer, msg: ConnectRequest::StartJoinReq { joiner: None, joiner_key: pub_key, @@ -1183,6 +1328,7 @@ mod tests { ConnectRequest::StartJoinReq { joiner, joiner_key, .. }, + .. })) => { assert_eq!(id, inbound_id); assert!(joiner.is_none()); @@ -1272,7 +1418,7 @@ mod tests { #[tokio::test] async fn test_gw_to_peer_outbound_conn_forwarded() -> anyhow::Result<()> { - crate::config::set_logger(Some(tracing::level_filters::LevelFilter::DEBUG)); + // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::DEBUG)); let gw_addr: SocketAddr = ([127, 0, 0, 1], 10000).into(); let peer_addr: SocketAddr = ([127, 0, 0, 1], 10001).into(); let joiner_addr: SocketAddr = ([127, 0, 0, 1], 10002).into(); @@ -1319,7 +1465,7 @@ mod tests { tokio::time::timeout(Duration::from_secs(5), gw_handler.wait_for_events()) .await??; match event { - Event::InboundConnection(InboundJoinRequest { + Event::InboundConnection(InboundGwJoinRequest { conn: first_peer_conn, joiner: third_party_peer, .. @@ -1385,7 +1531,7 @@ mod tests { #[tokio::test] async fn test_peer_to_gw_outbound_conn_rejected() -> anyhow::Result<()> { - crate::config::set_logger(Some(tracing::level_filters::LevelFilter::DEBUG)); + // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::DEBUG)); let joiner_addr = ([127, 0, 0, 1], 10001).into(); let (mut handler, mut test) = config_handler(joiner_addr); @@ -1677,7 +1823,6 @@ mod tests { let peer_key = TransportKeypair::new(); let peer_pub_key = peer_key.public().clone(); let peer_addr = ([127, 0, 0, 2], 10002).into(); - let peer_peer_id = PeerId::new(peer_addr, peer_pub_key.clone()); let tx = Transaction::new::(); diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index a54cb31de..8034d6248 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -18,9 +18,10 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::oneshot; use tracing::Instrument; +use crate::dev_tool::Location; use crate::node::network_bridge::handshake::{ EstablishConnection, Event as HandshakeEvent, HandshakeError, HandshakeHandler, - InboundJoinRequest, OutboundMessage, + InboundGwJoinRequest, OutboundMessage, }; use crate::node::PeerId; use crate::transport::{ @@ -138,7 +139,7 @@ impl P2pConnManager { }) } - #[tracing::instrument(name = "network_event_listener", fields(peer = ?self.bridge.op_manager.ring.connection_manager.get_peer_key()), skip_all)] + #[tracing::instrument(name = "network_event_listener", fields(peer = %self.bridge.op_manager.ring.connection_manager.pub_key), skip_all)] pub async fn run_event_listener( mut self, op_manager: Arc, @@ -181,58 +182,82 @@ impl P2pConnManager { match event { EventResult::Continue => continue, - EventResult::Event(event) => match event { - ConnEvent::InboundMessage(msg) => { - self.handle_inbound_message( - msg, - &outbound_message, - &op_manager, - &mut state, - &executor_listener, - &cli_response_sender, - ) - .await?; - } - ConnEvent::HandshakeAction(action) => { - self.handle_handshake_action(action, &mut state).await?; - } - ConnEvent::ClosedChannel => { - tracing::info!("Notification channel closed"); - break; - } - ConnEvent::NodeAction(action) => match action { - NodeEvent::DropConnection(peer) => { - if let Some(conn) = self.connections.remove(&peer) { - let _ = conn.send(Right(ConnEvent::NodeAction( - NodeEvent::DropConnection(peer), - ))); - } - } - NodeEvent::ConnectPeer { - peer, - tx, - callback, - is_gw, - } => { - self.handle_connect_peer( - peer, - Box::new(callback), - tx, - &establish_connection, + EventResult::Event(event) => { + match event { + ConnEvent::InboundMessage(msg) => { + self.handle_inbound_message( + msg, + &outbound_message, + &op_manager, &mut state, - is_gw, + &executor_listener, + &cli_response_sender, ) .await?; } - NodeEvent::Disconnect { cause } => { - tracing::info!( - "Disconnecting from network{}", - cause.map(|c| format!(": {}", c)).unwrap_or_default() + ConnEvent::OutboundMessage(NetMessage::V1(NetMessageV1::Aborted(tx))) => { + // TODO: handle aborted transaction as internal message + tracing::error!(%tx, "Aborted transaction"); + } + ConnEvent::OutboundMessage(msg) => { + let target_peer = msg.target().expect( + "Target peer not set, must be set for connection outbound message", ); + tracing::debug!(%target_peer, %msg, "Sending message to peer"); + match self.connections.get(&target_peer.peer) { + Some(peer_connection) => { + if let Err(e) = peer_connection.send(Left(msg)).await { + tracing::error!("Failed to send message to peer: {}", e); + } + } + None => { + tracing::error!("No existing outbound connection to forward the message to {}", target_peer.peer); + } + } + } + + ConnEvent::HandshakeAction(action) => { + self.handle_handshake_action(action, &mut state).await?; + } + ConnEvent::ClosedChannel => { + tracing::info!("Notification channel closed"); break; } - }, - }, + ConnEvent::NodeAction(action) => match action { + NodeEvent::DropConnection(peer) => { + tracing::debug!(%peer, "Dropping connection"); + if let Some(conn) = self.connections.remove(&peer) { + let _ = conn.send(Right(ConnEvent::NodeAction( + NodeEvent::DropConnection(peer), + ))); + } + } + NodeEvent::ConnectPeer { + peer, + tx, + callback, + is_gw, + } => { + self.handle_connect_peer( + peer, + Box::new(callback), + tx, + &establish_connection, + &mut state, + is_gw, + ) + .await?; + } + NodeEvent::Disconnect { cause } => { + tracing::info!( + "Disconnecting from network{}", + cause.map(|c| format!(": {}", c)).unwrap_or_default() + ); + break; + } + }, + } + } } } Ok(()) @@ -329,7 +354,6 @@ impl P2pConnManager { let span = tracing::info_span!( "process_network_message", - peer = ?self.bridge.op_manager.ring.connection_manager.get_peer_key(), transaction = %msg.id(), tx_type = %msg.id().transaction_type() ); @@ -382,12 +406,30 @@ impl P2pConnManager { state: &mut EventListenerState, ) -> anyhow::Result<()> { match event { - HandshakeEvent::InboundConnection(InboundJoinRequest { conn, joiner, .. }) => { + HandshakeEvent::InboundConnection(InboundGwJoinRequest { + id, + conn, + joiner, + hops_to_live, + .. + }) => { let (tx, rx) = mpsc::channel(1); - self.connections.insert(joiner, tx); + self.connections.insert(joiner.clone(), tx); + let was_reserved = { + // this is an unexpected inbound request at a gateway so it didn't have a reserved spot + false + }; + self.bridge + .op_manager + .ring + .add_connection( + Location::from_address(&joiner.addr), + joiner.clone(), + was_reserved, + ) + .await; let task = peer_connection_listener(rx, conn).boxed(); state.peer_connections.push(task); - todo!("Need to add the operation to the op_manager properly, in the correct state") } HandshakeEvent::TransientForwardTransaction { target, @@ -418,7 +460,6 @@ impl P2pConnManager { } => { self.handle_successful_connection(peer_id, connection, state) .await?; - todo!("Create a new connect op to handle the inbound forward connection attempts") } HandshakeEvent::OutboundGatewayConnectionSuccessful { peer_id, @@ -456,7 +497,22 @@ impl P2pConnManager { state: &mut EventListenerState, ) -> anyhow::Result<()> { if let Some(mut cb) = state.awaiting_connection.remove(&peer_id.addr) { - let _ = cb.send_result(Ok(())).await; + let peer_id = if let Some(peer_id) = self + .bridge + .op_manager + .ring + .connection_manager + .get_peer_key() + { + peer_id + } else { + let self_addr = connection + .my_address() + .ok_or_else(|| anyhow::anyhow!("self addr should be set"))?; + let key = (&*self.bridge.op_manager.ring.connection_manager.pub_key).clone(); + PeerId::new(self_addr, key) + }; + let _ = cb.send_result(Ok(peer_id)).await; } else { tracing::warn!(%peer_id, "No callback for connection established"); } @@ -485,6 +541,7 @@ impl P2pConnManager { .keys() .find_map(|k| (k.addr == socket_addr).then(|| k.clone())) { + tracing::debug!(%peer, "Dropping connection"); self.bridge .op_manager .ring @@ -507,7 +564,7 @@ impl P2pConnManager { msg: Option>, ) -> EventResult { match msg { - Some(Left(msg)) => EventResult::Event(ConnEvent::InboundMessage(msg)), // FIXME: this is not inbound, is outbound + Some(Left(msg)) => EventResult::Event(ConnEvent::OutboundMessage(msg)), Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action)), None => EventResult::Continue, } @@ -515,7 +572,7 @@ impl P2pConnManager { async fn handle_bridge_msg(&self, msg: Option) -> EventResult { match msg { - Some(Left((_, msg))) => EventResult::Event(ConnEvent::InboundMessage(*msg)), + Some(Left((_, msg))) => EventResult::Event(ConnEvent::OutboundMessage(*msg)), Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action)), None => EventResult::Event(ConnEvent::ClosedChannel), } @@ -568,14 +625,14 @@ impl P2pConnManager { trait ConnectResultSender { fn send_result( &mut self, - result: Result<(), HandshakeError>, + result: Result, ) -> Pin> + Send + '_>>; } -impl ConnectResultSender for Option>> { +impl ConnectResultSender for Option>> { fn send_result( &mut self, - result: Result<(), HandshakeError>, + result: Result, ) -> Pin> + Send + '_>> { async move { let _ = self.take().expect("always set").send(result); @@ -585,10 +642,10 @@ impl ConnectResultSender for Option>> } } -impl ConnectResultSender for mpsc::Sender> { +impl ConnectResultSender for mpsc::Sender> { fn send_result( &mut self, - result: Result<(), HandshakeError>, + result: Result, ) -> Pin> + Send + '_>> { async move { self.send(result.map_err(|_| ())) @@ -628,6 +685,7 @@ enum EventResult { #[derive(Debug)] enum ConnEvent { InboundMessage(NetMessage), + OutboundMessage(NetMessage), HandshakeAction(HandshakeEvent), NodeAction(NodeEvent), ClosedChannel, @@ -655,17 +713,16 @@ async fn peer_connection_listener( loop { tokio::select! { msg = rx.recv() => { - tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr(), "Received message from channel"); let Some(msg) = msg else { break Err(TransportError::ConnectionClosed(conn.remote_addr())); }; match msg { Left(msg) => { - tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr() ,"Sending message to peer. Msg: {msg}"); + tracing::debug!(to=%conn.remote_addr() ,"Sending message to peer. Msg: {msg}"); conn .send(msg) .await?; } Right(action) => { - tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr(), "Received action from channel"); + tracing::debug!(to=%conn.remote_addr(), "Received action from channel"); match action { ConnEvent::NodeAction(NodeEvent::DropConnection(_)) | ConnEvent::ClosedChannel => { break Err(TransportError::ConnectionClosed(conn.remote_addr())); @@ -679,12 +736,12 @@ async fn peer_connection_listener( } msg = conn.recv() => { let Ok(msg) = msg.map_err(|error| { - tracing::error!(at=?conn.my_address(), from=%conn.remote_addr(), "Error while receiving message: {error}"); + tracing::error!(from=%conn.remote_addr(), "Error while receiving message: {error}"); }) else { break Err(TransportError::ConnectionClosed(conn.remote_addr())); }; let net_message = decode_msg(&msg).unwrap(); - tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr() ,"Received message from peer. Msg: {net_message}"); + tracing::debug!(from=%conn.remote_addr() ,"Received message from peer. Msg: {net_message}"); break Ok(PeerConnectionInbound { conn, rx, msg: net_message }); } } diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index 60ae65284..90fdffbb8 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -1,6 +1,5 @@ use std::{ collections::{HashMap, HashSet}, - fmt::Write, net::Ipv6Addr, num::NonZeroUsize, pin::Pin, @@ -11,7 +10,6 @@ use std::{ use either::Either; use freenet_stdlib::prelude::*; use futures::Future; -use itertools::Itertools; use rand::seq::SliceRandom; use tokio::sync::{mpsc, watch}; use tracing::{info, Instrument}; @@ -25,11 +23,13 @@ use crate::{ self, ContractHandlerChannel, ExecutorToEventLoopChannel, NetworkEventListenerHalve, WaitingResolution, }, + dev_tool::TransportKeypair, message::{MessageStats, NetMessage, NetMessageV1, NodeEvent, Transaction}, node::{InitPeerNode, NetEventRegister, NodeConfig}, operations::connect, ring::{Distance, Location, PeerKeyLocation}, tracing::TestEventListener, + transport::TransportPublicKey, }; mod in_memory; @@ -105,14 +105,6 @@ impl<'a> From<&'a str> for NodeLabel { } } -#[cfg(test)] -#[derive(Clone)] -pub(crate) struct NodeSpecification { - pub owned_contracts: Vec<(ContractContainer, WrappedState, bool)>, - pub events_to_generate: HashMap>, - pub contract_subscribers: HashMap>, -} - #[derive(Clone)] struct GatewayConfig { label: NodeLabel, @@ -120,20 +112,19 @@ struct GatewayConfig { location: Location, } -pub struct EventChain> { - labels: Vec<(NodeLabel, PeerId)>, - // user_ev_controller: Sender<(EventId, PeerId)>, +pub struct EventChain> { + labels: Vec<(NodeLabel, TransportPublicKey)>, user_ev_controller: S, total_events: u32, count: u32, rng: rand::rngs::SmallRng, clean_up_tmp_dirs: bool, - choice: Option, + choice: Option, } impl EventChain { pub fn new( - labels: Vec<(NodeLabel, PeerId)>, + labels: Vec<(NodeLabel, TransportPublicKey)>, user_ev_controller: S, total_events: u32, clean_up_tmp_dirs: bool, @@ -158,7 +149,7 @@ impl EventChain { } } - fn choose_peer(self: Pin<&mut Self>) -> PeerId { + fn choose_peer(self: Pin<&mut Self>) -> TransportPublicKey { let this = unsafe { // This is safe because we're not moving the EventChain, just copying one inner valur self.get_unchecked_mut() @@ -172,7 +163,7 @@ impl EventChain { id.clone() } - fn set_choice(self: Pin<&mut Self>, id: PeerId) { + fn set_choice(self: Pin<&mut Self>, id: TransportPublicKey) { let this = unsafe { // This is safe because we're not moving the EventChain, just copying one inner valur self.get_unchecked_mut() @@ -185,15 +176,15 @@ trait EventSender { fn send( &self, cx: &mut std::task::Context<'_>, - value: (EventId, PeerId), + value: (EventId, TransportPublicKey), ) -> std::task::Poll>; } -impl EventSender for mpsc::Sender<(EventId, PeerId)> { +impl EventSender for mpsc::Sender<(EventId, TransportPublicKey)> { fn send( &self, cx: &mut std::task::Context<'_>, - value: (EventId, PeerId), + value: (EventId, TransportPublicKey), ) -> std::task::Poll> { let f = self.send(value); futures::pin_mut!(f); @@ -201,11 +192,11 @@ impl EventSender for mpsc::Sender<(EventId, PeerId)> { } } -impl EventSender for watch::Sender<(EventId, PeerId)> { +impl EventSender for watch::Sender<(EventId, TransportPublicKey)> { fn send( &self, _cx: &mut std::task::Context<'_>, - value: (EventId, PeerId), + value: (EventId, TransportPublicKey), ) -> std::task::Poll> { match self.send(value) { Ok(_) => std::task::Poll::Ready(Ok(())), @@ -246,7 +237,7 @@ impl futures::stream::Stream for EventChain { impl Drop for EventChain { fn drop(&mut self) { if self.clean_up_tmp_dirs { - clean_up_tmp_dirs(&self.labels) + clean_up_tmp_dirs(self.labels.iter().map(|(l, _)| l)); } } } @@ -258,8 +249,7 @@ type DefaultRegistry = CombinedRegister<2>; type DefaultRegistry = TestEventListener; pub(super) struct Builder { - pub(super) peer_key: PeerId, - config: NodeConfig, + pub config: NodeConfig, contract_handler_name: String, add_noise: bool, event_register: ER, @@ -275,9 +265,7 @@ impl Builder { contract_handler_name: String, add_noise: bool, ) -> Builder { - let peer_key = builder.get_peer_id().unwrap(); Builder { - peer_key, config: builder.clone(), contract_handler_name, add_noise, @@ -292,10 +280,10 @@ impl Builder { pub struct SimNetwork { name: String, clean_up_tmp_dirs: bool, - labels: Vec<(NodeLabel, PeerId)>, + labels: Vec<(NodeLabel, TransportPublicKey)>, pub(crate) event_listener: TestEventListener, - user_ev_controller: Option>, - receiver_ch: watch::Receiver<(EventId, PeerId)>, + user_ev_controller: Option>, + receiver_ch: watch::Receiver<(EventId, TransportPublicKey)>, number_of_gateways: usize, gateways: Vec<(Builder, GatewayConfig)>, number_of_nodes: usize, @@ -319,7 +307,8 @@ impl SimNetwork { min_connections: usize, ) -> Self { assert!(nodes > 0); - let (user_ev_controller, mut receiver_ch) = watch::channel((0, PeerId::random())); + let (user_ev_controller, mut receiver_ch) = + watch::channel((0, TransportKeypair::new().public().clone())); receiver_ch.borrow_and_update(); let mut net = Self { name: name.into(), @@ -382,6 +371,7 @@ impl SimNetwork { config.key_pair = keypair; config.network_listener_ip = Ipv6Addr::LOCALHOST.into(); config.network_listener_port = port; + config.with_peer_id(id.clone()); config .with_location(location) .max_hops_to_live(self.ring_max_htl) @@ -389,7 +379,8 @@ impl SimNetwork { .min_number_of_connections(self.min_connections) .is_gateway() .rnd_if_htl_above(self.rnd_if_htl_above); - self.event_listener.add_node(label.clone(), id.clone()); + self.event_listener + .add_node(label.clone(), config.key_pair.public().clone()); configs.push(( config, GatewayConfig { @@ -444,7 +435,6 @@ impl SimNetwork { for node_no in self.number_of_gateways..num + self.number_of_gateways { let label = NodeLabel::node(node_no); - let peer = PeerId::random(); let mut config_args = ConfigArgs::default(); config_args.id = Some(format!("{label}")); @@ -455,12 +445,14 @@ impl SimNetwork { let port = crate::util::get_free_port().unwrap(); config.network_listener_port = port; config.network_listener_ip = Ipv6Addr::LOCALHOST.into(); + config.key_pair = crate::transport::TransportKeypair::new(); config .max_hops_to_live(self.ring_max_htl) .rnd_if_htl_above(self.rnd_if_htl_above) .max_number_of_connections(self.max_connections); - self.event_listener.add_node(label.clone(), peer); + self.event_listener + .add_node(label.clone(), config.key_pair.public().clone()); let event_listener = { #[cfg(feature = "trace-ot")] @@ -486,43 +478,6 @@ impl SimNetwork { } } - #[cfg(test)] - pub(crate) async fn start(&mut self) { - self.start_with_spec(HashMap::new()).await - } - - #[cfg(test)] - pub(crate) async fn start_with_spec( - &mut self, - mut specs: HashMap, - ) { - let gw = self.gateways.drain(..).map(|(n, c)| (n, c.label)); - for (mut node, label) in gw.chain(self.nodes.drain(..)).collect::>() { - tracing::debug!(peer = %label, "initializing"); - let node_spec = specs.remove(&label); - let mut user_events = - MemoryEventsGen::new(self.receiver_ch.clone(), node.peer_key.clone()); - if let Some(specs) = node_spec.clone() { - user_events.generate_events(specs.events_to_generate); - } - let span = if label.is_gateway() { - tracing::info_span!("in_mem_gateway", %node.peer_key) - } else { - tracing::info_span!("in_mem_node", %node.peer_key) - }; - if let Some(specs) = node_spec { - node.append_contracts(specs.owned_contracts, specs.contract_subscribers); - } - self.labels.push((label, node.peer_key.clone())); - - let node_task = async move { node.run_node(user_events, span).await }; - GlobalExecutor::spawn(node_task); - - tokio::time::sleep(self.start_backoff).await; - } - self.labels.sort_by(|(a, _), (b, _)| a.cmp(b)); - } - pub async fn start_with_rand_gen( &mut self, seed: u64, @@ -539,16 +494,17 @@ impl SimNetwork { tracing::debug!(peer = %label, "initializing"); let mut user_events = MemoryEventsGen::::new_with_seed( self.receiver_ch.clone(), - node.peer_key.clone(), + node.config.key_pair.public().clone(), seed, ); user_events.rng_params(label.number(), total_peer_num, max_contract_num, iterations); let span = if label.is_gateway() { - tracing::info_span!("in_mem_gateway", %node.peer_key) + tracing::info_span!("in_mem_gateway", %label) } else { - tracing::info_span!("in_mem_node", %node.peer_key) + tracing::info_span!("in_mem_node", %label) }; - self.labels.push((label, node.peer_key.clone())); + self.labels + .push((label, node.config.key_pair.public().clone())); let node_task = async move { node.run_node(user_events, span).await }; let handle = GlobalExecutor::spawn(node_task); @@ -565,7 +521,8 @@ impl SimNetwork { let gw = self.gateways.drain(..).map(|(n, c)| (n, c.label)); let mut peers = vec![]; for (builder, label) in gw.chain(self.nodes.drain(..)).collect::>() { - self.labels.push((label.clone(), builder.peer_key)); + let pub_key = builder.config.key_pair.public(); + self.labels.push((label.clone(), pub_key.clone())); peers.push((label, builder.config)); } self.labels.sort_by(|(a, _), (b, _)| a.cmp(b)); @@ -573,130 +530,24 @@ impl SimNetwork { peers } - pub fn get_locations_by_node(&self) -> HashMap { - let mut locations_by_node: HashMap = HashMap::new(); - - // Get node and gateways location by label - for (node, label) in &self.nodes { - locations_by_node.insert( - label.clone(), - PeerKeyLocation { - peer: node.peer_key.clone(), - location: None, - }, - ); - } - for (node, config) in &self.gateways { - locations_by_node.insert( - config.label.clone(), - PeerKeyLocation { - peer: node.peer_key.clone(), - location: config.location.into(), - }, - ); - } - locations_by_node - } - - pub fn connected(&self, peer: &NodeLabel) -> bool { - let pos = self - .labels - .binary_search_by(|(label, _)| label.cmp(peer)) - .expect("peer not found"); - self.event_listener.is_connected(&self.labels[pos].1) - } - - pub fn has_put_contract(&self, peer: impl Into, key: &ContractKey) -> bool { - let peer = peer.into(); - let pos = self - .labels - .binary_search_by(|(label, _)| label.cmp(&peer)) - .expect("peer not found"); - self.event_listener - .has_put_contract(&self.labels[pos].1, key) - } - - pub fn has_got_contract(&self, peer: impl Into, key: &ContractKey) -> bool { - let peer = peer.into(); - let pos = self - .labels - .binary_search_by(|(label, _)| label.cmp(&peer)) - .expect("peer not found"); - self.event_listener - .has_got_contract(&self.labels[pos].1, key) - } - - pub fn is_subscribed_to_contract(&self, peer: impl Into, key: &ContractKey) -> bool { - let peer = peer.into(); - let pos = self - .labels - .binary_search_by(|(label, _)| label.cmp(&peer)) - .expect("peer not found"); - self.event_listener - .is_subscribed_to_contract(&self.labels[pos].1, key) - } - - /// Builds an histogram of the distribution in the ring of each node relative to each other. - pub fn ring_distribution(&self, scale: i32) -> Vec<(f64, usize)> { - let mut all_dists = Vec::with_capacity(self.labels.len()); - for (.., key) in &self.labels { - all_dists.push(self.event_listener.connections(key.clone())); - } - let mut dist_buckets = group_locations_in_buckets( - all_dists.into_iter().flatten().map(|(_, l)| l.as_f64()), - scale, - ) - .collect::>(); - dist_buckets - .sort_by(|(d0, _), (d1, _)| d0.partial_cmp(d1).unwrap_or(std::cmp::Ordering::Equal)); - dist_buckets - } - /// Returns the connectivity in the network per peer (that is all the connections /// this peers has registered). - pub fn node_connectivity(&self) -> HashMap)> { + pub fn node_connectivity( + &self, + ) -> HashMap)> { let mut peers_connections = HashMap::with_capacity(self.labels.len()); let key_to_label: HashMap<_, _> = self.labels.iter().map(|(k, v)| (v, k)).collect(); for (label, key) in &self.labels { let conns = self .event_listener - .connections(key.clone()) - .map(|(k, d)| (key_to_label[&k].clone(), d)) + .connections(key) + .map(|(k, d)| (key_to_label[&k.pub_key].clone(), d)) .collect::>(); peers_connections.insert(label.clone(), (key.clone(), conns)); } peers_connections } - /// # Arguments - /// - /// - label: node for which to trigger the - /// - event_id: which event to trigger - /// - await_for: if set, wait for the duration before returning - #[cfg(test)] - pub(crate) async fn trigger_event( - &self, - label: impl Into, - event_id: EventId, - await_for: Option, - ) -> anyhow::Result<()> { - let label = label.into(); - let pos = self - .labels - .binary_search_by(|(other, _)| other.cmp(&label)) - .map_err(|_| anyhow::anyhow!("peer not found"))?; - let (_, peer) = &self.labels[pos]; - self.user_ev_controller - .as_ref() - .expect("should be set") - .send((event_id, peer.clone())) - .expect("node listeners disconnected"); - if let Some(sleep_time) = await_for { - tokio::time::sleep(sleep_time).await; - } - Ok(()) - } - /// Start an event chain for this simulation. Allows passing a different controller for the peers. /// /// If done make sure you set the proper receiving side for the controller. For example in the @@ -704,7 +555,7 @@ impl SimNetwork { pub fn event_chain( mut self, total_events: u32, - controller: Option>, + controller: Option>, ) -> EventChain { let user_ev_controller = controller.unwrap_or_else(|| { self.user_ev_controller @@ -777,15 +628,12 @@ impl SimNetwork { Ok(()) } - pub fn print_network_connections(&self) { - let node_connectivity = self.node_connectivity(); - let connections = pretty_print_connections(&node_connectivity); - tracing::info!("{connections}"); - } - - pub fn print_ring_distribution(&self) { - let hist = self.ring_distribution(1); - tracing::info!("Ring distribution: {:?}", hist); + pub fn connected(&self, peer: &NodeLabel) -> bool { + let pos = self + .labels + .binary_search_by(|(label, _)| label.cmp(peer)) + .expect("peer not found"); + self.event_listener.is_connected(&self.labels[pos].1) } /// Recommended to calling after `check_connectivity` to ensure enough time @@ -859,13 +707,13 @@ impl std::fmt::Debug for SimNetwork { impl Drop for SimNetwork { fn drop(&mut self) { if self.clean_up_tmp_dirs { - clean_up_tmp_dirs(&self.labels); + clean_up_tmp_dirs(self.labels.iter().map(|(l, _)| l)); } } } -fn clean_up_tmp_dirs(labels: &[(NodeLabel, PeerId)]) { - for (label, _) in labels { +fn clean_up_tmp_dirs<'a>(labels: impl Iterator) { + for label in labels { let p = std::env::temp_dir().join(format!( "freenet-executor-{sim}-{label}", sim = "sim", @@ -875,67 +723,6 @@ fn clean_up_tmp_dirs(labels: &[(NodeLabel, PeerId)]) { } } -fn group_locations_in_buckets( - locs: impl IntoIterator, - scale: i32, -) -> impl Iterator { - let mut distances = HashMap::new(); - for (bucket, group) in &locs - .into_iter() - .chunk_by(|l| (l * (10.0f64).powi(scale)).floor() as u32) - { - let count = group.count(); - distances - .entry(bucket) - .and_modify(|c| *c += count) - .or_insert(count); - } - distances - .into_iter() - .map(move |(k, v)| ((k as f64 / (10.0f64).powi(scale)), v)) -} - -fn pretty_print_connections( - conns: &HashMap)>, -) -> String { - let mut connections = String::from("Node connections:\n"); - let mut conns = conns.iter().collect::>(); - conns.sort_by(|(a, _), (b, _)| a.cmp(b)); - for (peer, (key, conns)) in conns { - writeln!(&mut connections, "{peer} ({key}):").unwrap(); - for (conn, dist) in conns { - let dist = dist.as_f64(); - writeln!(&mut connections, " {conn} (dist: {dist:.3})").unwrap(); - } - } - connections -} - -#[test] -fn group_locations_test() -> anyhow::Result<()> { - let locations = vec![0.5356, 0.5435, 0.5468, 0.5597, 0.6745, 0.7309, 0.7412]; - - let mut grouped: Vec<_> = group_locations_in_buckets(locations.clone(), 1).collect(); - grouped.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); - assert_eq!(grouped, vec![(0.5, 4), (0.6, 1), (0.7, 2)]); - - let mut grouped: Vec<_> = group_locations_in_buckets(locations, 2).collect(); - grouped.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); - assert_eq!( - grouped, - vec![ - (0.53, 1), - (0.54, 2), - (0.55, 1), - (0.67, 1), - (0.73, 1), - (0.74, 1) - ] - ); - - Ok(()) -} - use super::op_state_manager::OpManager; use crate::client_events::ClientEventsProxy; diff --git a/crates/core/src/node/testing_impl/in_memory.rs b/crates/core/src/node/testing_impl/in_memory.rs index 2c13f45d4..322aa38e7 100644 --- a/crates/core/src/node/testing_impl/in_memory.rs +++ b/crates/core/src/node/testing_impl/in_memory.rs @@ -10,7 +10,7 @@ use crate::{ node::{ network_bridge::{event_loop_notification_channel, in_memory::MemoryConnManager}, op_state_manager::OpManager, - NetEventRegister, NetworkBridge, + NetEventRegister, NetworkBridge, PeerId, }, ring::{ConnectionManager, PeerKeyLocation}, }; @@ -49,7 +49,10 @@ impl Builder { .map_err(|e| anyhow::anyhow!(e))?; let conn_manager = MemoryConnManager::new( - self.peer_key.clone(), + PeerId::new( + ([127, 0, 0, 1], 0).into(), + self.config.key_pair.public().clone(), + ), self.event_register.clone(), op_manager.clone(), self.add_noise, @@ -61,7 +64,10 @@ impl Builder { ); let mut config = super::RunnerConfig { - peer_key: self.peer_key, + peer_key: PeerId::new( + ([127, 0, 0, 1], 0).into(), + self.config.key_pair.public().clone(), + ), gateways, parent_span: Some(parent_span), op_manager, @@ -77,16 +83,6 @@ impl Builder { .await?; super::run_node(config).await } - - #[cfg(test)] - pub fn append_contracts( - &mut self, - contracts: Vec<(ContractContainer, WrappedState, bool)>, - contract_subscribers: std::collections::HashMap>, - ) { - self.contracts.extend(contracts); - self.contract_subscribers.extend(contract_subscribers); - } } impl RunnerConfig diff --git a/crates/core/src/node/testing_impl/network.rs b/crates/core/src/node/testing_impl/network.rs index 5f4dda104..e04e6217e 100644 --- a/crates/core/src/node/testing_impl/network.rs +++ b/crates/core/src/node/testing_impl/network.rs @@ -1,5 +1,6 @@ use crate::client_events::BoxedClient; use crate::contract::MemoryContractHandler; +use crate::dev_tool::TransportPublicKey; use crate::node::p2p_impl::NodeP2P; use crate::node::Node; use crate::tracing::EventRegister; @@ -16,8 +17,8 @@ pub struct NetworkPeer { pub id: String, pub config: crate::node::NodeConfig, pub ws_client: Option>>>>, - pub user_ev_controller: Arc>, - pub receiver_ch: Arc>, + pub user_ev_controller: Arc>, + pub receiver_ch: Arc>, } #[derive(Debug, Serialize, Deserialize)] @@ -34,23 +35,23 @@ pub enum PeerMessage { Info(String), } -type PeerEventSender = Sender<(u32, crate::node::PeerId)>; -type PeerEventReceiver = Receiver<(u32, crate::node::PeerId)>; +type PeerEventSender = Sender<(u32, TransportPublicKey)>; +type PeerEventReceiver = Receiver<(u32, TransportPublicKey)>; impl NetworkPeer { pub async fn new(peer_id: String) -> Result { - let (ws_stream, _) = tokio_tungstenite::connect_async("ws://localhost:3000/ws") + let (ws_stream, _) = tokio_tungstenite::connect_async("ws://localhost:3000/v1/ws") .await .expect("Failed to connect to supervisor"); - let config_url = format!("http://127.0.0.1:3000/config/{}", peer_id); + let config_url = format!("http://127.0.0.1:3000/v1/config/{}", peer_id); let response = reqwest::get(&config_url).await?; let peer_config = response.json::().await?; tracing::debug!(?peer_config.network_listener_port, %peer_config.is_gateway, key = ?peer_config.key_pair.public(), "Received peer config"); let (user_ev_controller, receiver_ch): (PeerEventSender, PeerEventReceiver) = - tokio::sync::watch::channel((0, peer_config.get_peer_id().unwrap())); + tokio::sync::watch::channel((0, peer_config.key_pair.public().clone())); Ok(NetworkPeer { id: peer_id, diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 1d5d6d96c..5854b376f 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -35,6 +35,19 @@ pub(crate) struct ConnectOp { } impl ConnectOp { + pub fn new( + id: Transaction, + state: Option, + gateway: Option>, + backoff: Option, + ) -> Self { + Self { + id, + state, + gateway, + backoff, + } + } pub fn has_backoff(&self) -> bool { self.backoff.is_some() } @@ -147,6 +160,7 @@ impl Operation for ConnectOp { skip_list, }, id, + .. } => { let own_loc = op_manager.ring.connection_manager.own_location(); let PeerKeyLocation { @@ -164,6 +178,7 @@ impl Operation for ConnectOp { tx = %id, query_target = %query_target.peer, joiner = %joiner.peer, + skip_list = ?skip_list, "Got queried for new connections from joiner", ); if let Some(desirable_peer) = op_manager @@ -181,6 +196,7 @@ impl Operation for ConnectOp { *id, &own_loc, joiner, + &desirable_peer, *max_hops_to_live, *max_hops_to_live, skip_list, @@ -212,6 +228,7 @@ impl Operation for ConnectOp { })); let msg = ConnectMsg::Request { id: *id, + target: query_target.clone(), msg: ConnectRequest::FindOptimalPeer { query_target: query_target.clone(), ideal_location: *ideal_location, @@ -224,7 +241,61 @@ impl Operation for ConnectOp { return_msg = None; } } + ConnectMsg::Request { + id, + msg: + ConnectRequest::StartJoinReq { + joiner, + hops_to_live, + max_hops_to_live, + skip_list, // + .. + }, + .. + } => { + let joiner: PeerId = joiner + .clone() + .expect("should be already set at the p2p bridge level"); + let this_peer = op_manager.ring.connection_manager.own_location(); + let assigned_location = Location::from_address(&joiner.addr); + + let new_peer_loc = PeerKeyLocation { + location: Some(assigned_location), + peer: joiner.clone(), + }; + let mut skip_list = skip_list.clone(); + skip_list.push(joiner.clone()); + + if let Some(updated_state) = forward_conn( + *id, + &op_manager.ring.connection_manager, + op_manager.ring.router.clone(), + network_bridge, + (this_peer.clone(), new_peer_loc.clone()), + *hops_to_live, + *max_hops_to_live, + true, + skip_list.clone(), + ) + .await? + { + new_state = Some(updated_state); + } else { + new_state = None + } + + return_msg = Some(ConnectMsg::Response { + id: *id, + sender: this_peer.clone(), + target: new_peer_loc.clone(), + msg: ConnectResponse::AcceptedBy { + accepted: true, + acceptor: this_peer.clone(), + joiner: joiner.clone(), + }, + }); + } ConnectMsg::Request { id, msg: @@ -236,6 +307,7 @@ impl Operation for ConnectOp { skip_list, .. }, + .. } => { //debug_assert_ne!(sender.peer, joiner.peer); if sender.peer == joiner.peer { @@ -283,13 +355,22 @@ impl Operation for ConnectOp { .ok_or(OpError::NotificationError)? .is_ok() { + let was_reserved = { + // reserved just above in call to should_accept + true + }; // Add the connection to the ring op_manager .ring - .add_connection(joiner_loc, joiner.peer.clone()) + .add_connection(joiner_loc, joiner.peer.clone(), was_reserved) .await; true } else { + // If the connection was not completed, prune the reserved connection + op_manager + .ring + .connection_manager + .prune_in_transit_connection(&joiner.peer); false } } else { @@ -313,7 +394,6 @@ impl Operation for ConnectOp { { new_state = Some(updated_state); } else { - tracing::debug!(tx = %id, at = %this_peer.peer, "Rejecting connection from {:?}", joiner); new_state = None } } @@ -374,6 +454,7 @@ impl Operation for ConnectOp { .add_connection( acceptor.location.expect("location not found"), acceptor.peer.clone(), + true, // we reserved the connection to this peer before asking to join ) .await; } else { @@ -558,6 +639,7 @@ where if need_to_clean_gw_conn { let msg = ConnectMsg::Request { id, + target: state.gateway.clone(), msg: ConnectRequest::CleanConnection { joiner }, }; conn_bridge.send(&state.gateway.peer, msg.into()).await?; @@ -568,7 +650,7 @@ where type Requester = PeerKeyLocation; #[derive(Debug)] -pub(crate) enum ConnectState { +pub enum ConnectState { Initializing, ConnectingToNode(ConnectionInfo), AwaitingConnectivity(ConnectivityInfo), @@ -584,7 +666,7 @@ pub(crate) struct ConnectivityInfo { } impl ConnectivityInfo { - fn new(requester: Requester, remaining_checks: usize) -> Self { + pub fn new(requester: Requester, remaining_checks: usize) -> Self { Self { requester, remaining_checks, @@ -658,9 +740,10 @@ where "Attempting to connect to {} gateways in parallel", number_of_parallel_connections ); + // FIXME: we are attempting to connect to gws which are already connected for gateway in op_manager .ring - .is_connected(gateways.iter()) + .is_not_connected(gateways.iter()) .shuffle() .take(number_of_parallel_connections) { @@ -674,7 +757,12 @@ where ) .await { - tracing::error!(%error, "Failed while attempting connection to gateway"); + if !matches!( + error, + OpError::ConnError(crate::node::ConnectionError::UnwantedConnection) + ) { + tracing::error!(%error, "Failed while attempting connection to gateway"); + } } } } @@ -688,7 +776,7 @@ where Ok(()) } -#[tracing::instrument(fields(peer = ?op_manager.ring.connection_manager.get_peer_key()), skip_all)] +#[tracing::instrument(fields(peer = %op_manager.ring.connection_manager.pub_key), skip_all)] pub(crate) async fn join_ring_request( backoff: Option, peer_pub_key: TransportPublicKey, @@ -699,16 +787,20 @@ pub(crate) async fn join_ring_request( where CM: NetworkBridge + Send, { - if op_manager.ring.connection_manager.num_connections() > 0 { - use crate::node::ConnectionError; - if !op_manager.ring.connection_manager.should_accept( - gateway.location.unwrap_or_else(Location::random), - Some(&gateway.peer), - ) { - // ensure that we still want to connect AND reserve an spot implicitly - return Err(OpError::ConnError(ConnectionError::FailedConnectOp)); - } + use crate::node::ConnectionError; + if !op_manager.ring.connection_manager.should_accept( + gateway.location.ok_or_else(|| { + tracing::error!( + "Gateway location not found, this should not be possible, report an error" + ); + OpError::ConnError(ConnectionError::LocationUnknown) + })?, + Some(&gateway.peer), + ) { + // ensure that we still want to connect AND reserve an spot implicitly + return Err(OpError::ConnError(ConnectionError::UnwantedConnection)); } + let tx_id = Transaction::new::(); tracing::info!(%gateway.peer, %peer_pub_key, "Attempting network join"); let mut op = initial_request( @@ -806,15 +898,22 @@ where }) .await?; match result.recv().await.ok_or(OpError::NotificationError)? { - Ok(_) => { + Ok(joiner) => { + tracing::debug!( + tx = %id, + gateway = %gateway, + joiner = %joiner, + "Sending connection request to gateway", + ); let join_req = NetMessage::from(messages::ConnectMsg::Request { id: tx, + target: gateway.clone(), msg: ConnectRequest::StartJoinReq { - joiner: None, + joiner: Some(joiner.clone()), joiner_key: peer_pub_key.clone(), hops_to_live: max_hops_to_live, max_hops_to_live, - skip_list: vec![], + skip_list: vec![joiner], }, }); conn_bridge.send(&gateway.peer, join_req).await?; @@ -890,8 +989,15 @@ where skip_list.push(req_peer.peer.clone()); match target_peer { Some(target_peer) => { - let forward_msg = - create_forward_message(id, &req_peer, &joiner, left_htl, max_htl, skip_list); + let forward_msg = create_forward_message( + id, + &req_peer, + &joiner, + &target_peer, + left_htl, + max_htl, + skip_list, + ); tracing::debug!(target: "network", "Forwarding connection request to {:?}", target_peer); network_bridge.send(&target_peer.peer, forward_msg).await?; update_state_with_forward_info(&req_peer, left_htl) @@ -937,12 +1043,14 @@ fn create_forward_message( id: Transaction, request_peer: &PeerKeyLocation, joiner: &PeerKeyLocation, + target: &PeerKeyLocation, hops_to_live: usize, max_hops_to_live: usize, skip_list: Vec, ) -> NetMessage { NetMessage::from(ConnectMsg::Request { id, + target: target.clone(), msg: ConnectRequest::CheckConnectivity { sender: request_peer.clone(), joiner: joiner.clone(), @@ -988,6 +1096,7 @@ mod messages { pub(crate) enum ConnectMsg { Request { id: Transaction, + target: PeerKeyLocation, msg: ConnectRequest, }, Response { @@ -1015,9 +1124,9 @@ mod messages { fn target(&self) -> Option> { use ConnectMsg::*; match self { + Request { target, .. } => Some(target), Response { target, .. } => Some(target), Connected { target, .. } => Some(target), - _ => None, } } @@ -1042,15 +1151,35 @@ mod messages { let id = self.id(); match self { Self::Request { + target, msg: ConnectRequest::StartJoinReq { .. }, .. - } => write!(f, "StartRequest(id: {id})"), + } => write!(f, "StartRequest(id: {id}, target: {target})"), + Self::Request { + target, + msg: ConnectRequest::CheckConnectivity { + sender, + joiner, + .. + }, + .. + } => write!( + f, + "CheckConnectivity(id: {id}, target: {target}, sender: {sender}, joiner: {joiner})" + ), Self::Response { - msg: ConnectResponse::AcceptedBy { accepted, .. }, + target, + msg: + ConnectResponse::AcceptedBy { + accepted, acceptor, .. + }, .. - } => write!(f, "AcceptedBy(id: {id}, accepted: {accepted})"), + } => write!( + f, + "AcceptedBy(id: {id}, target: {target}, accepted: {accepted}, acceptor: {acceptor})" + ), Self::Connected { .. } => write!(f, "Connected(id: {id})"), - ConnectMsg::Request { id, .. } => write!(f, "Request(id: {id})"), + ConnectMsg::Request { id, target, .. } => write!(f, "Request(id: {id}, target: {target})"), } } } @@ -1099,99 +1228,3 @@ mod messages { }, } } - -#[cfg(test)] -mod test { - use std::time::Duration; - - use crate::node::testing_impl::SimNetwork; - - /// Given a network of one node and one gateway test that both are connected. - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn one_node_connects_to_gw() -> anyhow::Result<()> { - const NUM_NODES: usize = 1usize; - const NUM_GW: usize = 1usize; - const MAX_HTL: usize = 1usize; - const RAND_IF_HTL_ABOVE: usize = 1usize; - const MAX_CONNS: usize = 1usize; - const MIN_CONNS: usize = 1usize; - let mut sim_nw = SimNetwork::new( - "join_one_node_connects_to_gw", - NUM_NODES, - NUM_GW, - MAX_HTL, - RAND_IF_HTL_ABOVE, - MAX_CONNS, - MIN_CONNS, - ) - .await; - sim_nw.start().await; - sim_nw.check_connectivity(Duration::from_secs(1))?; - assert!(sim_nw.connected(&"node-1".into())); - Ok(()) - } - - /// Once a gateway is left without remaining open slots, ensure forwarding connects - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn forward_connection_to_node() -> anyhow::Result<()> { - const NUM_NODES: usize = 3usize; - const NUM_GW: usize = 1usize; - const MAX_HTL: usize = 2usize; - const RAND_IF_HTL_ABOVE: usize = 1usize; - const MAX_CONNS: usize = 2usize; - const MIN_CONNS: usize = 1usize; - let mut sim_nw = SimNetwork::new( - "join_forward_connection_to_node", - NUM_GW, - NUM_NODES, - MAX_HTL, - RAND_IF_HTL_ABOVE, - MAX_CONNS, - MIN_CONNS, - ) - .await; - // sim_nw.with_start_backoff(Duration::from_millis(100)); - sim_nw.start().await; - sim_nw.check_connectivity(Duration::from_secs(3))?; - let some_forwarded = sim_nw - .node_connectivity() - .into_iter() - .flat_map(|(_this, (_, conns))| conns.into_keys()) - .any(|c| c.is_node()); - assert!( - some_forwarded, - "didn't find any connection succesfully forwarded" - ); - Ok(()) - } - - /// Given a network of N peers all good connectivity - #[tokio::test(flavor = "multi_thread")] - async fn network_should_achieve_good_connectivity() -> anyhow::Result<()> { - // crate::config::set_logger(); - const NUM_NODES: usize = 10usize; - const NUM_GW: usize = 2usize; - const MAX_HTL: usize = 5usize; - const RAND_IF_HTL_ABOVE: usize = 3usize; - const MAX_CONNS: usize = 4usize; - const MIN_CONNS: usize = 2usize; - let mut sim_nw = SimNetwork::new( - "join_all_nodes_should_connect", - NUM_GW, - NUM_NODES, - MAX_HTL, - RAND_IF_HTL_ABOVE, - MAX_CONNS, - MIN_CONNS, - ) - .await; - sim_nw.start().await; - sim_nw.check_connectivity(Duration::from_secs(10))?; - // wait for a bit so peers can acquire more connections - tokio::time::sleep(Duration::from_secs(3)).await; - sim_nw.network_connectivity_quality()?; - sim_nw.print_network_connections(); - sim_nw.print_ring_distribution(); - Ok(()) - } -} diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 1e29ffc61..2829fd5cc 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -859,170 +859,3 @@ mod messages { } } } - -#[cfg(test)] -mod test { - use freenet_stdlib::client_api::ContractRequest; - use std::{collections::HashMap, time::Duration}; - - use super::*; - use crate::node::testing_impl::{NodeSpecification, SimNetwork}; - - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn successful_get_op_between_nodes() -> anyhow::Result<()> { - const NUM_NODES: usize = 1usize; - const NUM_GW: usize = 1usize; - - let bytes = crate::util::test::random_bytes_1kb(); - let mut gen = arbitrary::Unstructured::new(&bytes); - let contract: WrappedContract = gen.arbitrary()?; - let contract_val: WrappedState = gen.arbitrary()?; - let key = *contract.key(); - let get_event = ContractRequest::Get { - key, - fetch_contract: true, - } - .into(); - let node_1 = NodeSpecification { - owned_contracts: vec![], - events_to_generate: HashMap::from_iter([(1, get_event)]), - contract_subscribers: HashMap::new(), - }; - - let gw_0 = NodeSpecification { - owned_contracts: vec![( - ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract)), - contract_val, - false, - )], - events_to_generate: HashMap::new(), - contract_subscribers: HashMap::new(), - }; - - let get_specs = HashMap::from_iter([("node-1".into(), node_1), ("gateway-0".into(), gw_0)]); - - // establish network - let mut sim_nw = SimNetwork::new( - "successful_get_op_between_nodes", - NUM_GW, - NUM_NODES, - 3, - 2, - 4, - 2, - ) - .await; - sim_nw.start_with_spec(get_specs).await; - sim_nw.check_connectivity(Duration::from_secs(3))?; - - // trigger get @ node-0, which does not own the contract - sim_nw - .trigger_event("node-1", 1, Some(Duration::from_secs(1))) - .await?; - assert!(sim_nw.has_got_contract("node-1", &key)); - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn contract_not_found() -> anyhow::Result<()> { - const NUM_NODES: usize = 2usize; - const NUM_GW: usize = 1usize; - - let bytes = crate::util::test::random_bytes_1kb(); - let mut gen = arbitrary::Unstructured::new(&bytes); - let contract: WrappedContract = gen.arbitrary()?; - let key = *contract.key(); - - let get_event = ContractRequest::Get { - key, - fetch_contract: false, - } - .into(); - let node_1 = NodeSpecification { - owned_contracts: vec![], - events_to_generate: HashMap::from_iter([(1, get_event)]), - contract_subscribers: HashMap::new(), - }; - - let get_specs = HashMap::from_iter([("node-1".into(), node_1)]); - - // establish network - let mut sim_nw = - SimNetwork::new("get_contract_not_found", NUM_GW, NUM_NODES, 3, 2, 4, 2).await; - sim_nw.start_with_spec(get_specs).await; - sim_nw.check_connectivity(Duration::from_secs(3))?; - - // trigger get @ node-1, which does not own the contract - sim_nw - .trigger_event("node-1", 1, Some(Duration::from_secs(1))) - .await?; - assert!(!sim_nw.has_got_contract("node-1", &key)); - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn contract_found_after_retry() -> anyhow::Result<()> { - // crate::config::set_logger(); - const NUM_NODES: usize = 2usize; - const NUM_GW: usize = 1usize; - - let bytes = crate::util::test::random_bytes_1kb(); - let mut gen = arbitrary::Unstructured::new(&bytes); - let contract: WrappedContract = gen.arbitrary()?; - let contract_val: WrappedState = gen.arbitrary()?; - let key = *contract.key(); - - let get_event = ContractRequest::Get { - key, - fetch_contract: false, - } - .into(); - - let node_1 = NodeSpecification { - owned_contracts: vec![], - events_to_generate: HashMap::from_iter([(1, get_event)]), - contract_subscribers: HashMap::new(), - }; - - let node_2 = NodeSpecification { - owned_contracts: vec![( - ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract)), - contract_val, - false, - )], - events_to_generate: HashMap::new(), - contract_subscribers: HashMap::new(), - }; - - let gw_0 = NodeSpecification { - owned_contracts: vec![], - events_to_generate: HashMap::new(), - contract_subscribers: HashMap::new(), - }; - - let get_specs = HashMap::from_iter([ - ("node-1".into(), node_1), - ("node-2".into(), node_2), - ("gateway-0".into(), gw_0), - ]); - - // establish network - let mut sim_nw = SimNetwork::new( - "get_contract_found_after_retry", - NUM_GW, - NUM_NODES, - 3, - 2, - 4, - 2, - ) - .await; - sim_nw.start_with_spec(get_specs).await; - sim_nw.check_connectivity(Duration::from_secs(3))?; - sim_nw - .trigger_event("node-1", 1, Some(Duration::from_secs(1))) - .await?; - assert!(sim_nw.has_got_contract("node-1", &key)); - Ok(()) - } -} diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index f6efb91e5..96122ba9b 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -924,96 +924,3 @@ mod messages { } } } - -#[cfg(test)] -mod test { - use std::{collections::HashMap, time::Duration}; - - use freenet_stdlib::client_api::ContractRequest; - use freenet_stdlib::prelude::*; - - use crate::node::testing_impl::{NodeSpecification, SimNetwork}; - - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn successful_put_op_between_nodes() -> anyhow::Result<()> { - const NUM_NODES: usize = 2usize; - const NUM_GW: usize = 1usize; - - let bytes = crate::util::test::random_bytes_1kb(); - let mut gen = arbitrary::Unstructured::new(&bytes); - let contract: WrappedContract = gen.arbitrary()?; - let key = *contract.key(); - let contract_val: WrappedState = gen.arbitrary()?; - let new_value = WrappedState::new(Vec::from_iter(gen.arbitrary::<[u8; 20]>().unwrap())); - - let mut sim_nw = SimNetwork::new( - "successful_put_op_between_nodes", - NUM_GW, - NUM_NODES, - 2, - 1, - 3, - 2, - ) - .await; - let mut locations = sim_nw.get_locations_by_node(); - let node0_loc = locations.remove(&"node-1".into()).unwrap(); - let node1_loc = locations.remove(&"node-2".into()).unwrap(); - - // both own the contract, and one triggers an update - let node_1 = NodeSpecification { - owned_contracts: vec![( - ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract.clone())), - contract_val.clone(), - false, - )], - events_to_generate: HashMap::new(), - contract_subscribers: HashMap::new(), - }; - - let node_2 = NodeSpecification { - owned_contracts: vec![( - ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract.clone())), - contract_val.clone(), - false, - )], - events_to_generate: HashMap::new(), - contract_subscribers: HashMap::new(), - }; - - let put_event = ContractRequest::Put { - contract: ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract.clone())), - state: new_value.clone(), - related_contracts: Default::default(), - } - .into(); - - let gw_0 = NodeSpecification { - owned_contracts: vec![( - ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract.clone())), - contract_val, - false, - )], - events_to_generate: HashMap::from_iter([(1, put_event)]), - contract_subscribers: HashMap::from_iter([(key, vec![node0_loc, node1_loc])]), - }; - - // establish network - let put_specs = HashMap::from_iter([ - ("node-1".into(), node_1), - ("node-2".into(), node_2), - ("gateway-0".into(), gw_0), - ]); - - sim_nw.start_with_spec(put_specs).await; - sim_nw.check_connectivity(Duration::from_secs(3))?; - - // trigger the put op @ gw-0 - sim_nw - .trigger_event("gateway-0", 1, Some(Duration::from_secs(1))) - .await?; - assert!(sim_nw.has_put_contract("gateway-0", &key)); - assert!(sim_nw.event_listener.contract_broadcasted(&key)); - Ok(()) - } -} diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index cbe8ec929..6a87860e6 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -505,69 +505,3 @@ mod messages { } } } - -#[cfg(test)] -mod test { - use std::{collections::HashMap, time::Duration}; - - use freenet_stdlib::client_api::ContractRequest; - - use super::*; - use crate::node::testing_impl::{NodeSpecification, SimNetwork}; - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn successful_subscribe_op_between_nodes() -> anyhow::Result<()> { - const NUM_NODES: usize = 4usize; - const NUM_GW: usize = 1usize; - - let bytes = crate::util::test::random_bytes_1kb(); - let mut gen = arbitrary::Unstructured::new(&bytes); - let contract: WrappedContract = gen.arbitrary()?; - let contract_val: WrappedState = gen.arbitrary()?; - let contract_key: ContractKey = *contract.key(); - - let event = ContractRequest::Subscribe { - key: contract_key, - summary: None, - } - .into(); - let first_node = NodeSpecification { - owned_contracts: vec![( - ContractContainer::Wasm(ContractWasmAPIVersion::V1(contract)), - contract_val, - true, - )], - events_to_generate: HashMap::new(), - contract_subscribers: HashMap::new(), - }; - let second_node = NodeSpecification { - owned_contracts: Vec::new(), - events_to_generate: HashMap::from_iter([(1, event)]), - contract_subscribers: HashMap::new(), - }; - - let subscribe_specs = HashMap::from_iter([ - ("node-1".into(), first_node), - ("node-2".into(), second_node), - ]); - let mut sim_nw = SimNetwork::new( - "successful_subscribe_op_between_nodes", - NUM_GW, - NUM_NODES, - 4, - 3, - 5, - 2, - ) - .await; - sim_nw.start_with_spec(subscribe_specs).await; - sim_nw.check_connectivity(Duration::from_secs(3))?; - sim_nw - .trigger_event("node-2", 1, Some(Duration::from_secs(1))) - .await?; - assert!(sim_nw.has_got_contract("node-2", &contract_key)); - tokio::time::sleep(Duration::from_secs(3)).await; - assert!(sim_nw.is_subscribed_to_contract("node-2", &contract_key)); - Ok(()) - } -} diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index c8df75159..d07f7d5e6 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -5,7 +5,7 @@ use std::collections::VecDeque; use std::hash::Hash; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::{ cmp::Reverse, collections::BTreeMap, @@ -322,7 +322,7 @@ impl Ring { pub fn open_connections(&self) -> usize { self.connection_manager .open_connections - .load(std::sync::atomic::Ordering::Acquire) + .load(std::sync::atomic::Ordering::SeqCst) } async fn refresh_router(router: Arc>, register: ER) { @@ -426,8 +426,31 @@ impl Ring { .record_request(recipient, target, request_type); } - pub async fn add_connection(&self, loc: Location, peer: PeerId) { - tracing::info!(%peer, this = ?self.connection_manager.get_peer_key(), "Adding connection to peer"); + pub async fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) { + tracing::info!(%peer, this = ?self.connection_manager.get_peer_key(), %was_reserved, "Adding connection to peer"); + debug_assert!( + self.connection_manager + .get_peer_key() + .expect("should be set") + != peer + ); + if was_reserved { + let old = self + .connection_manager + .reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + #[cfg(debug_assertions)] + { + tracing::debug!(old, "Decremented reserved connections"); + if old == 0 { + panic!("Underflow of reserved connections"); + } + } + let _ = old; + } + self.connection_manager + .open_connections + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); self.event_register .register_events(Either::Left(NetEventLog::connected( self, @@ -457,7 +480,8 @@ impl Ring { let _ = topology_manager.refresh_cache(&cbl); } - pub fn is_connected<'a>( + /// Returns a filtered iterator for peers that are not connected to this node already. + pub fn is_not_connected<'a>( &self, peers: impl Iterator, ) -> impl Iterator + Send { @@ -531,13 +555,10 @@ impl Ring { } pub async fn prune_connection(&self, peer: PeerId) { - #[cfg(debug_assertions)] - { - tracing::info!(%peer, "Removing connection"); - } + tracing::debug!(%peer, "Removing connection"); self.live_tx_tracker.prune_transactions_from_peer(&peer); // This case would be when a connection is being open, so peer location hasn't been recorded yet and we can ignore everything below - let Some(loc) = self.connection_manager.prune_connection(&peer) else { + let Some(loc) = self.connection_manager.prune_alive_connection(&peer) else { return; }; { @@ -716,7 +737,7 @@ impl Ring { } } - #[tracing::instrument(level = "debug", skip(self, notifier))] + #[tracing::instrument(level = "debug", skip(self, notifier), fields(peer = %self.connection_manager.pub_key))] async fn acquire_new( &self, ideal_location: Location, @@ -743,14 +764,20 @@ impl Ring { "Adding new connections" ); let missing_connections = self.connection_manager.max_connections - self.open_connections(); + let connected = self.connection_manager.connected_peers(); let msg = connect::ConnectMsg::Request { id: Transaction::new::(), + target: query_target.clone(), msg: connect::ConnectRequest::FindOptimalPeer { query_target, ideal_location, joiner, max_hops_to_live: missing_connections, - skip_list: skip_list.iter().map(|p| (*p).clone()).collect(), + skip_list: skip_list + .iter() + .map(|p| (*p).clone()) + .chain(connected) + .collect(), }, }; let id = *msg.id(); @@ -765,16 +792,17 @@ impl Ring { pub struct Location(f64); impl Location { + #[cfg(not(feature = "local-simulation"))] pub fn from_address(addr: &SocketAddr) -> Self { match addr.ip() { - IpAddr::V4(ipv4) => { + std::net::IpAddr::V4(ipv4) => { let octets = ipv4.octets(); let combined_octets = (u32::from(octets[0]) << 16) | (u32::from(octets[1]) << 8) | u32::from(octets[2]); Location(combined_octets as f64 / (u32::MAX as f64)) } - IpAddr::V6(ipv6) => { + std::net::IpAddr::V6(ipv6) => { let segments = ipv6.segments(); let combined_segments = (u64::from(segments[0]) << 32) | (u64::from(segments[1]) << 16) @@ -784,6 +812,12 @@ impl Location { } } + #[cfg(feature = "local-simulation")] + pub fn from_address(_addr: &SocketAddr) -> Self { + let random_component: f64 = rand::random(); + Location(random_component) + } + pub fn new(location: f64) -> Self { debug_assert!( (0.0..=1.0).contains(&location), diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 63a408283..9e7595c09 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -7,6 +7,7 @@ use super::*; #[derive(Clone)] pub(crate) struct ConnectionManager { pub(super) open_connections: Arc, + pub(super) reserved_connections: Arc, pub(super) location_for_peer: Arc>>, pub(super) topology_manager: Arc>, pub(super) connections_by_location: Arc>>>, @@ -16,9 +17,8 @@ pub(crate) struct ConnectionManager { pub(super) peer_key: Arc>>, pub min_connections: usize, pub max_connections: usize, - pub max_hops_to_live: usize, pub rnd_if_htl_above: usize, - pub_key: Arc, + pub pub_key: Arc, } #[cfg(test)] @@ -28,7 +28,6 @@ impl ConnectionManager { let max_connections = Ring::DEFAULT_MAX_CONNECTIONS; let max_upstream_bandwidth = Ring::DEFAULT_MAX_UPSTREAM_BANDWIDTH; let max_downstream_bandwidth = Ring::DEFAULT_MAX_DOWNSTREAM_BANDWIDTH; - let max_hops_to_live = Ring::DEFAULT_MAX_HOPS_TO_LIVE; let rnd_if_htl_above = Ring::DEFAULT_RAND_WALK_ABOVE_HTL; Self::init( @@ -36,9 +35,9 @@ impl ConnectionManager { max_downstream_bandwidth, min_connections, max_connections, - max_hops_to_live, rnd_if_htl_above, pub_key, + None, ) } @@ -75,12 +74,6 @@ impl ConnectionManager { Ring::DEFAULT_MAX_DOWNSTREAM_BANDWIDTH }; - let max_hops_to_live = if let Some(v) = config.max_hops_to_live { - v - } else { - Ring::DEFAULT_MAX_HOPS_TO_LIVE - }; - let rnd_if_htl_above = if let Some(v) = config.rnd_if_htl_above { v } else { @@ -92,9 +85,9 @@ impl ConnectionManager { max_downstream_bandwidth, min_connections, max_connections, - max_hops_to_live, rnd_if_htl_above, config.key_pair.public().clone(), + config.peer_id.clone(), ) } @@ -103,12 +96,18 @@ impl ConnectionManager { max_downstream_bandwidth: Rate, min_connections: usize, max_connections: usize, - max_hops_to_live: usize, rnd_if_htl_above: usize, pub_key: TransportPublicKey, + peerid: Option, ) -> Self { - // for location here consider -1 == None - let own_location = AtomicU64::new(u64::from_le_bytes((-1f64).to_le_bytes())); + let own_location = if let Some(peer_key) = &peerid { + // if the peer id is set, then the location must be set, since it is a gateway + let location = Location::from_address(&peer_key.addr); + AtomicU64::new(u64::from_le_bytes(location.0.to_le_bytes())) + } else { + // for location here consider -1 == None + AtomicU64::new(u64::from_le_bytes((-1f64).to_le_bytes())) + }; let topology_manager = Arc::new(RwLock::new(TopologyManager::new(Limits { max_upstream_bandwidth, @@ -121,12 +120,12 @@ impl ConnectionManager { connections_by_location: Arc::new(RwLock::new(BTreeMap::new())), location_for_peer: Arc::new(RwLock::new(BTreeMap::new())), open_connections: Arc::new(AtomicUsize::new(0)), + reserved_connections: Arc::new(AtomicUsize::new(0)), topology_manager, own_location: own_location.into(), - peer_key: Arc::new(Mutex::new(None)), + peer_key: Arc::new(Mutex::new(peerid)), min_connections, max_connections, - max_hops_to_live, rnd_if_htl_above, pub_key: Arc::new(pub_key), } @@ -139,14 +138,24 @@ impl ConnectionManager { /// Will panic if the node checking for this condition has no location assigned. // FIXME: peer here should not be optional ever pub fn should_accept(&self, location: Location, peer: Option<&PeerId>) -> bool { - let open_conn = self + tracing::debug!("Checking if should accept connection"); + let open = self .open_connections - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + .load(std::sync::atomic::Ordering::SeqCst); + let total_conn = self + .reserved_connections + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + + open; + + if open == 0 { + // if this is the first connection, then accept it + return true; + } if let Some(peer_id) = peer { if self.location_for_peer.read().get(peer_id).is_some() { // avoid connecting more than once to the same peer - self.open_connections + self.reserved_connections .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); tracing::debug!(%peer_id, "Peer already connected"); return false; @@ -161,9 +170,9 @@ impl ConnectionManager { || self.connections_by_location.read().contains_key(&location) { false - } else if open_conn < self.min_connections { + } else if total_conn < self.min_connections { true - } else if open_conn >= self.max_connections { + } else if total_conn >= self.max_connections { false } else { self.topology_manager @@ -172,9 +181,10 @@ impl ConnectionManager { .unwrap_or(true) }; if !accepted { - self.open_connections + self.reserved_connections .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); } else if let Some(peer_id) = peer { + tracing::debug!(%peer_id, "Accepted connection, reserving spot"); self.location_for_peer .write() .insert(peer_id.clone(), location); @@ -232,20 +242,44 @@ impl ConnectionManager { } } - pub fn prune_connection(&self, peer: &PeerId) -> Option { + pub fn prune_alive_connection(&self, peer: &PeerId) -> Option { + self.prune_connection(peer, true) + } + + pub fn prune_in_transit_connection(&self, peer: &PeerId) -> Option { + self.prune_connection(peer, false) + } + + fn prune_connection(&self, peer: &PeerId, is_alive: bool) -> Option { + let connection_type = if is_alive { "active" } else { "in transit" }; + tracing::debug!(%peer, "Pruning {} connection", connection_type); + let Some(loc) = self.location_for_peer.write().remove(&peer) else { - self.open_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + if is_alive { + self.open_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } else { + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } return None; }; + let conns = &mut *self.connections_by_location.write(); if let Some(conns) = conns.get_mut(&loc) { if let Some(pos) = conns.iter().position(|c| &c.location.peer == peer) { conns.swap_remove(pos); } } - self.open_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + + if is_alive { + self.open_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } else { + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } + Some(loc) } @@ -304,4 +338,9 @@ impl ConnectionManager { pub fn num_connections(&self) -> usize { self.connections_by_location.read().len() } + + pub(super) fn connected_peers<'a>(&'a self) -> impl Iterator { + let read = self.location_for_peer.read(); + read.keys().cloned().collect::>().into_iter() + } } diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index e72f5a8be..49fb489ec 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -957,7 +957,7 @@ pub(crate) mod tracer { let disabled_logs = std::env::var("FREENET_DISABLE_LOGS").is_ok(); let to_stderr = std::env::var("FREENET_LOG_TO_STDERR").is_ok(); let layers = { - let fmt_layer = tracing_subscriber::fmt::layer().with_level(true); + let fmt_layer = tracing_subscriber::fmt::layer().with_level(true).pretty(); let fmt_layer = if cfg!(any(test, debug_assertions)) { fmt_layer.with_file(true).with_line_number(true) } else { @@ -1036,13 +1036,13 @@ pub(super) mod test { }; use super::*; - use crate::{node::testing_impl::NodeLabel, ring::Distance}; + use crate::{node::testing_impl::NodeLabel, ring::Distance, transport::TransportPublicKey}; static LOG_ID: AtomicUsize = AtomicUsize::new(0); #[derive(Clone)] pub(crate) struct TestEventListener { - node_labels: Arc>, + node_labels: Arc>, tx_log: Arc>>, logs: Arc>>, network_metrics_server: @@ -1061,114 +1061,25 @@ pub(super) mod test { } } - pub fn add_node(&mut self, label: NodeLabel, peer: PeerId) { + pub fn add_node(&mut self, label: NodeLabel, peer: TransportPublicKey) { self.node_labels.insert(label, peer); } - pub fn is_connected(&self, peer: &PeerId) -> bool { + pub fn is_connected(&self, peer: &TransportPublicKey) -> bool { let Ok(logs) = self.logs.try_lock() else { return false; }; logs.iter().any(|log| { - &log.peer_id == peer + &log.peer_id.pub_key == peer && matches!(log.kind, EventKind::Connect(ConnectEvent::Connected { .. })) }) } - pub fn has_put_contract(&self, peer: &PeerId, for_key: &ContractKey) -> bool { - let Ok(logs) = self.logs.try_lock() else { - return false; - }; - let put_ops = logs.iter().filter_map(|l| match &l.kind { - EventKind::Put(ev) => Some((&l.tx, ev)), - _ => None, - }); - let put_ops: HashMap<_, Vec<_>> = put_ops.fold(HashMap::new(), |mut acc, (id, ev)| { - acc.entry(id).or_default().push(ev); - acc - }); - - for (_tx, events) in put_ops { - let mut is_expected_key = false; - let mut is_expected_peer = false; - for ev in events { - match ev { - PutEvent::Request { key, .. } if key != for_key => break, - PutEvent::Request { key, .. } if key == for_key => { - is_expected_key = true; - } - PutEvent::PutSuccess { requester, .. } if requester == peer => { - is_expected_peer = true; - } - _ => {} - } - } - if is_expected_peer && is_expected_key { - return true; - } - } - false - } - - /// The contract was broadcasted from one peer to an other successfully. - #[cfg(test)] - pub fn contract_broadcasted(&self, for_key: &ContractKey) -> bool { - let Ok(logs) = self.logs.try_lock() else { - return false; - }; - let put_broadcast_ops = logs.iter().filter_map(|l| match &l.kind { - EventKind::Put(ev @ PutEvent::BroadcastEmitted { .. }) - | EventKind::Put(ev @ PutEvent::BroadcastReceived { .. }) => Some((&l.tx, ev)), - _ => None, - }); - let put_broadcast_by_tx: HashMap<_, Vec<_>> = - put_broadcast_ops.fold(HashMap::new(), |mut acc, (id, ev)| { - acc.entry(id).or_default().push(ev); - acc - }); - for (_tx, events) in put_broadcast_by_tx { - let mut was_emitted = false; - let mut was_received = false; - for ev in events { - match ev { - PutEvent::BroadcastEmitted { key, .. } if key == for_key => { - was_emitted = true; - } - PutEvent::BroadcastReceived { key, .. } if key == for_key => { - was_received = true; - } - _ => {} - } - } - if was_emitted && was_received { - return true; - } - } - false - } - - pub fn has_got_contract(&self, peer: &PeerId, expected_key: &ContractKey) -> bool { - let Ok(logs) = self.logs.try_lock() else { - return false; - }; - logs.iter().any(|log| { - &log.peer_id == peer - && matches!(log.kind, EventKind::Get { ref key } if key == expected_key ) - }) - } - - pub fn is_subscribed_to_contract(&self, peer: &PeerId, expected_key: &ContractKey) -> bool { - let Ok(logs) = self.logs.try_lock() else { - return false; - }; - logs.iter().any(|log| { - &log.peer_id == peer - && matches!(log.kind, EventKind::Subscribed { ref key, .. } if key == expected_key ) - }) - } - /// Unique connections for a given peer and their relative distance to other peers. - pub fn connections(&self, peer: PeerId) -> Box> { + pub fn connections( + &self, + key: &TransportPublicKey, + ) -> Box> { let Ok(logs) = self.logs.try_lock() else { return Box::new([].into_iter()); }; @@ -1193,7 +1104,7 @@ pub(super) mod test { .flat_map(|dcs| dcs.iter()) .any(|dc| dc > &l.datetime); if let Some((this_loc, conn_loc)) = this.location.zip(connected.location) { - if this.peer == peer && !disconnected { + if &this.peer.pub_key == key && !disconnected { return Some((connected.peer.clone(), conn_loc.distance(this_loc))); } } @@ -1299,7 +1210,7 @@ pub(super) mod test { futures::future::join_all(futs).await; - let distances: Vec<_> = listener.connections(peer_id).collect(); + let distances: Vec<_> = listener.connections(&peer_id.pub_key).collect(); assert!(distances.len() == 3); assert!( (distances.iter().map(|(_, l)| l.as_f64()).sum::() - 0.5f64).abs() < f64::EPSILON diff --git a/crates/core/src/transport.rs b/crates/core/src/transport.rs index 102b384e1..d2638bf7b 100644 --- a/crates/core/src/transport.rs +++ b/crates/core/src/transport.rs @@ -24,7 +24,7 @@ type PacketId = u32; use self::peer_connection::StreamId; -pub use self::crypto::TransportKeypair; +pub use self::crypto::{TransportKeypair, TransportPublicKey}; #[cfg(test)] pub(crate) use self::{ connection_handler::ConnectionEvent, @@ -36,7 +36,6 @@ pub(crate) use self::{ connection_handler::{ create_connection_handler, InboundConnectionHandler, OutboundConnectionHandler, }, - crypto::TransportPublicKey, peer_connection::PeerConnection, }; diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index bc0d43b92..5972b0bbf 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -17,6 +17,7 @@ use futures::{FutureExt, TryFutureExt}; use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot}; use tokio::task; +use tracing::{span, Instrument}; use super::{ crypto::{TransportKeypair, TransportPublicKey}, @@ -246,13 +247,16 @@ impl Drop for UdpPacketsListener { } impl UdpPacketsListener { + #[tracing::instrument(level = "debug", name = "transport_listener", fields(peer = %self.this_peer_keypair.public), skip_all)] async fn listen(mut self) -> Result<(), TransportError> { let mut buf = [0u8; MAX_PACKET_SIZE]; let mut ongoing_connections: BTreeMap = BTreeMap::new(); - let mut gw_ongoing_connections: BTreeMap = BTreeMap::new(); + let mut ongoing_gw_connections: BTreeMap< + SocketAddr, + mpsc::Sender>, + > = BTreeMap::new(); let mut connection_tasks = FuturesUnordered::new(); let mut gw_connection_tasks = FuturesUnordered::new(); - let (gw_outbound_tx, mut gw_inbound_rx) = tokio::sync::mpsc::channel(100); loop { tokio::select! { // Handling of inbound packets @@ -266,6 +270,12 @@ impl UdpPacketsListener { continue; } + if let Some(inbound_packet_sender) = ongoing_gw_connections.remove(&remote_addr){ + let _ = inbound_packet_sender.send(packet_data).await; + ongoing_gw_connections.insert(remote_addr, inbound_packet_sender); + continue; + } + if let Some((packets_sender, open_connection)) = ongoing_connections.remove(&remote_addr) { if packets_sender.send(packet_data).await.is_err() { // it can happen that the connection is established but the channel is closed because the task completed @@ -281,10 +291,13 @@ impl UdpPacketsListener { continue; } let packet_data = PacketData::from_buf(&buf[..size]); - let gw_ongoing_connection = self.gateway_connection(packet_data, remote_addr, gw_outbound_tx.clone()); - let task = tokio::spawn(gw_ongoing_connection.map_err(move |error| { + let (gw_ongoing_connection, packets_sender) = self.gateway_connection(packet_data, remote_addr); + let task = tokio::spawn(gw_ongoing_connection + .instrument(tracing::span!(tracing::Level::DEBUG, "gateway_connection")) + .map_err(move |error| { (error, remote_addr) })); + ongoing_gw_connections.insert(remote_addr, packets_sender); gw_connection_tasks.push(task); } Err(e) => { @@ -294,17 +307,6 @@ impl UdpPacketsListener { } } }, - req = gw_inbound_rx.recv() => { - let Some(GatewayMessage { remote_addr, packet, resp_tx }) = req else { - unreachable!(); - }; - - if let Some(remote) = self.remote_connections.remove(&remote_addr) { - let _ = remote.inbound_packet_sender.send(packet).await; - self.remote_connections.insert(remote_addr, remote); - let _ = resp_tx.send(true); - } - } gw_connection_handshake = gw_connection_tasks.next(), if !gw_connection_tasks.is_empty() => { let Some(res): GwOngoingConnectionResult = gw_connection_handshake else { unreachable!(); @@ -312,6 +314,7 @@ impl UdpPacketsListener { match res.expect("task shouldn't panic") { Ok((outbound_remote_conn, inbound_remote_connection, outbound_ack_packet)) => { let remote_addr = outbound_remote_conn.remote_addr; + ongoing_gw_connections.remove(&remote_addr); let sent_tracker = outbound_remote_conn.sent_tracker.clone(); self.remote_connections.insert(remote_addr, inbound_remote_connection); @@ -331,9 +334,8 @@ impl UdpPacketsListener { } Err((error, remote_addr)) => { tracing::error!(%error, ?remote_addr, "Failed to establish gateway connection"); - if let Some((_, result_sender)) = gw_ongoing_connections.remove(&remote_addr) { - let _ = result_sender.send(Err(error)); - } + ongoing_gw_connections.remove(&remote_addr); + ongoing_connections.remove(&remote_addr); } } } @@ -344,13 +346,13 @@ impl UdpPacketsListener { match res.expect("task shouldn't panic") { Ok((outbound_remote_conn, inbound_remote_connection)) => { if let Some((_, result_sender)) = ongoing_connections.remove(&outbound_remote_conn.remote_addr) { - tracing::debug!(%outbound_remote_conn.remote_addr, "connection established"); + tracing::debug!(remote_addr = %outbound_remote_conn.remote_addr, "connection established"); self.remote_connections.insert(outbound_remote_conn.remote_addr, inbound_remote_connection); let _ = result_sender.send(Ok(outbound_remote_conn)).map_err(|_| { tracing::error!("failed sending back peer connection"); }); } else { - tracing::error!(%outbound_remote_conn.remote_addr, "connection established but no ongoing connection found"); + tracing::error!(remote_addr = %outbound_remote_conn.remote_addr, "connection established but no ongoing connection found"); } } Err((error, remote_addr)) => { @@ -364,6 +366,9 @@ impl UdpPacketsListener { // Handling of connection events connection_event = self.connection_handler.recv() => { let Some((remote_addr, event)) = connection_event else { return Ok(()); }; + if let Some(_conn) = self.remote_connections.remove(&remote_addr) { + tracing::warn!(%remote_addr, "connection already established, dropping old connection"); + } let ConnectionEvent::ConnectionStart { remote_public_key, open_connection } = event; tracing::debug!(%remote_addr, "attempting to establish connection"); let (ongoing_connection, packets_sender) = self.traverse_nat( @@ -371,7 +376,7 @@ impl UdpPacketsListener { ); let task = tokio::spawn(ongoing_connection.map_err(move |error| { (error, remote_addr) - })); + }).instrument(span!(tracing::Level::DEBUG, "traverse_nat"))); connection_tasks.push(task); ongoing_connections.insert(remote_addr, (packets_sender, open_connection)); }, @@ -383,23 +388,26 @@ impl UdpPacketsListener { &mut self, remote_intro_packet: PacketData, remote_addr: SocketAddr, - outbound_tx: mpsc::Sender, - ) -> impl Future< - Output = Result< - ( - RemoteConnection, - InboundRemoteConnection, - PacketData, - ), - TransportError, - >, - > + Send - + 'static { + ) -> ( + impl Future< + Output = Result< + ( + RemoteConnection, + InboundRemoteConnection, + PacketData, + ), + TransportError, + >, + > + Send + + 'static, + mpsc::Sender>, + ) { let secret = self.this_peer_keypair.secret.clone(); let outbound_packets = self.outbound_packets.clone(); - let socket_listener = self.socket_listener.clone(); - async move { + let (inbound_from_remote, mut next_inbound) = + mpsc::channel::>(1); + let f = async move { let decrypted_intro_packet = secret.decrypt(remote_intro_packet.data()).map_err(|err| { tracing::debug!(%remote_addr, %err, "Failed to decrypt intro packet"); @@ -433,7 +441,6 @@ impl UdpPacketsListener { let outbound_ack_packet = SymmetricMessage::ack_ok(&outbound_key, inbound_key_bytes, remote_addr)?; - let mut buf = [0u8; MAX_PACKET_SIZE]; let mut waiting_time = INITIAL_INTERVAL; let mut attempts = 0; const MAX_ATTEMPTS: usize = 30; @@ -445,34 +452,9 @@ impl UdpPacketsListener { .map_err(|_| TransportError::ChannelClosed)?; // wait until the remote sends the ack packet - let timeout = - tokio::time::timeout(waiting_time, socket_listener.recv_from(&mut buf)); + let timeout = tokio::time::timeout(waiting_time, next_inbound.recv()); match timeout.await { - Ok(Ok((size, remote))) => { - let packet: PacketData = - PacketData::from_buf(&buf[..size]); - - let mut should_continue = false; - - if remote != remote_addr { - let (tx, rx) = tokio::sync::oneshot::channel(); - outbound_tx - .send(GatewayMessage { - remote_addr, - packet: packet.clone(), - resp_tx: tx, - }) - .await - .map_err(|_| TransportError::ChannelClosed)?; - - should_continue = - rx.await.map_err(|_| TransportError::ChannelClosed)?; - } - - if should_continue { - continue; - } - + Ok(Some(packet)) => { let _ = packet.try_decrypt_sym(&inbound_key).map_err(|_| { tracing::debug!(%remote_addr, "Failed to decrypt packet with inbound key"); TransportError::ConnectionEstablishmentFailure { @@ -480,8 +462,11 @@ impl UdpPacketsListener { } })?; } - Ok(Err(_)) => { - return Err(TransportError::ChannelClosed); + Ok(None) => { + tracing::debug!(%remote_addr, "connection timed out"); + return Err(TransportError::ConnectionEstablishmentFailure { + cause: "connection close".into(), + }); } Err(_) => { attempts += 1; @@ -522,7 +507,8 @@ impl UdpPacketsListener { tracing::debug!("returning connection at gw"); Ok((remote_conn, inbound_conn, outbound_ack_packet)) - } + }; + (f, inbound_from_remote) } // TODO: this value should be set given exponential backoff and max timeout @@ -660,6 +646,10 @@ impl UdpPacketsListener { { tracing::debug!(%remote_addr, ?symmetric_message.payload, "received symmetric packet"); } + #[cfg(not(test))] + { + tracing::trace!(%remote_addr, "received symmetric packet"); + } match symmetric_message.payload { SymmetricMessagePayload::AckConnection { @@ -689,6 +679,7 @@ impl UdpPacketsListener { .await .map_err(|_| TransportError::ChannelClosed)?; let (inbound_sender, inbound_recv) = mpsc::channel(100); + tracing::debug!(%remote_addr, "connection established"); return Ok(( RemoteConnection { outbound_packets: outbound_packets.clone(), @@ -789,7 +780,7 @@ impl UdpPacketsListener { } Err(_) => { failures += 1; - tracing::debug!(%this_addr, %remote_addr, "failed to receive UDP response, time out"); + tracing::debug!(%this_addr, %remote_addr, "failed to receive UDP response in time, retrying"); } } diff --git a/crates/core/src/transport/crypto.rs b/crates/core/src/transport/crypto.rs index 70964b98a..ea5063f43 100644 --- a/crates/core/src/transport/crypto.rs +++ b/crates/core/src/transport/crypto.rs @@ -20,10 +20,9 @@ impl TransportKeypair { // Key size, can be adjusted const BITS: usize = 2048; let priv_key = RsaPrivateKey::new(&mut rng, BITS).expect("failed to generate a key"); - let pub_key = RsaPublicKey::from(&priv_key); - + let public = TransportPublicKey(RsaPublicKey::from(&priv_key)); TransportKeypair { - public: TransportPublicKey(pub_key), + public, secret: TransportSecretKey(priv_key), } } @@ -44,7 +43,7 @@ impl TransportKeypair { } } -#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Hash)] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Hash)] pub struct TransportPublicKey(RsaPublicKey); impl TransportPublicKey { @@ -57,21 +56,26 @@ impl TransportPublicKey { } } +impl std::fmt::Debug for TransportPublicKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + ::fmt(&self, f) + } +} + impl std::fmt::Display for TransportPublicKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { use pkcs8::EncodePublicKey; let encoded = self.0.to_public_key_der().map_err(|_| std::fmt::Error)?; - write!( - f, - "{}", - bs58::encode(if encoded.as_bytes().len() > 16 { - &encoded.as_bytes()[..16] - } else { - encoded.as_bytes() - }) - .into_string() - ) + if encoded.as_bytes().len() >= 16 { + let bytes = encoded.as_bytes(); + let first_six = &bytes[..6]; + let last_six = &bytes[bytes.len() - 6..]; + let to_encode = [first_six, last_six].concat(); + write!(f, "{}", bs58::encode(to_encode).into_string()) + } else { + write!(f, "{}", bs58::encode(encoded.as_bytes()).into_string()) + } } } diff --git a/crates/core/src/transport/peer_connection.rs b/crates/core/src/transport/peer_connection.rs index bfd9b7bae..3da0c2f1e 100644 --- a/crates/core/src/transport/peer_connection.rs +++ b/crates/core/src/transport/peer_connection.rs @@ -12,6 +12,7 @@ use futures::{Future, StreamExt}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::task::JoinHandle; +use tracing::{instrument, span, Instrument}; mod inbound_stream; mod outbound_stream; @@ -179,6 +180,7 @@ impl PeerConnection { ) } + #[instrument(name = "peer_connection", skip_all)] pub async fn send(&mut self, data: T) -> Result where T: Serialize + Send + 'static, @@ -187,15 +189,16 @@ impl PeerConnection { .await .unwrap(); if data.len() + SymmetricMessage::short_message_overhead() > MAX_DATA_SIZE { - tracing::debug!("sending as stream"); + tracing::trace!("sending as stream"); self.outbound_stream(data).await; } else { - tracing::debug!("sending as short message"); + tracing::trace!("sending as short message"); self.outbound_short_message(data).await?; } Ok(()) } + #[instrument(name = "peer_connection", skip(self))] pub async fn recv(&mut self) -> Result> { // listen for incoming messages or receipts or wait until is time to do anything else again let mut resend_check = Some(tokio::time::sleep(tokio::time::Duration::from_secs(1))); @@ -427,15 +430,18 @@ impl PeerConnection { async fn outbound_stream(&mut self, data: SerializedMessage) { let stream_id = StreamId::next(); - let task = tokio::spawn(outbound_stream::send_stream( - stream_id, - self.remote_conn.last_packet_id.clone(), - self.remote_conn.outbound_packets.clone(), - self.remote_conn.remote_addr, - data, - self.remote_conn.outbound_symmetric_key.clone(), - self.remote_conn.sent_tracker.clone(), - )); + let task = tokio::spawn( + outbound_stream::send_stream( + stream_id, + self.remote_conn.last_packet_id.clone(), + self.remote_conn.outbound_packets.clone(), + self.remote_conn.remote_addr, + data, + self.remote_conn.outbound_symmetric_key.clone(), + self.remote_conn.sent_tracker.clone(), + ) + .instrument(span!(tracing::Level::DEBUG, "outbound_stream")), + ); self.outbound_stream_futures.push(task); } } diff --git a/crates/fdev/Cargo.toml b/crates/fdev/Cargo.toml index c089b44eb..c6566ca0e 100644 --- a/crates/fdev/Cargo.toml +++ b/crates/fdev/Cargo.toml @@ -37,7 +37,7 @@ reqwest = { version = "0.12", features = ["json"] } http = "1.1" # internal -freenet = { path = "../core" } +freenet = { path = "../core", features = ["local-simulation"]} freenet-stdlib = { workspace = true } [features] diff --git a/crates/fdev/src/testing.rs b/crates/fdev/src/testing.rs index c840d4943..89588e828 100644 --- a/crates/fdev/src/testing.rs +++ b/crates/fdev/src/testing.rs @@ -168,3 +168,38 @@ async fn config_sim_network(base_config: &TestConfig) -> anyhow::Result>(); + dbg!(keys); + } +} diff --git a/crates/fdev/src/testing/network.rs b/crates/fdev/src/testing/network.rs index edfbeda16..2ff4becf8 100644 --- a/crates/fdev/src/testing/network.rs +++ b/crates/fdev/src/testing/network.rs @@ -21,8 +21,8 @@ use axum::{ Router, }; use freenet::dev_tool::{ - EventChain, MemoryEventsGen, NetworkEventGenerator, NetworkPeer, NodeConfig, NodeLabel, PeerId, - PeerMessage, PeerStatus, SimNetwork, + EventChain, MemoryEventsGen, NetworkEventGenerator, NetworkPeer, NodeConfig, NodeLabel, + PeerMessage, PeerStatus, SimNetwork, TransportPublicKey, }; use futures::{ stream::{SplitSink, SplitStream}, @@ -178,17 +178,13 @@ async fn start_supervisor(config: &TestConfig) -> anyhow::Result<(), Error> { } async fn start_child(config: &TestConfig, cmd_config: &NetworkProcessConfig) -> Result<(), Error> { - std::env::set_var( - "FREENET_PEER_ID", - cmd_config.clone().id.expect("id should be set"), - ); - freenet::config::set_logger(None, None); - if let Some(peer_id) = &cmd_config.id { - let peer = NetworkPeer::new(peer_id.clone()).await?; - peer.run(config, peer_id.clone()).await?; - } else { + let Some(peer_id) = &cmd_config.id else { bail!("Peer id not set"); - } + }; + std::env::set_var("FREENET_PEER_ID", peer_id); + freenet::config::set_logger(None, None); + let peer = NetworkPeer::new(peer_id.clone()).await?; + peer.run(config, peer_id.clone()).await?; Ok(()) } @@ -224,11 +220,11 @@ pub async fn run_network( supervisor.start_peer_gateways(&cmd_args).await?; supervisor.start_peer_nodes(&cmd_args).await?; - let peers: Vec<(NodeLabel, PeerId)> = supervisor + let peers: Vec<_> = supervisor .get_all_peers() .await .into_iter() - .map(|(label, config)| (label.clone(), config.get_peer_id().unwrap())) + .map(|(label, config)| (label.clone(), config.key_pair.public().clone())) .collect(); let events_sender = supervisor.user_ev_controller.lock().await.clone(); @@ -376,7 +372,7 @@ async fn handle_outgoing_messages( ) -> anyhow::Result<()> { let mut event_rx = supervisor.event_rx.lock().await; while let Some((event, peer_id)) = event_rx.recv().await { - tracing::info!("Received event {} for peer {}", event, peer_id); + tracing::info!("Sending event {} to peer {}", event, peer_id); let serialized_msg: Vec = bincode::serialize(&(event, peer_id.clone())) .map_err(|e| anyhow!("Failed to serialize message: {}", e))?; @@ -463,11 +459,11 @@ async fn handle_peer_message( pub struct Supervisor { peers_config: Arc>>, - processes: Mutex>, + processes: Mutex>, waiting_peers: Arc>>, waiting_gateways: Arc>>, - user_ev_controller: Arc>>, - event_rx: Arc>>, + user_ev_controller: Arc>>, + event_rx: Arc>>, } impl Supervisor { @@ -495,7 +491,7 @@ impl Supervisor { self.processes .lock() .await - .insert(config.get_peer_id().unwrap(), process); + .insert(config.key_pair.public().clone(), process); Ok(()) } @@ -601,7 +597,7 @@ pub trait Runnable { impl Runnable for NetworkPeer { async fn run(&self, config: &TestConfig, peer_id: String) -> anyhow::Result<()> { - let peer = self.config.get_peer_id().unwrap(); + let peer = self.config.key_pair.public().clone(); if self.config.is_gateway { tracing::info!(%peer, "Starting gateway {}", peer_id); } else {