Skip to content

Commit

Permalink
Fix clippy errors (#1258)
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez authored Oct 9, 2024
1 parent 45cd492 commit eb4b4fd
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 106 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ pub(crate) mod test {
bincode::deserialize::<(EventId, TransportPublicKey)>(&data)
{
tracing::debug!(peer = %self.id, %id, "Received event from the supervisor");
if &pub_key == &self.id {
if pub_key == self.id {
let res = OpenRequest {
client_id: ClientId::FIRST,
request: self
Expand Down
13 changes: 10 additions & 3 deletions crates/core/src/client_events/combinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,17 @@ impl<const N: usize> super::ClientEventsProxy for ClientEventsCombinator<N> {
// and we take ownership, so they will be alive for the duration of the program
let f = Box::pin(self.hosts_rx[i].recv())
as Pin<Box<dyn Future<Output = _> + Send + Sync + '_>>;

type ExtendedLife<'a, 'b> = Pin<
Box<
dyn Future<Output = Option<Result<OpenRequest<'a>, ClientError>>>
+ Send
+ Sync
+ 'b,
>,
>;
let new_pend = unsafe {
std::mem::transmute::<_, Pin<Box<dyn Future<Output = _> + Send + Sync + '_>>>(
f,
)
std::mem::transmute::<ExtendedLife<'_, '_>, ExtendedLife<'_, '_>>(f)
};
*fut = Some(new_pend);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/contract/storages/redb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl ReDb {
txn.commit()?;

Ok(db)
},
}
Err(e) => {
tracing::info!("failed to load contract store: {e}");
Err(e.into())
Expand Down
128 changes: 63 additions & 65 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
node::NetworkBridge,
operations::connect::{
forward_conn, ConnectMsg, ConnectOp, ConnectRequest, ConnectResponse, ConnectState,
ConnectivityInfo,
ConnectivityInfo, ForwardParams,
},
ring::{ConnectionManager, PeerKeyLocation, Ring},
router::Router,
Expand Down Expand Up @@ -46,7 +46,7 @@ pub(super) enum HandshakeError {
#[error(transparent)]
TransportError(#[from] TransportError),
#[error("receibed an unexpected message at this point: {0}")]
UnexpectedMessage(NetMessage),
UnexpectedMessage(Box<NetMessage>),
}

#[derive(Debug)]
Expand All @@ -58,8 +58,8 @@ pub(super) enum Event {
id: Transaction,
conn: PeerConnection,
joiner: PeerId,
op: Option<ConnectOp>,
forward_info: Option<ForwardInfo>,
op: Option<Box<ConnectOp>>,
forward_info: Option<Box<ForwardInfo>>,
},
/// An outbound connection to a peer was successfully established.
OutboundConnectionSuccessful {
Expand Down Expand Up @@ -88,10 +88,11 @@ pub(super) enum Event {
target: SocketAddr,
tx: Transaction,
forward_to: PeerId,
msg: NetMessage,
msg: Box<NetMessage>,
},
}

#[allow(clippy::large_enum_variant)]
enum ForwardResult {
Forward(PeerId, NetMessage, ConnectivityInfo),
Rejected,
Expand Down Expand Up @@ -279,7 +280,7 @@ impl HandshakeHandler {
tracing::debug!(from=%peer_id.addr, "Outbound connection failed: {error}");
self.connecting.remove(&peer_id.addr);
self.outbound_messages.remove(&peer_id.addr);
Ok(Event::OutboundConnectionFailed { peer_id, error: error.into() })
Ok(Event::OutboundConnectionFailed { peer_id, error })
}
Some(Ok(other)) => {
tracing::error!("Unexpected event: {other:?}");
Expand Down Expand Up @@ -343,11 +344,14 @@ impl HandshakeHandler {
&self.connection_manager,
self.router.clone(),
&mut nw_bridge,
(my_peer_id.clone(), joiner_pk_loc.clone()),
hops_to_live,
max_hops_to_live,
true,
skip_list,
ForwardParams {
left_htl: hops_to_live,
max_htl: max_hops_to_live,
skip_list,
accepted: true,
req_peer: my_peer_id.clone(),
joiner: joiner_pk_loc.clone(),
}
);

match f.await {
Expand Down Expand Up @@ -375,8 +379,8 @@ impl HandshakeHandler {
id,
conn,
joiner,
op: ok.map(|ok_value| ConnectOp::new(id, Some(ok_value), None, None)),
forward_info,
op: ok.map(|ok_value| Box::new(ConnectOp::new(id, Some(ok_value), None, None))),
forward_info: forward_info.map(Box::new),
})

} else {
Expand Down Expand Up @@ -404,7 +408,7 @@ impl HandshakeHandler {
target: remote,
tx: id,
forward_to: forward_target,
msg,
msg: Box::new(msg),
});
}
Ok(ForwardResult::Rejected) => {
Expand Down Expand Up @@ -473,11 +477,14 @@ impl HandshakeHandler {
&self.connection_manager,
self.router.clone(),
&mut nw_bridge,
(my_peer_id.clone(), joiner_pk_loc.clone()),
transaction.hops_to_live,
transaction.max_hops_to_live,
false,
transaction.skip_list.clone(),
ForwardParams {
left_htl: transaction.hops_to_live,
max_htl: transaction.max_hops_to_live,
skip_list: transaction.skip_list.clone(),
accepted: false,
req_peer: my_peer_id.clone(),
joiner: joiner_pk_loc.clone(),
},
)
.await
{
Expand Down Expand Up @@ -524,22 +531,19 @@ impl HandshakeHandler {
/// Handles outbound messages to peers.
async fn outbound(&mut self, addr: SocketAddr, op: NetMessage) -> Option<Event> {
if let Some(alive_conn) = self.outbound_messages.get_mut(&addr) {
match &op {
NetMessage::V1(NetMessageV1::Connect(op)) => {
let tx = *op.id();
if self
.connecting
.get(&addr)
.filter(|current_tx| *current_tx != &tx)
.is_some()
{
// avoid duplicate connection attempts
tracing::warn!("Duplicate connection attempt to {addr}, ignoring");
return Some(Event::RemoveTransaction(tx));
}
self.connecting.insert(addr, tx);
if let NetMessage::V1(NetMessageV1::Connect(op)) = &op {
let tx = *op.id();
if self
.connecting
.get(&addr)
.filter(|current_tx| *current_tx != &tx)
.is_some()
{
// avoid duplicate connection attempts
tracing::warn!("Duplicate connection attempt to {addr}, ignoring");
return Some(Event::RemoveTransaction(tx));
}
_ => {}
self.connecting.insert(addr, tx);
}

if alive_conn.send(op).await.is_err() {
Expand All @@ -549,22 +553,15 @@ impl HandshakeHandler {
None
} else {
let mut send_to_remote = None;
match &op {
NetMessage::V1(NetMessageV1::Connect(op)) => {
match op {
ConnectMsg::Response {
msg: ConnectResponse::AcceptedBy { joiner, .. },
..
} => {
// this may be a reply message from a downstream peer to which it was forwarded previously
// for a transient connection, in this case we must send this message to the proper
// gw_transient_peer_conn future that is waiting for it
send_to_remote = Some(joiner.addr);
}
_ => {}
}
}
_ => {}
if let NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response {
msg: ConnectResponse::AcceptedBy { joiner, .. },
..
})) = &op
{
// this may be a reply message from a downstream peer to which it was forwarded previously
// for a transient connection, in this case we must send this message to the proper
// gw_transient_peer_conn future that is waiting for it
send_to_remote = Some(joiner.addr);
}

if let Some(remote) = send_to_remote {
Expand Down Expand Up @@ -628,11 +625,10 @@ impl HandshakeHandler {
conn: PeerConnection,
max_hops_to_live: usize,
) -> Result<()> {
let tx = self
let tx = *self
.connecting
.get(&gw_peer_id.addr)
.ok_or_else(|| HandshakeError::ConnectionClosed(conn.remote_addr()))?
.clone();
.ok_or_else(|| HandshakeError::ConnectionClosed(conn.remote_addr()))?;
let this_peer = self.connection_manager.own_location().peer;
tracing::debug!(at=?conn.my_address(), %this_peer.addr, from=%conn.remote_addr(), remote_addr = %gw_peer_id, "Waiting for confirmation from gw");
self.ongoing_outbound_connections.push(
Expand Down Expand Up @@ -767,7 +763,7 @@ async fn wait_for_gw_confirmation(
// under this branch we just need to wait long enough for the gateway to reply with all the downstream
// connection attempts, and then we can drop the connection, so keep listening to it in a loop or timeout
let remote = tracker.gw_conn.remote_addr();
Ok(tokio::time::timeout(
tokio::time::timeout(
timeout_duration,
check_remaining_hops(tracker),
)
Expand All @@ -778,7 +774,7 @@ async fn wait_for_gw_confirmation(
gw_peer_id,
HandshakeError::ConnectionClosed(remote),
)
})??)
})?
}

async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResult {
Expand All @@ -800,7 +796,7 @@ async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResul
)
})
.await??;
let msg = decode_msg(&msg).map_err(|e| (gw_peer_id.clone(), e.into()))?;
let msg = decode_msg(&msg).map_err(|e| (gw_peer_id.clone(), e))?;
match msg {
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response {
msg:
Expand Down Expand Up @@ -842,7 +838,12 @@ async fn check_remaining_hops(mut tracker: AcceptedTracker) -> OutboundConnResul
tracing::warn!(from=%tracker.gw_conn.remote_addr(), "Received FindOptimalPeer request, ignoring");
continue;
}
other => return Err((gw_peer_id, HandshakeError::UnexpectedMessage(other))),
other => {
return Err((
gw_peer_id,
HandshakeError::UnexpectedMessage(Box::new(other)),
))
}
}
}
Ok(InternalEvent::FinishedOutboundConnProcess(tracker))
Expand Down Expand Up @@ -1027,7 +1028,7 @@ impl TransientConnection {

#[inline(always)]
fn decode_msg(data: &[u8]) -> Result<NetMessage> {
bincode::deserialize(data).map_err(|err| HandshakeError::Serialization(err))
bincode::deserialize(data).map_err(HandshakeError::Serialization)
}

#[cfg(test)]
Expand Down Expand Up @@ -1130,14 +1131,11 @@ mod tests {
let sym_msg = SymmetricMessage::serialize_msg_to_packet_data(
self.packet_id,
msg,
&out_symm_key,
out_symm_key,
vec![],
)
.unwrap();
packet_sender
.send(sym_msg.as_unknown().into())
.await
.unwrap();
packet_sender.send(sym_msg.into_unknown()).await.unwrap();
self.packet_id += 1;
}

Expand Down Expand Up @@ -1511,7 +1509,7 @@ mod tests {
assert_eq!(forward_to.pub_key, peer_pub_key);
assert_eq!(forward_to.addr, peer_peer_id.addr);
assert!(matches!(
msg,
&*msg,
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
msg: ConnectRequest::CheckConnectivity { .. },
..
Expand Down Expand Up @@ -1749,7 +1747,7 @@ mod tests {
if let NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
msg: ConnectRequest::CheckConnectivity { sender, joiner, .. },
..
})) = msg
})) = &*msg
{
assert_eq!(sender.peer, gw_peer_id);
assert_eq!(joiner.peer, joiner_peer_id);
Expand Down
22 changes: 11 additions & 11 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,11 @@ impl P2pConnManager {
NodeEvent::DropConnection(peer) => {
tracing::debug!(%peer, "Dropping connection");
if let Some(conn) = self.connections.remove(&peer) {
let _ = conn.send(Right(ConnEvent::NodeAction(
NodeEvent::DropConnection(peer),
)));
let _ = conn
.send(Right(ConnEvent::NodeAction(
NodeEvent::DropConnection(peer),
)))
.await;
}
}
NodeEvent::ConnectPeer {
Expand Down Expand Up @@ -381,9 +383,7 @@ impl P2pConnManager {
is_gw: bool,
) -> anyhow::Result<()> {
tracing::info!(tx = %tx, remote = %peer, "Connecting to peer");
state
.awaiting_connection
.insert(peer.addr.clone(), callback);
state.awaiting_connection.insert(peer.addr, callback);
match establish_connection
.establish_conn(peer.clone(), tx, is_gw)
.await
Expand Down Expand Up @@ -430,7 +430,7 @@ impl P2pConnManager {
if let Some(op) = op {
self.bridge
.op_manager
.push(id, crate::operations::OpEnum::Connect(op.into()))
.push(id, crate::operations::OpEnum::Connect(op))
.await?;
}
let task = peer_connection_listener(rx, conn).boxed();
Expand All @@ -439,7 +439,7 @@ impl P2pConnManager {
if let Some(ForwardInfo {
target: forward_to,
msg,
}) = forward_info
}) = forward_info.map(|b| *b)
{
self.try_to_forward(&forward_to, msg).await?;
}
Expand All @@ -461,7 +461,7 @@ impl P2pConnManager {
);
}
}
self.try_to_forward(&forward_to, msg).await?;
self.try_to_forward(&forward_to, *msg).await?;
}
HandshakeEvent::OutboundConnectionSuccessful {
peer_id,
Expand Down Expand Up @@ -506,7 +506,7 @@ impl P2pConnManager {
}

async fn try_to_forward(&mut self, forward_to: &PeerId, msg: NetMessage) -> anyhow::Result<()> {
if let Some(peer) = self.connections.get(&forward_to) {
if let Some(peer) = self.connections.get(forward_to) {
tracing::debug!(%forward_to, %msg, "Forwarding message to peer");
peer.send(Left(msg)).await?;
} else {
Expand Down Expand Up @@ -535,7 +535,7 @@ impl P2pConnManager {
let self_addr = connection
.my_address()
.ok_or_else(|| anyhow::anyhow!("self addr should be set"))?;
let key = (&*self.bridge.op_manager.ring.connection_manager.pub_key).clone();
let key = (*self.bridge.op_manager.ring.connection_manager.pub_key).clone();
PeerId::new(self_addr, key)
};
let _ = cb.send_result(Ok((peer_id, remaining_checks))).await;
Expand Down
Loading

0 comments on commit eb4b4fd

Please sign in to comment.