From d660bd750adf31991785131c7d520b40129dbbc7 Mon Sep 17 00:00:00 2001 From: nazeh Date: Wed, 23 Oct 2024 10:07:18 +0300 Subject: [PATCH] feat: calculate dht size estimate without a dequeue --- src/common/routing_table.rs | 7 ++----- src/rpc.rs | 28 +++++++++++++++++----------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/common/routing_table.rs b/src/common/routing_table.rs index 011fa6d0..1c582991 100644 --- a/src/common/routing_table.rs +++ b/src/common/routing_table.rs @@ -213,11 +213,8 @@ pub(crate) fn estimate_dht_size(target: Id, nodes: &[Node]) -> usize { for node in nodes { let xor = target.xor(&node.id); - let di = - // Round up to the highest u128 and ignore the low part - u128::from_be_bytes(xor.as_bytes()[0..16].try_into().expect("infallible")) - // Round up to 1 to avoid dividing by zero - .max(1); + // Round up the lower 4 bytes to get a u128 from u160. + let di = u128::from_be_bytes(xor.as_bytes()[0..16].try_into().expect("infallible")) + 1; // The inverse of the probability of finding (i) nodes at distance (di) let estimated_n = i.saturating_mul((u128::MAX / di) as usize); diff --git a/src/rpc.rs b/src/rpc.rs index 0591d320..e5cc9f6a 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -3,7 +3,7 @@ mod query; mod socket; -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::net::{SocketAddr, ToSocketAddrs}; use std::num::NonZeroUsize; use std::time::{Duration, Instant}; @@ -44,7 +44,7 @@ const MAX_CACHED_BUCKETS: usize = 1000; // If you are making a FIND_NODE requests subsequentially, // you can expect the oldest sample to be an hour ago. -const DHT_SIZE_ESTIMATE_SAMPLE_SIZE: usize = 1024; +const DHT_SIZE_ESTIMATE_WINDOW: i32 = 1024; #[derive(Debug)] /// Internal Rpc called in the Dht thread loop, useful to create your own actor setup. @@ -71,7 +71,9 @@ pub struct Rpc { /// get query to finish, update the closest_nodes, then `query_all` these. put_queries: HashMap, - dht_size_estimate_samples: VecDeque, + /// Moving average of the estimated dht size from the lookups within a [DHT_SIZE_ESTIMATE_WINDOW] + dht_size_estimate: i32, + dht_size_estimate_samples: i32, } impl Rpc { @@ -112,7 +114,8 @@ impl Rpc { .unwrap_or_else(Instant::now), last_table_ping: Instant::now(), - dht_size_estimate_samples: VecDeque::with_capacity(DHT_SIZE_ESTIMATE_SAMPLE_SIZE), + dht_size_estimate: 0, + dht_size_estimate_samples: 0, }) } @@ -142,8 +145,7 @@ impl Rpc { } pub fn dht_size_estimate(&self) -> usize { - self.dht_size_estimate_samples.iter().sum::() - / self.dht_size_estimate_samples.len().max(1) + self.dht_size_estimate as usize } // === Public Methods === @@ -175,11 +177,15 @@ impl Rpc { if is_done { let closest = query.closest(); - if self.dht_size_estimate_samples.len() >= DHT_SIZE_ESTIMATE_SAMPLE_SIZE { - self.dht_size_estimate_samples.remove(0); - } - let dht_size_estimate = estimate_dht_size(query.target, &closest); - self.dht_size_estimate_samples.push_back(dht_size_estimate); + // Calculate moving average + let estimate = estimate_dht_size(query.target, &closest) as i32; + self.dht_size_estimate_samples = + (self.dht_size_estimate_samples + 1).min(DHT_SIZE_ESTIMATE_WINDOW); + + // TODO: warn if a Horizontal Sybil attack is deteceted. + + self.dht_size_estimate = self.dht_size_estimate + + (estimate - self.dht_size_estimate) / self.dht_size_estimate_samples; if let Some(put_query) = self.put_queries.get_mut(id) { put_query.start(&mut self.socket, closest.clone())