Skip to content

Commit

Permalink
Merge pull request #887 from freenet/186081753-add-topology-manager
Browse files Browse the repository at this point in the history
186081753 - Add topology manager
  • Loading branch information
iduartgomez authored Nov 6, 2023
2 parents 51e96f5 + 9b71bda commit c7e208d
Show file tree
Hide file tree
Showing 11 changed files with 1,418 additions and 242 deletions.
649 changes: 410 additions & 239 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ tracing = "0.1"
arbitrary = { version = "1", features = ["derive"] }
itertools = "0.11"
pico-args = "0.5"
statrs = "0.16.0"
freenet-stdlib = { workspace = true, features = ["testing", "net"] }

[features]
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod router;
mod runtime;
#[cfg(feature = "websocket")]
pub mod server;
mod topology;
pub mod util;

type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down
35 changes: 32 additions & 3 deletions crates/core/src/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
//! - next node
//! - final location
use std::hash::Hash;
use std::{
collections::BTreeMap,
convert::TryFrom,
fmt::Display,
hash::Hasher,
ops::Add,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering::SeqCst},
Arc,
Expand Down Expand Up @@ -446,6 +448,11 @@ impl Location {
Location(location)
}

/// Returns a new location rounded to ensure it is between 0.0 and 1.0
pub fn new_rounded(location: f64) -> Self {
Self::new(location.rem_euclid(1.0))
}

/// Returns a new random location.
pub fn random() -> Self {
use rand::prelude::*;
Expand All @@ -462,6 +469,10 @@ impl Location {
Distance::new(1.0f64 - d)
}
}

pub fn as_f64(&self) -> f64 {
self.0
}
}

/// Ensure at compile time locations can only be constructed from well formed contract keys
Expand Down Expand Up @@ -535,23 +546,41 @@ impl Distance {
pub fn new(value: f64) -> Self {
debug_assert!(!value.is_nan(), "Distance cannot be NaN");
debug_assert!(
(0.0..=0.5).contains(&value),
"Distance must be in the range [0, 0.5]"
(0.0..=1.0).contains(&value),
"Distance must be in the range [0, 1.0]"
);
Distance(value)
if value <= 0.5 {
Distance(value)
} else {
Distance(1.0 - value)
}
}

pub fn as_f64(&self) -> f64 {
self.0
}
}

impl Add for Distance {
type Output = Self;

fn add(self, rhs: Self) -> Self::Output {
let d = self.0 + rhs.0;
if d > 0.5 {
Distance::new(1.0 - d)
} else {
Distance::new(d)
}
}
}

impl PartialEq for Distance {
fn eq(&self, other: &Self) -> bool {
(self.0 - other.0).abs() < f64::EPSILON
}
}

#[allow(clippy::incorrect_partial_ord_impl_on_ord_type)]
impl PartialOrd for Distance {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
Expand Down
63 changes: 63 additions & 0 deletions crates/core/src/topology/connection_evaluator/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// `ConnectionEvaluator` is used to evaluate connection scores within a specified time window.
///
/// The evaluator records scores and determines whether a given score is better (higher) than
/// any other scores within a predefined time window. A score is considered better if it's higher
/// than all other scores in the time window, or if no scores were recorded within the window's
/// duration.
///
/// In the Freenet context, this will be used to titrate the rate of new connection requests accepted
/// by a node. The node will only accept a new connection if the score of the connection is better
/// than all other scores within the time window.
pub(crate) struct ConnectionEvaluator {
scores: VecDeque<(Instant, f64)>,
window_duration: Duration,
}

impl ConnectionEvaluator {
pub fn new(window_duration: Duration) -> Self {
ConnectionEvaluator {
scores: VecDeque::new(),
window_duration,
}
}

pub fn record_only(&mut self, score: f64) {
self.record_only_with_current_time(score, Instant::now());
}

pub fn record_only_with_current_time(&mut self, score: f64, current_time: Instant) {
self.remove_outdated_scores(current_time);
self.scores.push_back((current_time, score));
}

pub fn record_and_eval(&mut self, score: f64) -> bool {
self.record_and_eval_with_current_time(score, Instant::now())
}

pub fn record_and_eval_with_current_time(&mut self, score: f64, current_time: Instant) -> bool {
self.remove_outdated_scores(current_time);

let is_better = self.scores.is_empty() || self.scores.iter().all(|&(_, s)| score > s);

// Important to add new score *after* checking if it's better than all other scores
self.record_only_with_current_time(score, current_time);

is_better
}

fn remove_outdated_scores(&mut self, current_time: Instant) {
while let Some(&(time, _)) = self.scores.front() {
if current_time.duration_since(time) > self.window_duration {
self.scores.pop_front();
} else {
break;
}
}
}
}

#[cfg(test)]
mod tests;
84 changes: 84 additions & 0 deletions crates/core/src/topology/connection_evaluator/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use super::*;

#[test]
fn test_record_first_score() {
let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10));
let current_time = Instant::now();
assert!(evaluator.record_and_eval_with_current_time(5.0, current_time));
// Assert evaluator.scores contains the new score
assert_eq!(evaluator.scores.len(), 1);
assert_eq!(evaluator.scores[0].1, 5.0);
assert_eq!(evaluator.scores[0].0, current_time);
}

#[test]
fn test_not_best_in_time_window() {
let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10));

let start_time = Instant::now();
evaluator.record_and_eval_with_current_time(5.0, start_time);
assert!(!evaluator.record_and_eval_with_current_time(4.0, start_time + Duration::from_secs(5)),);
}

#[test]
fn test_best_in_time_window() {
let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10));

let start_time = Instant::now();
evaluator.record_and_eval_with_current_time(5.0, start_time);
assert!(evaluator.record_and_eval_with_current_time(4.0, start_time + Duration::from_secs(11)),);
}

#[test]
fn test_remove_outdated_scores() {
let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10));

let start_time = Instant::now();
evaluator.record_and_eval_with_current_time(5.0, start_time);
evaluator.record_and_eval_with_current_time(6.0, start_time + Duration::from_secs(5));
evaluator.record_and_eval_with_current_time(4.5, start_time + Duration::from_secs(11));
assert_eq!(evaluator.scores.len(), 2);
}

#[test]
fn test_empty_window_duration() {
let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(0));
let current_time = Instant::now();
assert!(evaluator.record_and_eval_with_current_time(5.0, current_time));
assert!(!evaluator.record_and_eval_with_current_time(4.0, current_time));
}

#[test]
fn test_multiple_scores_same_timestamp() {
let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10));
let current_time = Instant::now();
evaluator.record_only_with_current_time(5.0, current_time);
evaluator.record_only_with_current_time(6.0, current_time);
assert_eq!(evaluator.scores.len(), 2);
assert!(
!evaluator.record_and_eval_with_current_time(4.0, current_time + Duration::from_secs(5)),
);
}

#[test]
fn test_negative_scores() {
let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10));
let start_time = Instant::now();
assert!(evaluator.record_and_eval_with_current_time(-5.0, start_time),);
assert!(evaluator.record_and_eval_with_current_time(-4.0, start_time + Duration::from_secs(5)),);
assert!(
!evaluator.record_and_eval_with_current_time(-6.0, start_time + Duration::from_secs(5)),
);
}

#[test]
fn test_large_number_of_scores() {
let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10));
let start_time = Instant::now();
for i in 0..1000 {
evaluator.record_only_with_current_time(i as f64, start_time + Duration::from_secs(i));
}
assert!(
evaluator.record_and_eval_with_current_time(1000.0, start_time + Duration::from_secs(1001)),
);
}
Loading

0 comments on commit c7e208d

Please sign in to comment.