Skip to content

Commit

Permalink
feat: add ClosestNodes struct to simplify queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Oct 23, 2024
1 parent d660bd7 commit b701558
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 112 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ All notable changes to mainline dht will be documented in this file.

- Export `errors` module containing `PutError` as a part of the response of `Rpc::put`.
- `Dht::id()` and `AsyncDht::id()` to get the node's Id.
- `Dht::find_node()` and `AsyncDht::find_node()` to lookup a certain target, without calling `get_peers` and return a routing table of the responding nodes.
- `Dht::find_node()` and `AsyncDht::find_node()` to look up a certain target, without calling `get_peers`, and return the closest responding nodes.
- `Dht::info()` and `AsyncDht::info()` to get some internal information about the node from one method.
- `Info::dht_size_estimate` to get the ongoing dht size estimate resulting from watching results of all queries.
- `measure_dht` example to estimate the DHT size.
Expand Down
8 changes: 4 additions & 4 deletions src/async_dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::net::SocketAddr;
use crate::{
common::{
hash_immutable, AnnouncePeerRequestArguments, FindNodeRequestArguments,
GetPeersRequestArguments, GetValueRequestArguments, Id, MutableItem,
GetPeersRequestArguments, GetValueRequestArguments, Id, MutableItem, Node,
PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific,
RequestTypeSpecific, RoutingTable,
RequestTypeSpecific,
},
dht::{ActorMessage, Dht, DhtPutError, DhtWasShutdown, Info},
rpc::{PutError, ResponseSender},
Expand Down Expand Up @@ -52,8 +52,8 @@ impl AsyncDht {

// === Find nodes ===

pub async fn find_node(&self, target: Id) -> Result<RoutingTable, DhtWasShutdown> {
let (sender, receiver) = flume::bounded::<RoutingTable>(1);
pub async fn find_node(&self, target: Id) -> Result<Vec<Node>, DhtWasShutdown> {
let (sender, receiver) = flume::bounded::<Vec<Node>>(1);

let request = RequestTypeSpecific::FindNode(FindNodeRequestArguments { target });

Expand Down
14 changes: 14 additions & 0 deletions src/common/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ impl Id {
}
}
}

/// Returns the number of intervals in the keyspace if divided by the distance
/// between this id and a given `target`.
///
/// The distance is the XOR of the two ids, reduced to its 16 most significant
/// bytes (read as a big-endian `u128`) and rounded up by one to account for
/// the dropped lower 4 bytes.
///
/// The result saturates at `usize::MAX` instead of being truncated when the
/// quotient does not fit in a `usize` (very small distances, or 32-bit targets).
///
/// Useful to estimate the Dht size, see [crate::ClosestNodes::dht_size_estimate].
pub fn keyspace_intervals(&self, target: Id) -> usize {
    let xor = self.xor(&target);

    // Keep the upper 16 bytes of the 20 byte distance to get a u128 from u160.
    let prefix: [u8; 16] = xor.as_bytes()[0..16].try_into().expect("infallible");

    // Round up for the dropped lower 4 bytes. Saturate so that a prefix of all
    // `0xFF` bytes cannot overflow (and wrap to zero, causing a division by zero).
    let distance = u128::from_be_bytes(prefix).saturating_add(1);

    // `distance >= 1`, so the division is always defined. Clamp before the cast
    // so that `as usize` never discards high bits.
    (u128::MAX / distance).min(usize::MAX as u128) as usize
}
}

fn first_21_bits(bytes: &[u8]) -> [u8; 3] {
Expand Down
78 changes: 1 addition & 77 deletions src/common/routing_table.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Simplified Kademlia routing table
use std::collections::BTreeMap;
use std::slice::Iter;
use std::{collections::BTreeMap, convert::TryInto};

use crate::common::{Id, Node, MAX_DISTANCE};

Expand Down Expand Up @@ -135,58 +135,6 @@ impl RoutingTable {
nodes
}

/// An estimation of the Dht size from the distribution of the closest nodes
/// responding to a query.
///
/// In order to get an accurate calculation of the Dht size, you should take
/// as many lookups (at uniformly distributed targets) as you can,
/// and calculate the average of the estimations based on their responding nodes.
///
/// # Explanation
///
/// Consider a Dht with a 4 bit key space.
/// Then we can map nodes in that keyspace by their distance to a given target of a lookup.
///
/// Assuming a random but uniform distribution of nodes (which can be measured independently),
/// you should see nodes distributed somewhat like this:
///
/// ```md
/// (1) (2) (3) (4) (5) (6) (7) (8)
/// |------|------|------|------|------|------|------|------|------|------|------|------|------|------|------|
/// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/// ```
///
/// So if you make a lookup and obtain this partial view of the network:
/// ```md
/// (1) (2) (3) (4) (5)
/// |------|------|------|------|------|------|------|------|------|------|------|------|------|------|------|
/// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/// ```
///
/// Note: you see exponentially fewer distant nodes than close ones, which is what you should
/// expect from how the routing table works.
///
/// Seeing one node at distance (d1=2) suggests that the routing table might contain 8 nodes,
/// since its full length is 8 times (d1).
///
/// Similarly, seeing two nodes at (d2=3) suggests that the routing table might contain ~11
/// nodes, since its full length is a bit more than 5 times (d2) and two nodes were seen
/// within that distance.
///
/// If we repeat this estimation for as many nodes as the routing table's `k` bucket size,
/// and take their average, we get a more accurate estimation of the dht.
///
/// ## Formula
///
/// The estimated Dht size, at each distance `di`, is `en_i = i * d_max / di` where `i` is the
/// count of nodes discovered until this distance and `d_max` is the size of the key space.
///
/// The final Dht size estimation is the average of `en_1, en_2, .., en_n`.
///
/// Read more at [A New Method for Estimating P2P Network Size](https://eli.sohl.com/2020/06/05/dht-size-estimation.html#fnref:query-count)
pub fn estimate_dht_size(&self) -> usize {
// Delegates to the free function, using this table's own id as the lookup
// target and a snapshot of its nodes as the responders.
estimate_dht_size(self.id, &self.to_vec())
}

// === Private Methods ===

#[cfg(test)]
Expand All @@ -202,30 +150,6 @@ impl RoutingTable {
}
}

/// Estimates the Dht size from the `nodes` that responded to a lookup of `target`.
///
/// For the `i`-th node (counting from 1, in the order given) at distance `di`
/// from `target`, the estimate is `en_i = i * d_max / di`, where `d_max` is the
/// size of the key space. The result is the average of all `en_i`, or `0` if
/// `nodes` is empty.
///
/// Distances use only the 16 most significant bytes of the 20 byte XOR,
/// rounded up by one. All intermediate arithmetic saturates instead of
/// overflowing or truncating.
///
/// NOTE(review): the formula presumably expects `nodes` ordered from closest
/// to furthest from `target`; this function does not sort them. Confirm
/// against callers.
pub(crate) fn estimate_dht_size(target: Id, nodes: &[Node]) -> usize {
    if nodes.is_empty() {
        return 0;
    }

    let mut sum: usize = 0;

    for (index, node) in nodes.iter().enumerate() {
        let xor = target.xor(&node.id);

        // Keep the upper 16 bytes to get a u128 from u160, then round up for
        // the dropped lower 4 bytes. Saturating, so an all-`0xFF` prefix
        // cannot wrap to zero and cause a division by zero.
        let prefix: [u8; 16] = xor.as_bytes()[0..16].try_into().expect("infallible");
        let di = u128::from_be_bytes(prefix).saturating_add(1);

        // Number of `di` sized intervals in the key space, clamped so the cast
        // never discards high bits.
        let intervals = (u128::MAX / di).min(usize::MAX as u128) as usize;

        // The count of nodes seen up to this distance is 1-based: the closest
        // node alone already suggests `d_max / d1` nodes in the network.
        let seen = index + 1;

        // The inverse of the probability of finding (seen) nodes at distance (di)
        let estimated_n = seen.saturating_mul(intervals);

        sum = sum.saturating_add(estimated_n);
    }

    sum / nodes.len()
}

impl Default for RoutingTable {
fn default() -> Self {
Self::new()
Expand Down
7 changes: 4 additions & 3 deletions src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ use crate::{
hash_immutable, AnnouncePeerRequestArguments, FindNodeRequestArguments,
GetPeersRequestArguments, GetValueRequestArguments, Id, MutableItem,
PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific,
RequestTypeSpecific, RoutingTable,
RequestTypeSpecific,
},
rpc::{PutError, ReceivedFrom, ReceivedMessage, ResponseSender, Rpc},
server::{DefaultServer, Server},
Node,
};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -148,8 +149,8 @@ impl Dht {

// === Find nodes ===

pub fn find_node(&self, target: Id) -> Result<RoutingTable, DhtWasShutdown> {
let (sender, receiver) = flume::bounded::<RoutingTable>(1);
pub fn find_node(&self, target: Id) -> Result<Vec<Node>, DhtWasShutdown> {
let (sender, receiver) = flume::bounded::<Vec<Node>>(1);

let request = RequestTypeSpecific::FindNode(FindNodeRequestArguments { target });

Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ pub mod async_dht;
pub mod rpc;
pub mod server;

pub use crate::common::{Id, KBucket, MutableItem, Node, RoutingTable};
pub use bytes::Bytes;
pub use crate::common::{Id, MutableItem, Node};
pub use dht::{Dht, Settings, Testnet};

pub use bytes::Bytes;
pub use ed25519_dalek::SigningKey;

pub mod errors {
Expand Down
31 changes: 14 additions & 17 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! K-RPC implementation
mod closest_nodes;
mod query;
mod socket;

Expand All @@ -14,18 +15,18 @@ use lru::LruCache;
use tracing::{debug, error, info};

use crate::common::{
estimate_dht_size, validate_immutable, ErrorSpecific, FindNodeRequestArguments,
GetImmutableResponseArguments, GetMutableResponseArguments, GetPeersResponseArguments,
GetValueRequestArguments, Id, Message, MessageType, MutableItem,
NoMoreRecentValueResponseArguments, NoValuesResponseArguments, Node, PutRequestSpecific,
RequestSpecific, RequestTypeSpecific, ResponseSpecific, RoutingTable,
validate_immutable, ErrorSpecific, FindNodeRequestArguments, GetImmutableResponseArguments,
GetMutableResponseArguments, GetPeersResponseArguments, GetValueRequestArguments, Id, Message,
MessageType, MutableItem, NoMoreRecentValueResponseArguments, NoValuesResponseArguments, Node,
PutRequestSpecific, RequestSpecific, RequestTypeSpecific, ResponseSpecific, RoutingTable,
};

use crate::dht::Settings;
use query::{PutQuery, Query};
use socket::KrpcSocket;

pub use crate::common::messages;
pub use closest_nodes::ClosestNodes;
pub use query::PutError;
pub use socket::DEFAULT_PORT;
pub use socket::DEFAULT_REQUEST_TIMEOUT;
Expand Down Expand Up @@ -63,7 +64,7 @@ pub struct Rpc {
/// Last time we pinged nodes in the routing table.
last_table_ping: Instant,
/// Closest nodes to specific target
closest_nodes: LruCache<Id, Vec<Node>>,
closest_nodes: LruCache<Id, ClosestNodes>,

// Active Queries
queries: HashMap<Id, Query>,
Expand Down Expand Up @@ -156,7 +157,6 @@ impl Rpc {
pub fn tick(&mut self) -> RpcTickReport {
// === Tick Queries ===

let mut closest_nodes = Vec::with_capacity(self.queries.len());
let mut done_get_queries = Vec::with_capacity(self.queries.len());
let mut done_put_queries = Vec::with_capacity(self.put_queries.len());

Expand All @@ -175,10 +175,10 @@ impl Rpc {
let is_done = query.tick(&mut self.socket);

if is_done {
let closest = query.closest();
let closest_nodes = query.closest_nodes();

// Calculate moving average
let estimate = estimate_dht_size(query.target, &closest) as i32;
let estimate = closest_nodes.dht_size_estimate() as i32;
self.dht_size_estimate_samples =
(self.dht_size_estimate_samples + 1).min(DHT_SIZE_ESTIMATE_WINDOW);

Expand All @@ -188,10 +188,10 @@ impl Rpc {
+ (estimate - self.dht_size_estimate) / self.dht_size_estimate_samples;

if let Some(put_query) = self.put_queries.get_mut(id) {
put_query.start(&mut self.socket, closest.clone())
put_query.start(&mut self.socket, closest_nodes.nodes())
}

closest_nodes.push((*id, closest));
self.closest_nodes.put(*id, closest_nodes.clone());
done_get_queries.push(*id);

if id == &self_id && table_size == 0 {
Expand All @@ -201,9 +201,6 @@ impl Rpc {
};
};
}
for (id, nodes) in closest_nodes {
self.closest_nodes.put(id, nodes);
}

// === Remove done queries ===
// Has to happen _after_ ticking queries otherwise we might
Expand Down Expand Up @@ -293,9 +290,9 @@ impl Rpc {
if let Some(closest_nodes) = self
.closest_nodes
.get(&target)
.filter(|nodes| !nodes.is_empty() && nodes.iter().any(|n| n.valid_token()))
.filter(|nodes| !nodes.is_empty() && nodes.into_iter().any(|n| n.valid_token()))
{
query.start(&mut self.socket, closest_nodes.to_vec())
query.start(&mut self.socket, closest_nodes.nodes())
} else {
let salt = match request {
PutRequestSpecific::PutMutable(args) => args.salt,
Expand Down Expand Up @@ -685,7 +682,7 @@ pub enum Response {

#[derive(Debug, Clone)]
pub enum ResponseSender {
ClosestNodes(Sender<RoutingTable>),
ClosestNodes(Sender<Vec<Node>>),
Peers(Sender<Vec<SocketAddr>>),
Mutable(Sender<MutableItem>),
Immutable(Sender<Bytes>),
Expand Down
Loading

0 comments on commit b701558

Please sign in to comment.