From b443dfe10853667e83e4463c2038677dc3992ced Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Wed, 4 Oct 2023 17:11:17 +0300 Subject: [PATCH 1/3] feat: Add trusted peers and keep track of multiple connections per peer --- Cargo.lock | 5 +- node/Cargo.toml | 1 + node/src/exchange/client.rs | 7 +- node/src/p2p.rs | 93 +++++++++++++++----------- node/src/peer_tracker.rs | 128 ++++++++++++++++++++++++++++++------ 5 files changed, 171 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 842b3450..54053a0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -581,6 +581,7 @@ dependencies = [ "prost", "rand", "serde_json", + "smallvec", "tendermint-proto", "thiserror", "tokio", @@ -3657,9 +3658,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "smol_str" diff --git a/node/Cargo.toml b/node/Cargo.toml index 8126ae6a..c822fbed 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -22,6 +22,7 @@ libp2p = { version = "0.52.3", features = [ "kad", ] } prost = "0.12.0" +smallvec = { version = "1.11.1", features = ["union", "const_generics"] } thiserror = "1.0.48" tokio = { version = "1.32.0", features = ["macros", "sync"] } tracing = "0.1.37" diff --git a/node/src/exchange/client.rs b/node/src/exchange/client.rs index 6f11f6d5..14521c6b 100644 --- a/node/src/exchange/client.rs +++ b/node/src/exchange/client.rs @@ -295,6 +295,7 @@ mod tests { use celestia_types::consts::HASH_SIZE; use celestia_types::hash::Hash; use celestia_types::test_utils::{invalidate, unverify, ExtendedHeaderGenerator}; + use libp2p::swarm::ConnectionId; use std::collections::VecDeque; use std::sync::atomic::{AtomicU64, Ordering}; @@ -1053,8 +1054,10 @@ mod tests { fn peer_tracker_with_n_peers(amount: usize) -> Arc { let peers = Arc::new(PeerTracker::new()); - for _ in 0..amount { - peers.connected(PeerId::random(), None); + for i in 0..amount { + let peer = PeerId::random(); + peers.trusted(peer); + peers.connected(peer, ConnectionId::new_unchecked(i), None); } peers diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 6135bba3..2f5d82a2 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -17,8 +17,8 @@ use libp2p::{ multiaddr::Protocol, ping, swarm::{ - keep_alive, DialError, NetworkBehaviour, NetworkInfo, Swarm, SwarmBuilder, SwarmEvent, - THandlerErr, + keep_alive, ConnectionId, DialError, NetworkBehaviour, NetworkInfo, Swarm, SwarmBuilder, + SwarmEvent, THandlerErr, }, Multiaddr, PeerId, TransportError, }; @@ -29,6 +29,7 @@ use tracing::{debug, info, instrument, trace, warn}; use crate::exchange::{ExchangeBehaviour, ExchangeConfig}; use crate::executor::{spawn, Executor}; use crate::peer_tracker::PeerTracker; +use crate::peer_tracker::PeerTrackerInfo; use crate::store::Store; use crate::utils::{ celestia_protocol_id, gossipsub_ident_topic, MultiaddrExt, OneshotResultSender, @@ -77,6 +78,7 @@ impl From for P2pError { pub struct P2p { cmd_tx: mpsc::Sender, header_sub_watcher: watch::Receiver>, + peer_tracker_info_watcher: watch::Receiver, _store: PhantomData, } @@ -99,9 +101,6 @@ pub enum P2pCmd { request: HeaderRequest, respond_to: OneshotResultSender, P2pError>, }, - WaitConnected { - respond_to: oneshot::Sender<()>, - }, Listeners { respond_to: oneshot::Sender>, }, @@ -122,7 +121,11 @@ where async fn start(args: P2pArgs) -> Result { let (cmd_tx, cmd_rx) = mpsc::channel(16); let (header_sub_tx, header_sub_rx) = watch::channel(None); - let mut worker = Worker::new(args, cmd_rx, header_sub_tx)?; + + let peer_tracker = Arc::new(PeerTracker::new()); + let peer_tracker_info_watcher = peer_tracker.info_watcher(); + + let mut worker = Worker::new(args, cmd_rx, header_sub_tx, peer_tracker)?; spawn(async move { worker.run().await; @@ -131,6 +134,7 @@ where Ok(P2p { cmd_tx, header_sub_watcher: header_sub_rx, + peer_tracker_info_watcher, _store: PhantomData, }) } @@ -155,14 +159,22 @@ pub trait P2pService: type Store: Store; fn new_header_sub_watcher(&self) -> watch::Receiver>; + fn peer_tracker_info_watcher(&self) -> watch::Receiver; async fn wait_connected(&self) -> Result<()> { - let (tx, rx) = oneshot::channel(); - - self.send_command(P2pCmd::WaitConnected { respond_to: tx }) - .await?; + self.peer_tracker_info_watcher() + .wait_for(|info| info.num_connected_peers > 0) + .await + .map(drop) + .map_err(|_| P2pError::WorkerDied) + } - Ok(rx.await?) + async fn wait_connected_trusted(&self) -> Result<()> { + self.peer_tracker_info_watcher() + .wait_for(|info| info.num_connected_trusted_peers > 0) + .await + .map(drop) + .map_err(|_| P2pError::WorkerDied) } async fn network_info(&self) -> Result { @@ -263,6 +275,10 @@ where fn new_header_sub_watcher(&self) -> watch::Receiver> { self.header_sub_watcher.clone() } + + fn peer_tracker_info_watcher(&self) -> watch::Receiver { + self.peer_tracker_info_watcher.clone() + } } /// Our network behaviour. @@ -288,7 +304,6 @@ where header_sub_topic_hash: TopicHash, cmd_rx: mpsc::Receiver, peer_tracker: Arc, - wait_connected_tx: Option>>, header_sub_watcher: watch::Sender>, store: Arc, } @@ -301,8 +316,8 @@ where args: P2pArgs, cmd_rx: mpsc::Receiver, header_sub_watcher: watch::Sender>, + peer_tracker: Arc, ) -> Result { - let peer_tracker = Arc::new(PeerTracker::new()); let local_peer_id = PeerId::from(args.local_keypair.public()); let autonat = autonat::Behaviour::new(local_peer_id, autonat::Config::default()); @@ -343,6 +358,10 @@ where } for addr in args.bootstrap_peers { + // Bootstrap peers are always trusted + if let Some(peer_id) = addr.peer_id() { + peer_tracker.trusted(peer_id); + } swarm.dial(addr)?; } @@ -351,7 +370,6 @@ where swarm, header_sub_topic_hash: header_sub_topic.hash(), peer_tracker, - wait_connected_tx: None, header_sub_watcher, store: args.store.clone(), }) @@ -389,12 +407,19 @@ where | BehaviourEvent::HeaderEx(_) => {} }, SwarmEvent::ConnectionEstablished { - peer_id, endpoint, .. + peer_id, + connection_id, + endpoint, + .. } => { - self.on_peer_connected(peer_id, endpoint); + self.on_peer_connected(peer_id, connection_id, endpoint); } - SwarmEvent::ConnectionClosed { peer_id, .. } => { - self.on_peer_disconnected(peer_id); + SwarmEvent::ConnectionClosed { + peer_id, + connection_id, + .. + } => { + self.on_peer_disconnected(peer_id, connection_id); } _ => {} } @@ -416,9 +441,6 @@ where .header_ex .send_request(request, respond_to); } - P2pCmd::WaitConnected { respond_to } => { - self.on_wait_connected(respond_to); - } P2pCmd::Listeners { respond_to } => { let local_peer_id = self.swarm.local_peer_id().to_owned(); let listeners = self @@ -536,7 +558,12 @@ where } #[instrument(skip_all, fields(peer_id = %peer_id))] - fn on_peer_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + fn on_peer_connected( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + endpoint: ConnectedPoint, + ) { info!("Peer connected"); // Inform PeerTracker about the dialed address. @@ -552,26 +579,14 @@ where _ => None, }; - self.peer_tracker.connected(peer_id, dialed_addr); - - for tx in self.wait_connected_tx.take().into_iter().flatten() { - tx.maybe_send(()); - } + self.peer_tracker + .connected(peer_id, connection_id, dialed_addr); } #[instrument(skip_all, fields(peer_id = %peer_id))] - fn on_peer_disconnected(&mut self, peer_id: PeerId) { - info!("Peer disconnected"); - self.peer_tracker.disconnected(peer_id); - } - - fn on_wait_connected(&mut self, respond_to: oneshot::Sender<()>) { - if self.peer_tracker.is_anyone_connected() { - respond_to.maybe_send(()); - } else { - self.wait_connected_tx - .get_or_insert_with(Vec::new) - .push(respond_to); + fn on_peer_disconnected(&mut self, peer_id: PeerId, connection_id: ConnectionId) { + if self.peer_tracker.maybe_disconnected(peer_id, connection_id) { + info!("Peer disconnected"); } } diff --git a/node/src/peer_tracker.rs b/node/src/peer_tracker.rs index 850c879c..ffe5bf3b 100644 --- a/node/src/peer_tracker.rs +++ b/node/src/peer_tracker.rs @@ -3,17 +3,30 @@ use std::borrow::Borrow; use dashmap::mapref::entry::Entry; use dashmap::mapref::one::RefMut; use dashmap::DashMap; -use libp2p::{identify, Multiaddr, PeerId}; +use libp2p::{identify, swarm::ConnectionId, Multiaddr, PeerId}; +use smallvec::SmallVec; +use tokio::sync::watch; #[derive(Debug)] pub struct PeerTracker { peers: DashMap, + info_tx: watch::Sender, +} + +#[derive(Debug, Clone, Default)] +pub struct PeerTrackerInfo { + /// Number of the connected peers + pub num_connected_peers: u64, + /// Number of the connected trusted peers + pub num_connected_trusted_peers: u64, } #[derive(Debug)] struct PeerInfo { - addrs: Vec, state: PeerState, + addrs: SmallVec<[Multiaddr; 4]>, + connections: SmallVec<[ConnectionId; 1]>, + trusted: bool, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -34,15 +47,26 @@ impl PeerTracker { pub fn new() -> Self { PeerTracker { peers: DashMap::new(), + info_tx: watch::channel(PeerTrackerInfo::default()).0, } } + pub fn info(&self) -> PeerTrackerInfo { + self.info_tx.borrow().to_owned() + } + + pub fn info_watcher(&self) -> watch::Receiver { + self.info_tx.subscribe() + } + pub fn maybe_discovered(&self, peer: PeerId) -> bool { match self.peers.entry(peer) { Entry::Vacant(entry) => { entry.insert(PeerInfo { - addrs: Vec::new(), state: PeerState::Discovered, + addrs: SmallVec::new(), + connections: SmallVec::new(), + trusted: false, }); true } @@ -55,8 +79,10 @@ impl PeerTracker { /// If peer is not found it is added as `PeerState::Discovered`. fn get(&self, peer: PeerId) -> RefMut { self.peers.entry(peer).or_insert_with(|| PeerInfo { - addrs: Vec::new(), state: PeerState::Discovered, + addrs: SmallVec::new(), + connections: SmallVec::new(), + trusted: false, }) } @@ -81,49 +107,81 @@ impl PeerTracker { } } - pub fn connected(&self, peer: PeerId, address: impl Into>) { - let mut state = self.get(peer); + /// Adds a peer as trusted or not. + pub fn trusted(&self, peer: PeerId) { + self.get(peer).value_mut().trusted = true; + } + + /// Inform PeerTracker that there is a new connection for a peer + pub fn connected( + &self, + peer: PeerId, + connection_id: ConnectionId, + address: impl Into>, + ) { + let mut peer_info = self.get(peer); if let Some(address) = address.into() { - if !state.addrs.contains(&address) { - state.addrs.push(address); + if !peer_info.addrs.contains(&address) { + peer_info.addrs.push(address); } } - state.state = PeerState::Connected; + peer_info.state = PeerState::Connected; + peer_info.connections.push(connection_id); + + // If this is the first connection from the peer + if peer_info.connections.len() == 1 { + increase_connected_peers(&self.info_tx, peer_info.trusted); + } } - pub fn disconnected(&self, peer: PeerId) { - let mut state = self.get(peer); + /// Inform PeerTracker that a connection of a peer got disconnected + /// + /// Returns `true` if there are no other connections by the peer + pub fn maybe_disconnected(&self, peer: PeerId, connection_id: ConnectionId) -> bool { + let mut peer_info = self.get(peer); + + peer_info.connections.retain(|id| *id != connection_id); + + // If this is the last connection from the peer + if peer_info.connections.is_empty() { + decrease_connected_peers(&self.info_tx, peer_info.trusted); - if state.addrs.is_empty() { - state.state = PeerState::Discovered; + if peer_info.addrs.is_empty() { + peer_info.state = PeerState::Discovered; + } else { + peer_info.state = PeerState::AddressesFound; + } + + true } else { - state.state = PeerState::AddressesFound; + false } } + /// Inform PeerTracker that we got [`indentify::Info`] for a peer pub fn identified(&self, peer: PeerId, info: &identify::Info) { - let mut state = self.get(peer); + let mut peer_info = self.get(peer); for addr in &info.listen_addrs { - if !state.addrs.contains(addr) { - state.addrs.push(addr.to_owned()); + if !peer_info.addrs.contains(addr) { + peer_info.addrs.push(addr.to_owned()); } } - state.state = PeerState::Identified + peer_info.state = PeerState::Identified; } pub fn is_anyone_connected(&self) -> bool { - self.peers.iter().any(|pair| pair.value().is_connected()) + self.info_tx.borrow().num_connected_peers > 0 } pub fn is_connected(&self, peer: PeerId) -> bool { self.get(peer).is_connected() } - pub fn addresses(&self, peer: PeerId) -> Vec { + pub fn addresses(&self, peer: PeerId) -> SmallVec<[Multiaddr; 4]> { self.get(peer).addrs.clone() } @@ -157,6 +215,16 @@ impl PeerTracker { // collect instead of returning an iter to not block the dashmap .collect() } + + pub fn trusted_n_peers(&self, limit: usize) -> Vec { + self.peers + .iter() + .filter(|pair| pair.value().is_connected() && pair.value().trusted) + .take(limit) + .map(|pair| pair.key().to_owned()) + // collect instead of returning an iter to not block the dashmap + .collect() + } } impl Default for PeerTracker { @@ -164,3 +232,23 @@ impl Default for PeerTracker { PeerTracker::new() } } + +fn increase_connected_peers(info_tx: &watch::Sender, trusted: bool) { + info_tx.send_modify(|tracker_info| { + tracker_info.num_connected_peers += 1; + + if trusted { + tracker_info.num_connected_trusted_peers += 1; + } + }); +} + +fn decrease_connected_peers(info_tx: &watch::Sender, trusted: bool) { + info_tx.send_modify(|tracker_info| { + tracker_info.num_connected_peers -= 1; + + if trusted { + tracker_info.num_connected_trusted_peers -= 1; + } + }); +} From b8e3096143af630a0938b4d1a3f5e626f3da3116 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 5 Oct 2023 15:50:40 +0300 Subject: [PATCH 2/3] name changes on methods and add documentation --- node/src/exchange/client.rs | 4 +-- node/src/p2p.rs | 13 +++++---- node/src/peer_tracker.rs | 57 +++++++++++++++++++++++-------------- 3 files changed, 46 insertions(+), 28 deletions(-) diff --git a/node/src/exchange/client.rs b/node/src/exchange/client.rs index 14521c6b..a61697d7 100644 --- a/node/src/exchange/client.rs +++ b/node/src/exchange/client.rs @@ -1056,8 +1056,8 @@ mod tests { for i in 0..amount { let peer = PeerId::random(); - peers.trusted(peer); - peers.connected(peer, ConnectionId::new_unchecked(i), None); + peers.set_trusted(peer); + peers.set_connected(peer, ConnectionId::new_unchecked(i), None); } peers diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 2f5d82a2..95671609 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -360,7 +360,7 @@ where for addr in args.bootstrap_peers { // Bootstrap peers are always trusted if let Some(peer_id) = addr.peer_id() { - peer_tracker.trusted(peer_id); + peer_tracker.set_trusted(peer_id); } swarm.dial(addr)?; } @@ -472,7 +472,7 @@ where let kademlia = &mut self.swarm.behaviour_mut().kademlia; // Inform peer tracker - self.peer_tracker.identified(peer_id, &info); + self.peer_tracker.set_identified(peer_id, &info); // Inform Kademlia for addr in info.listen_addrs { @@ -544,7 +544,7 @@ where #[instrument(skip_all, fields(peer_id = %peer_id))] fn peer_maybe_discovered(&mut self, peer_id: PeerId) { - if !self.peer_tracker.maybe_discovered(peer_id) { + if !self.peer_tracker.set_maybe_discovered(peer_id) { return; } @@ -580,12 +580,15 @@ where }; self.peer_tracker - .connected(peer_id, connection_id, dialed_addr); + .set_connected(peer_id, connection_id, dialed_addr); } #[instrument(skip_all, fields(peer_id = %peer_id))] fn on_peer_disconnected(&mut self, peer_id: PeerId, connection_id: ConnectionId) { - if self.peer_tracker.maybe_disconnected(peer_id, connection_id) { + if self + .peer_tracker + .set_maybe_disconnected(peer_id, connection_id) + { info!("Peer disconnected"); } } diff --git a/node/src/peer_tracker.rs b/node/src/peer_tracker.rs index ffe5bf3b..dd18c1d5 100644 --- a/node/src/peer_tracker.rs +++ b/node/src/peer_tracker.rs @@ -7,6 +7,7 @@ use libp2p::{identify, swarm::ConnectionId, Multiaddr, PeerId}; use smallvec::SmallVec; use tokio::sync::watch; +/// Keeps track various information about peers. #[derive(Debug)] pub struct PeerTracker { peers: DashMap, @@ -15,9 +16,9 @@ pub struct PeerTracker { #[derive(Debug, Clone, Default)] pub struct PeerTrackerInfo { - /// Number of the connected peers + /// Number of the connected peers. pub num_connected_peers: u64, - /// Number of the connected trusted peers + /// Number of the connected trusted peers. pub num_connected_trusted_peers: u64, } @@ -44,6 +45,7 @@ impl PeerInfo { } impl PeerTracker { + /// Constructs an empty PeerTracker. pub fn new() -> Self { PeerTracker { peers: DashMap::new(), @@ -51,15 +53,20 @@ impl PeerTracker { } } + /// Returns the current [`PeerTrackerInfo`]. pub fn info(&self) -> PeerTrackerInfo { self.info_tx.borrow().to_owned() } + /// Returns a watcher for any [`PeerTrackerInfo`] changes. pub fn info_watcher(&self) -> watch::Receiver { self.info_tx.subscribe() } - pub fn maybe_discovered(&self, peer: PeerId) -> bool { + /// Sets peer as discovered if this is it's first appearance. + /// + /// Returns `true` if peer was not known from before. + pub fn set_maybe_discovered(&self, peer: PeerId) -> bool { match self.peers.entry(peer) { Entry::Vacant(entry) => { entry.insert(PeerInfo { @@ -86,6 +93,7 @@ impl PeerTracker { }) } + /// Add an address for a peer. pub fn add_addresses(&self, peer: PeerId, addrs: I) where I: IntoIterator, @@ -107,13 +115,13 @@ impl PeerTracker { } } - /// Adds a peer as trusted or not. - pub fn trusted(&self, peer: PeerId) { + /// Sets peer as trusted. + pub fn set_trusted(&self, peer: PeerId) { self.get(peer).value_mut().trusted = true; } - /// Inform PeerTracker that there is a new connection for a peer - pub fn connected( + /// Sets peer as connected. + pub fn set_connected( &self, peer: PeerId, connection_id: ConnectionId, @@ -127,26 +135,30 @@ impl PeerTracker { } } - peer_info.state = PeerState::Connected; + // This is needed to avoid downgrading `Identified` state to `Connected` + if !peer_info.is_connected() { + peer_info.state = PeerState::Connected; + } + peer_info.connections.push(connection_id); // If this is the first connection from the peer if peer_info.connections.len() == 1 { - increase_connected_peers(&self.info_tx, peer_info.trusted); + increment_connected_peers(&self.info_tx, peer_info.trusted); } } - /// Inform PeerTracker that a connection of a peer got disconnected + /// Sets peer as disconnected if `connection_id` was the last connection. /// - /// Returns `true` if there are no other connections by the peer - pub fn maybe_disconnected(&self, peer: PeerId, connection_id: ConnectionId) -> bool { + /// Returns `true` if was set to disconnected. + pub fn set_maybe_disconnected(&self, peer: PeerId, connection_id: ConnectionId) -> bool { let mut peer_info = self.get(peer); peer_info.connections.retain(|id| *id != connection_id); // If this is the last connection from the peer if peer_info.connections.is_empty() { - decrease_connected_peers(&self.info_tx, peer_info.trusted); + decrement_connected_peers(&self.info_tx, peer_info.trusted); if peer_info.addrs.is_empty() { peer_info.state = PeerState::Discovered; @@ -160,8 +172,8 @@ impl PeerTracker { } } - /// Inform PeerTracker that we got [`indentify::Info`] for a peer - pub fn identified(&self, peer: PeerId, info: &identify::Info) { + /// Sets peer as identified. + pub fn set_identified(&self, peer: PeerId, info: &identify::Info) { let mut peer_info = self.get(peer); for addr in &info.listen_addrs { @@ -173,22 +185,22 @@ impl PeerTracker { peer_info.state = PeerState::Identified; } - pub fn is_anyone_connected(&self) -> bool { - self.info_tx.borrow().num_connected_peers > 0 - } - + /// Returns true if peer is connected. pub fn is_connected(&self, peer: PeerId) -> bool { self.get(peer).is_connected() } + /// Returns the addresses of the peer. pub fn addresses(&self, peer: PeerId) -> SmallVec<[Multiaddr; 4]> { self.get(peer).addrs.clone() } + /// Removes a peer. pub fn remove(&self, peer: PeerId) { self.peers.remove(&peer); } + /// Returns connected peers. pub fn connected_peers(&self) -> Vec { self.peers .iter() @@ -197,6 +209,7 @@ impl PeerTracker { .collect() } + /// Returns one of the best peers. pub fn best_peer(&self) -> Option { // TODO: Implement peer score and return the best. self.peers @@ -205,6 +218,7 @@ impl PeerTracker { .map(|pair| pair.key().to_owned()) } + /// Returns up to N amount of best peers. pub fn best_n_peers(&self, limit: usize) -> Vec { // TODO: Implement peer score and return the best N peers. self.peers @@ -216,6 +230,7 @@ impl PeerTracker { .collect() } + /// Returns up to N amount of trusted peers. pub fn trusted_n_peers(&self, limit: usize) -> Vec { self.peers .iter() @@ -233,7 +248,7 @@ impl Default for PeerTracker { } } -fn increase_connected_peers(info_tx: &watch::Sender, trusted: bool) { +fn increment_connected_peers(info_tx: &watch::Sender, trusted: bool) { info_tx.send_modify(|tracker_info| { tracker_info.num_connected_peers += 1; @@ -243,7 +258,7 @@ fn increase_connected_peers(info_tx: &watch::Sender, trusted: b }); } -fn decrease_connected_peers(info_tx: &watch::Sender, trusted: bool) { +fn decrement_connected_peers(info_tx: &watch::Sender, trusted: bool) { info_tx.send_modify(|tracker_info| { tracker_info.num_connected_peers -= 1; From 32586c6958efdb03ddb80f479da80ae621e5b4b8 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 5 Oct 2023 16:27:59 +0300 Subject: [PATCH 3/3] merge if --- node/src/peer_tracker.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/node/src/peer_tracker.rs b/node/src/peer_tracker.rs index dd18c1d5..8414a631 100644 --- a/node/src/peer_tracker.rs +++ b/node/src/peer_tracker.rs @@ -135,15 +135,11 @@ impl PeerTracker { } } - // This is needed to avoid downgrading `Identified` state to `Connected` - if !peer_info.is_connected() { - peer_info.state = PeerState::Connected; - } - peer_info.connections.push(connection_id); - // If this is the first connection from the peer - if peer_info.connections.len() == 1 { + // If peer was not already connected from before + if !peer_info.is_connected() { + peer_info.state = PeerState::Connected; increment_connected_peers(&self.info_tx, peer_info.trusted); } } @@ -158,14 +154,13 @@ impl PeerTracker { // If this is the last connection from the peer if peer_info.connections.is_empty() { - decrement_connected_peers(&self.info_tx, peer_info.trusted); - if peer_info.addrs.is_empty() { peer_info.state = PeerState::Discovered; } else { peer_info.state = PeerState::AddressesFound; } + decrement_connected_peers(&self.info_tx, peer_info.trusted); true } else { false