Skip to content

Commit

Permalink
feat: calculate dht size estimate without a dequeue
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Oct 23, 2024
1 parent e769d3d commit d660bd7
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
7 changes: 2 additions & 5 deletions src/common/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,8 @@ pub(crate) fn estimate_dht_size(target: Id, nodes: &[Node]) -> usize {
for node in nodes {
let xor = target.xor(&node.id);

let di =
// Round up to the highest u128 and ignore the low part
u128::from_be_bytes(xor.as_bytes()[0..16].try_into().expect("infallible"))
// Round up to 1 to avoid dividing by zero
.max(1);
// Round up the lower 4 bytes to get a u128 from u160.
let di = u128::from_be_bytes(xor.as_bytes()[0..16].try_into().expect("infallible")) + 1;

// The inverse of the probability of finding (i) nodes at distance (di)
let estimated_n = i.saturating_mul((u128::MAX / di) as usize);
Expand Down
28 changes: 17 additions & 11 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod query;
mod socket;

use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::net::{SocketAddr, ToSocketAddrs};
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -44,7 +44,7 @@ const MAX_CACHED_BUCKETS: usize = 1000;

// If you are making a FIND_NODE requests subsequentially,
// you can expect the oldest sample to be an hour ago.
const DHT_SIZE_ESTIMATE_SAMPLE_SIZE: usize = 1024;
const DHT_SIZE_ESTIMATE_WINDOW: i32 = 1024;

#[derive(Debug)]
/// Internal Rpc called in the Dht thread loop, useful to create your own actor setup.
Expand All @@ -71,7 +71,9 @@ pub struct Rpc {
/// get query to finish, update the closest_nodes, then `query_all` these.
put_queries: HashMap<Id, PutQuery>,

dht_size_estimate_samples: VecDeque<usize>,
/// Moving average of the estimated dht size from the lookups within a [DHT_SIZE_ESTIMATE_WINDOW]
dht_size_estimate: i32,
dht_size_estimate_samples: i32,
}

impl Rpc {
Expand Down Expand Up @@ -112,7 +114,8 @@ impl Rpc {
.unwrap_or_else(Instant::now),
last_table_ping: Instant::now(),

dht_size_estimate_samples: VecDeque::with_capacity(DHT_SIZE_ESTIMATE_SAMPLE_SIZE),
dht_size_estimate: 0,
dht_size_estimate_samples: 0,
})
}

Expand Down Expand Up @@ -142,8 +145,7 @@ impl Rpc {
}

pub fn dht_size_estimate(&self) -> usize {
self.dht_size_estimate_samples.iter().sum::<usize>()
/ self.dht_size_estimate_samples.len().max(1)
self.dht_size_estimate as usize
}

// === Public Methods ===
Expand Down Expand Up @@ -175,11 +177,15 @@ impl Rpc {
if is_done {
let closest = query.closest();

if self.dht_size_estimate_samples.len() >= DHT_SIZE_ESTIMATE_SAMPLE_SIZE {
self.dht_size_estimate_samples.remove(0);
}
let dht_size_estimate = estimate_dht_size(query.target, &closest);
self.dht_size_estimate_samples.push_back(dht_size_estimate);
// Calculate moving average
let estimate = estimate_dht_size(query.target, &closest) as i32;
self.dht_size_estimate_samples =
(self.dht_size_estimate_samples + 1).min(DHT_SIZE_ESTIMATE_WINDOW);

// TODO: warn if a Horizontal Sybil attack is deteceted.

self.dht_size_estimate = self.dht_size_estimate
+ (estimate - self.dht_size_estimate) / self.dht_size_estimate_samples;

if let Some(put_query) = self.put_queries.get_mut(id) {
put_query.start(&mut self.socket, closest.clone())
Expand Down

0 comments on commit d660bd7

Please sign in to comment.