Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add trusted peers and keep track of multiple connections per peer #92

Merged
merged 3 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions node/src/exchange/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,8 +1056,8 @@ mod tests {

for i in 0..amount {
let peer = PeerId::random();
peers.trusted(peer);
peers.connected(peer, ConnectionId::new_unchecked(i), None);
peers.set_trusted(peer);
peers.set_connected(peer, ConnectionId::new_unchecked(i), None);
}

peers
Expand Down
13 changes: 8 additions & 5 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ where
for addr in args.bootstrap_peers {
// Bootstrap peers are always trusted
if let Some(peer_id) = addr.peer_id() {
peer_tracker.trusted(peer_id);
peer_tracker.set_trusted(peer_id);
}
swarm.dial(addr)?;
}
Expand Down Expand Up @@ -472,7 +472,7 @@ where
let kademlia = &mut self.swarm.behaviour_mut().kademlia;

// Inform peer tracker
self.peer_tracker.identified(peer_id, &info);
self.peer_tracker.set_identified(peer_id, &info);

// Inform Kademlia
for addr in info.listen_addrs {
Expand Down Expand Up @@ -544,7 +544,7 @@ where

#[instrument(skip_all, fields(peer_id = %peer_id))]
fn peer_maybe_discovered(&mut self, peer_id: PeerId) {
if !self.peer_tracker.maybe_discovered(peer_id) {
if !self.peer_tracker.set_maybe_discovered(peer_id) {
return;
}

Expand Down Expand Up @@ -580,12 +580,15 @@ where
};

self.peer_tracker
.connected(peer_id, connection_id, dialed_addr);
.set_connected(peer_id, connection_id, dialed_addr);
}

#[instrument(skip_all, fields(peer_id = %peer_id))]
fn on_peer_disconnected(&mut self, peer_id: PeerId, connection_id: ConnectionId) {
if self.peer_tracker.maybe_disconnected(peer_id, connection_id) {
if self
.peer_tracker
.set_maybe_disconnected(peer_id, connection_id)
{
info!("Peer disconnected");
}
}
Expand Down
57 changes: 36 additions & 21 deletions node/src/peer_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use libp2p::{identify, swarm::ConnectionId, Multiaddr, PeerId};
use smallvec::SmallVec;
use tokio::sync::watch;

/// Keeps track various information about peers.
#[derive(Debug)]
pub struct PeerTracker {
peers: DashMap<PeerId, PeerInfo>,
Expand All @@ -15,9 +16,9 @@ pub struct PeerTracker {

#[derive(Debug, Clone, Default)]
pub struct PeerTrackerInfo {
/// Number of the connected peers
/// Number of the connected peers.
pub num_connected_peers: u64,
/// Number of the connected trusted peers
/// Number of the connected trusted peers.
pub num_connected_trusted_peers: u64,
}

Expand All @@ -44,22 +45,28 @@ impl PeerInfo {
}

impl PeerTracker {
/// Constructs an empty PeerTracker.
pub fn new() -> Self {
PeerTracker {
peers: DashMap::new(),
info_tx: watch::channel(PeerTrackerInfo::default()).0,
}
}

/// Returns the current [`PeerTrackerInfo`].
pub fn info(&self) -> PeerTrackerInfo {
self.info_tx.borrow().to_owned()
}

/// Returns a watcher for any [`PeerTrackerInfo`] changes.
pub fn info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
self.info_tx.subscribe()
}

pub fn maybe_discovered(&self, peer: PeerId) -> bool {
/// Sets peer as discovered if this is it's first appearance.
///
/// Returns `true` if peer was not known from before.
pub fn set_maybe_discovered(&self, peer: PeerId) -> bool {
match self.peers.entry(peer) {
Entry::Vacant(entry) => {
entry.insert(PeerInfo {
Expand All @@ -86,6 +93,7 @@ impl PeerTracker {
})
}

/// Add an address for a peer.
pub fn add_addresses<I, A>(&self, peer: PeerId, addrs: I)
where
I: IntoIterator<Item = A>,
Expand All @@ -107,13 +115,13 @@ impl PeerTracker {
}
}

/// Adds a peer as trusted or not.
pub fn trusted(&self, peer: PeerId) {
/// Sets peer as trusted.
pub fn set_trusted(&self, peer: PeerId) {
self.get(peer).value_mut().trusted = true;
}

/// Inform PeerTracker that there is a new connection for a peer
pub fn connected(
/// Sets peer as connected.
pub fn set_connected(
&self,
peer: PeerId,
connection_id: ConnectionId,
Expand All @@ -127,26 +135,30 @@ impl PeerTracker {
}
}

peer_info.state = PeerState::Connected;
// This is needed to avoid downgrading `Identified` state to `Connected`
if !peer_info.is_connected() {
peer_info.state = PeerState::Connected;
}

peer_info.connections.push(connection_id);

// If this is the first connection from the peer
if peer_info.connections.len() == 1 {
increase_connected_peers(&self.info_tx, peer_info.trusted);
increment_connected_peers(&self.info_tx, peer_info.trusted);
zvolin marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Inform PeerTracker that a connection of a peer got disconnected
/// Sets peer as disconnected if `connection_id` was the last connection.
///
/// Returns `true` if there are no other connections by the peer
pub fn maybe_disconnected(&self, peer: PeerId, connection_id: ConnectionId) -> bool {
/// Returns `true` if was set to disconnected.
pub fn set_maybe_disconnected(&self, peer: PeerId, connection_id: ConnectionId) -> bool {
let mut peer_info = self.get(peer);

peer_info.connections.retain(|id| *id != connection_id);

// If this is the last connection from the peer
if peer_info.connections.is_empty() {
decrease_connected_peers(&self.info_tx, peer_info.trusted);
decrement_connected_peers(&self.info_tx, peer_info.trusted);

if peer_info.addrs.is_empty() {
peer_info.state = PeerState::Discovered;
Expand All @@ -160,8 +172,8 @@ impl PeerTracker {
}
}

/// Inform PeerTracker that we got [`indentify::Info`] for a peer
pub fn identified(&self, peer: PeerId, info: &identify::Info) {
/// Sets peer as identified.
pub fn set_identified(&self, peer: PeerId, info: &identify::Info) {
let mut peer_info = self.get(peer);

for addr in &info.listen_addrs {
Expand All @@ -173,22 +185,22 @@ impl PeerTracker {
peer_info.state = PeerState::Identified;
}

pub fn is_anyone_connected(&self) -> bool {
self.info_tx.borrow().num_connected_peers > 0
}

/// Returns true if peer is connected.
pub fn is_connected(&self, peer: PeerId) -> bool {
self.get(peer).is_connected()
}

/// Returns the addresses of the peer.
pub fn addresses(&self, peer: PeerId) -> SmallVec<[Multiaddr; 4]> {
self.get(peer).addrs.clone()
}

/// Removes a peer.
pub fn remove(&self, peer: PeerId) {
self.peers.remove(&peer);
}

/// Returns connected peers.
pub fn connected_peers(&self) -> Vec<PeerId> {
self.peers
.iter()
Expand All @@ -197,6 +209,7 @@ impl PeerTracker {
.collect()
}

/// Returns one of the best peers.
pub fn best_peer(&self) -> Option<PeerId> {
// TODO: Implement peer score and return the best.
self.peers
Expand All @@ -205,6 +218,7 @@ impl PeerTracker {
.map(|pair| pair.key().to_owned())
}

/// Returns up to N amount of best peers.
pub fn best_n_peers(&self, limit: usize) -> Vec<PeerId> {
// TODO: Implement peer score and return the best N peers.
self.peers
Expand All @@ -216,6 +230,7 @@ impl PeerTracker {
.collect()
}

/// Returns up to N amount of trusted peers.
pub fn trusted_n_peers(&self, limit: usize) -> Vec<PeerId> {
self.peers
.iter()
Expand All @@ -233,7 +248,7 @@ impl Default for PeerTracker {
}
}

fn increase_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
fn increment_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
info_tx.send_modify(|tracker_info| {
tracker_info.num_connected_peers += 1;

Expand All @@ -243,7 +258,7 @@ fn increase_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: b
});
}

fn decrease_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
fn decrement_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
info_tx.send_modify(|tracker_info| {
tracker_info.num_connected_peers -= 1;

Expand Down