diff --git a/crates/core/src/client_events.rs b/crates/core/src/client_events.rs index dfb9757eb..8bba626ac 100644 --- a/crates/core/src/client_events.rs +++ b/crates/core/src/client_events.rs @@ -370,7 +370,7 @@ pub(crate) mod test { bincode::deserialize::<(EventId, TransportPublicKey)>(&data) { tracing::debug!(peer = %self.id, %id, "Received event from the supervisor"); - if &pub_key == &self.id { + if pub_key == self.id { let res = OpenRequest { client_id: ClientId::FIRST, request: self diff --git a/crates/core/src/client_events/combinator.rs b/crates/core/src/client_events/combinator.rs index 07563c0f1..145d4b858 100644 --- a/crates/core/src/client_events/combinator.rs +++ b/crates/core/src/client_events/combinator.rs @@ -70,10 +70,17 @@ impl super::ClientEventsProxy for ClientEventsCombinator { // and we take ownership, so they will be alive for the duration of the program let f = Box::pin(self.hosts_rx[i].recv()) as Pin + Send + Sync + '_>>; + + type ExtendedLife<'a, 'b> = Pin< + Box< + dyn Future, ClientError>>> + + Send + + Sync + + 'b, + >, + >; let new_pend = unsafe { - std::mem::transmute::<_, Pin + Send + Sync + '_>>>( - f, - ) + std::mem::transmute::, ExtendedLife<'_, '_>>(f) }; *fut = Some(new_pend); } diff --git a/crates/core/src/contract/storages/redb.rs b/crates/core/src/contract/storages/redb.rs index 226dd284c..122f46a63 100644 --- a/crates/core/src/contract/storages/redb.rs +++ b/crates/core/src/contract/storages/redb.rs @@ -32,7 +32,7 @@ impl ReDb { txn.commit()?; Ok(db) - }, + } Err(e) => { tracing::info!("failed to load contract store: {e}"); Err(e.into()) diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index fde852bc2..7d978a554 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -17,7 +17,7 @@ use crate::{ node::NetworkBridge, operations::connect::{ forward_conn, ConnectMsg, ConnectOp, ConnectRequest, ConnectResponse, ConnectState, - ConnectivityInfo, + ConnectivityInfo, ForwardParams, }, ring::{ConnectionManager, PeerKeyLocation, Ring}, router::Router, @@ -46,7 +46,7 @@ pub(super) enum HandshakeError { #[error(transparent)] TransportError(#[from] TransportError), #[error("receibed an unexpected message at this point: {0}")] - UnexpectedMessage(NetMessage), + UnexpectedMessage(Box), } #[derive(Debug)] @@ -58,8 +58,8 @@ pub(super) enum Event { id: Transaction, conn: PeerConnection, joiner: PeerId, - op: Option, - forward_info: Option, + op: Option>, + forward_info: Option>, }, /// An outbound connection to a peer was successfully established. OutboundConnectionSuccessful { @@ -88,10 +88,11 @@ pub(super) enum Event { target: SocketAddr, tx: Transaction, forward_to: PeerId, - msg: NetMessage, + msg: Box, }, } +#[allow(clippy::large_enum_variant)] enum ForwardResult { Forward(PeerId, NetMessage, ConnectivityInfo), Rejected, @@ -279,7 +280,7 @@ impl HandshakeHandler { tracing::debug!(from=%peer_id.addr, "Outbound connection failed: {error}"); self.connecting.remove(&peer_id.addr); self.outbound_messages.remove(&peer_id.addr); - Ok(Event::OutboundConnectionFailed { peer_id, error: error.into() }) + Ok(Event::OutboundConnectionFailed { peer_id, error }) } Some(Ok(other)) => { tracing::error!("Unexpected event: {other:?}"); @@ -343,11 +344,14 @@ impl HandshakeHandler { &self.connection_manager, self.router.clone(), &mut nw_bridge, - (my_peer_id.clone(), joiner_pk_loc.clone()), - hops_to_live, - max_hops_to_live, - true, - skip_list, + ForwardParams { + left_htl: hops_to_live, + max_htl: max_hops_to_live, + skip_list, + accepted: true, + req_peer: my_peer_id.clone(), + joiner: joiner_pk_loc.clone(), + } ); match f.await { @@ -375,8 +379,8 @@ impl HandshakeHandler { id, conn, joiner, - op: ok.map(|ok_value| ConnectOp::new(id, Some(ok_value), None, None)), - forward_info, + op: ok.map(|ok_value| Box::new(ConnectOp::new(id, Some(ok_value), None, None))), + forward_info: forward_info.map(Box::new), }) } else { @@ -404,7 +408,7 @@ impl HandshakeHandler { target: remote, tx: id, forward_to: forward_target, - msg, + msg: Box::new(msg), }); } Ok(ForwardResult::Rejected) => { @@ -473,11 +477,14 @@ impl HandshakeHandler { &self.connection_manager, self.router.clone(), &mut nw_bridge, - (my_peer_id.clone(), joiner_pk_loc.clone()), - transaction.hops_to_live, - transaction.max_hops_to_live, - false, - transaction.skip_list.clone(), + ForwardParams { + left_htl: transaction.hops_to_live, + max_htl: transaction.max_hops_to_live, + skip_list: transaction.skip_list.clone(), + accepted: false, + req_peer: my_peer_id.clone(), + joiner: joiner_pk_loc.clone(), + }, ) .await { @@ -524,22 +531,19 @@ impl HandshakeHandler { /// Handles outbound messages to peers. async fn outbound(&mut self, addr: SocketAddr, op: NetMessage) -> Option { if let Some(alive_conn) = self.outbound_messages.get_mut(&addr) { - match &op { - NetMessage::V1(NetMessageV1::Connect(op)) => { - let tx = *op.id(); - if self - .connecting - .get(&addr) - .filter(|current_tx| *current_tx != &tx) - .is_some() - { - // avoid duplicate connection attempts - tracing::warn!("Duplicate connection attempt to {addr}, ignoring"); - return Some(Event::RemoveTransaction(tx)); - } - self.connecting.insert(addr, tx); + if let NetMessage::V1(NetMessageV1::Connect(op)) = &op { + let tx = *op.id(); + if self + .connecting + .get(&addr) + .filter(|current_tx| *current_tx != &tx) + .is_some() + { + // avoid duplicate connection attempts + tracing::warn!("Duplicate connection attempt to {addr}, ignoring"); + return Some(Event::RemoveTransaction(tx)); } - _ => {} + self.connecting.insert(addr, tx); } if alive_conn.send(op).await.is_err() { @@ -549,22 +553,15 @@ impl HandshakeHandler { None } else { let mut send_to_remote = None; - match &op { - NetMessage::V1(NetMessageV1::Connect(op)) => { - match op { - ConnectMsg::Response { - msg: ConnectResponse::AcceptedBy { joiner, .. }, - .. - } => { - // this may be a reply message from a downstream peer to which it was forwarded previously - // for a transient connection, in this case we must send this message to the proper - // gw_transient_peer_conn future that is waiting for it - send_to_remote = Some(joiner.addr); - } - _ => {} - } - } - _ => {} + if let NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response { + msg: ConnectResponse::AcceptedBy { joiner, .. }, + .. + })) = &op + { + // this may be a reply message from a downstream peer to which it was forwarded previously + // for a transient connection, in this case we must send this message to the proper + // gw_transient_peer_conn future that is waiting for it + send_to_remote = Some(joiner.addr); } if let Some(remote) = send_to_remote { @@ -628,11 +625,10 @@ impl HandshakeHandler { conn: PeerConnection, max_hops_to_live: usize, ) -> Result<()> { - let tx = self + let tx = *self .connecting .get(&gw_peer_id.addr) - .ok_or_else(|| HandshakeError::ConnectionClosed(conn.remote_addr()))? - .clone(); + .ok_or_else(|| HandshakeError::ConnectionClosed(conn.remote_addr()))?; let this_peer = self.connection_manager.own_location().peer; tracing::debug!(at=?conn.my_address(), %this_peer.addr, from=%conn.remote_addr(), remote_addr = %gw_peer_id, "Waiting for confirmation from gw"); self.ongoing_outbound_connections.push( @@ -767,7 +763,7 @@ async fn wait_for_gw_confirmation( // under this branch we just need to wait long enough for the gateway to reply with all the downstream // connection attempts, and then we can drop the connection, so keep listening to it in a loop or timeout let remote = tracker.gw_conn.remote_addr(); - Ok(tokio::time::timeout( + tokio::time::timeout( timeout_duration, check_remaining_hops(tracker), ) @@ -778,7 +774,7 @@ async fn wait_for_gw_confirmation( gw_peer_id, HandshakeError::ConnectionClosed(remote), ) - })??) + })? } async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResult { @@ -800,7 +796,7 @@ async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResul ) }) .await??; - let msg = decode_msg(&msg).map_err(|e| (gw_peer_id.clone(), e.into()))?; + let msg = decode_msg(&msg).map_err(|e| (gw_peer_id.clone(), e))?; match msg { NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response { msg: @@ -842,7 +838,12 @@ async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResul tracing::warn!(from=%tracker.gw_conn.remote_addr(), "Received FindOptimalPeer request, ignoring"); continue; } - other => return Err((gw_peer_id, HandshakeError::UnexpectedMessage(other))), + other => { + return Err(( + gw_peer_id, + HandshakeError::UnexpectedMessage(Box::new(other)), + )) + } } } Ok(InternalEvent::FinishedOutboundConnProcess(tracker)) @@ -1027,7 +1028,7 @@ impl TransientConnection { #[inline(always)] fn decode_msg(data: &[u8]) -> Result { - bincode::deserialize(data).map_err(|err| HandshakeError::Serialization(err)) + bincode::deserialize(data).map_err(HandshakeError::Serialization) } #[cfg(test)] @@ -1130,14 +1131,11 @@ mod tests { let sym_msg = SymmetricMessage::serialize_msg_to_packet_data( self.packet_id, msg, - &out_symm_key, + out_symm_key, vec![], ) .unwrap(); - packet_sender - .send(sym_msg.as_unknown().into()) - .await - .unwrap(); + packet_sender.send(sym_msg.into_unknown()).await.unwrap(); self.packet_id += 1; } @@ -1511,7 +1509,7 @@ mod tests { assert_eq!(forward_to.pub_key, peer_pub_key); assert_eq!(forward_to.addr, peer_peer_id.addr); assert!(matches!( - msg, + &*msg, NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { msg: ConnectRequest::CheckConnectivity { .. }, .. @@ -1749,7 +1747,7 @@ mod tests { if let NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { msg: ConnectRequest::CheckConnectivity { sender, joiner, .. }, .. - })) = msg + })) = &*msg { assert_eq!(sender.peer, gw_peer_id); assert_eq!(joiner.peer, joiner_peer_id); diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index f3b840d18..3d0e6932e 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -234,9 +234,11 @@ impl P2pConnManager { NodeEvent::DropConnection(peer) => { tracing::debug!(%peer, "Dropping connection"); if let Some(conn) = self.connections.remove(&peer) { - let _ = conn.send(Right(ConnEvent::NodeAction( - NodeEvent::DropConnection(peer), - ))); + let _ = conn + .send(Right(ConnEvent::NodeAction( + NodeEvent::DropConnection(peer), + ))) + .await; } } NodeEvent::ConnectPeer { @@ -381,9 +383,7 @@ impl P2pConnManager { is_gw: bool, ) -> anyhow::Result<()> { tracing::info!(tx = %tx, remote = %peer, "Connecting to peer"); - state - .awaiting_connection - .insert(peer.addr.clone(), callback); + state.awaiting_connection.insert(peer.addr, callback); match establish_connection .establish_conn(peer.clone(), tx, is_gw) .await @@ -430,7 +430,7 @@ impl P2pConnManager { if let Some(op) = op { self.bridge .op_manager - .push(id, crate::operations::OpEnum::Connect(op.into())) + .push(id, crate::operations::OpEnum::Connect(op)) .await?; } let task = peer_connection_listener(rx, conn).boxed(); @@ -439,7 +439,7 @@ impl P2pConnManager { if let Some(ForwardInfo { target: forward_to, msg, - }) = forward_info + }) = forward_info.map(|b| *b) { self.try_to_forward(&forward_to, msg).await?; } @@ -461,7 +461,7 @@ impl P2pConnManager { ); } } - self.try_to_forward(&forward_to, msg).await?; + self.try_to_forward(&forward_to, *msg).await?; } HandshakeEvent::OutboundConnectionSuccessful { peer_id, @@ -506,7 +506,7 @@ impl P2pConnManager { } async fn try_to_forward(&mut self, forward_to: &PeerId, msg: NetMessage) -> anyhow::Result<()> { - if let Some(peer) = self.connections.get(&forward_to) { + if let Some(peer) = self.connections.get(forward_to) { tracing::debug!(%forward_to, %msg, "Forwarding message to peer"); peer.send(Left(msg)).await?; } else { @@ -535,7 +535,7 @@ impl P2pConnManager { let self_addr = connection .my_address() .ok_or_else(|| anyhow::anyhow!("self addr should be set"))?; - let key = (&*self.bridge.op_manager.ring.connection_manager.pub_key).clone(); + let key = (*self.bridge.op_manager.ring.connection_manager.pub_key).clone(); PeerId::new(self_addr, key) }; let _ = cb.send_result(Ok((peer_id, remaining_checks))).await; diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index e42f5c1ae..0bbb3b472 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -330,11 +330,14 @@ impl Operation for ConnectOp { &op_manager.ring.connection_manager, op_manager.ring.router.clone(), network_bridge, - (sender.clone(), joiner.clone()), - *hops_to_live, - *max_hops_to_live, - should_accept, - skip_list.clone(), + ForwardParams { + left_htl: *hops_to_live, + max_htl: *max_hops_to_live, + accepted: should_accept, + skip_list: skip_list.clone(), + req_peer: sender.clone(), + joiner: joiner.clone(), + }, ) .await? { @@ -851,20 +854,33 @@ async fn connect_request( } } +pub(crate) struct ForwardParams { + pub left_htl: usize, + pub max_htl: usize, + pub accepted: bool, + pub skip_list: Vec, + pub req_peer: PeerKeyLocation, + pub joiner: PeerKeyLocation, +} + pub(crate) async fn forward_conn( id: Transaction, connection_manager: &ConnectionManager, router: Arc>, network_bridge: &mut NB, - (req_peer, joiner): (PeerKeyLocation, PeerKeyLocation), - left_htl: usize, - max_htl: usize, - accepted: bool, - mut skip_list: Vec, + params: ForwardParams, ) -> Result, OpError> where NB: NetworkBridge, { + let ForwardParams { + left_htl, + max_htl, + accepted, + mut skip_list, + req_peer, + joiner, + } = params; if left_htl == 0 { tracing::debug!( tx = %id, diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 9e7595c09..8b54569da 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -235,7 +235,7 @@ impl ConnectionManager { pub fn try_set_peer_key(&self, addr: SocketAddr) -> Option { let mut this_peer = self.peer_key.lock(); if this_peer.is_none() { - *this_peer = Some(PeerId::new(addr, (&*self.pub_key).clone())); + *this_peer = Some(PeerId::new(addr, (*self.pub_key).clone())); None } else { this_peer.clone() @@ -254,7 +254,7 @@ impl ConnectionManager { let connection_type = if is_alive { "active" } else { "in transit" }; tracing::debug!(%peer, "Pruning {} connection", connection_type); - let Some(loc) = self.location_for_peer.write().remove(&peer) else { + let Some(loc) = self.location_for_peer.write().remove(peer) else { if is_alive { self.open_connections .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); @@ -339,7 +339,7 @@ impl ConnectionManager { self.connections_by_location.read().len() } - pub(super) fn connected_peers<'a>(&'a self) -> impl Iterator { + pub(super) fn connected_peers(&self) -> impl Iterator { let read = self.location_for_peer.read(); read.keys().cloned().collect::>().into_iter() } diff --git a/crates/core/src/transport/crypto.rs b/crates/core/src/transport/crypto.rs index ea5063f43..7742f237d 100644 --- a/crates/core/src/transport/crypto.rs +++ b/crates/core/src/transport/crypto.rs @@ -58,7 +58,7 @@ impl TransportPublicKey { impl std::fmt::Debug for TransportPublicKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - ::fmt(&self, f) + ::fmt(self, f) } } diff --git a/crates/core/src/transport/packet_data.rs b/crates/core/src/transport/packet_data.rs index 495968cb3..3ac7d26c0 100644 --- a/crates/core/src/transport/packet_data.rs +++ b/crates/core/src/transport/packet_data.rs @@ -189,7 +189,7 @@ impl PacketData { #[cfg(test)] impl PacketData { - pub fn as_unknown(self) -> PacketData { + pub fn into_unknown(self) -> PacketData { PacketData { data: self.data, size: self.size, diff --git a/crates/core/src/transport/peer_connection.rs b/crates/core/src/transport/peer_connection.rs index 3da0c2f1e..2665a3504 100644 --- a/crates/core/src/transport/peer_connection.rs +++ b/crates/core/src/transport/peer_connection.rs @@ -106,6 +106,20 @@ impl std::fmt::Debug for PeerConnection { } } +#[cfg(test)] +type PeerConnectionMock = ( + PeerConnection, + mpsc::Sender>, + mpsc::Receiver<(SocketAddr, Arc<[u8]>)>, +); + +#[cfg(test)] +type RemoteConnectionMock = ( + RemoteConnection, + mpsc::Sender>, + mpsc::Receiver<(SocketAddr, Arc<[u8]>)>, +); + impl PeerConnection { pub(super) fn new(remote_conn: RemoteConnection) -> Self { Self { @@ -123,11 +137,7 @@ impl PeerConnection { my_address: SocketAddr, outbound_symmetric_key: Aes128Gcm, inbound_symmetric_key: Aes128Gcm, - ) -> ( - Self, - mpsc::Sender>, - mpsc::Receiver<(SocketAddr, Arc<[u8]>)>, - ) { + ) -> PeerConnectionMock { use parking_lot::Mutex; let (outbound_packets, outbound_packets_recv) = mpsc::channel(1); let (inbound_packet_sender, inbound_packet_recv) = mpsc::channel(1); @@ -155,11 +165,7 @@ impl PeerConnection { my_address: SocketAddr, outbound_symmetric_key: Aes128Gcm, inbound_symmetric_key: Aes128Gcm, - ) -> ( - RemoteConnection, - mpsc::Sender>, - mpsc::Receiver<(SocketAddr, Arc<[u8]>)>, - ) { + ) -> RemoteConnectionMock { use parking_lot::Mutex; let (outbound_packets, outbound_packets_recv) = mpsc::channel(1); let (inbound_packet_sender, inbound_packet_recv) = mpsc::channel(1);