diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index 79f3412ae..664f2bdd9 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -16,11 +16,12 @@ use std::{ convert::TryFrom, fmt::Display, hash::Hasher, + ops::Add, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering::SeqCst}, Arc, }, - time::Duration, ops::Add, + time::Duration, }; use anyhow::bail; diff --git a/crates/core/src/topology/mod.rs b/crates/core/src/topology/mod.rs index 42d1e7be8..3135ae4f8 100644 --- a/crates/core/src/topology/mod.rs +++ b/crates/core/src/topology/mod.rs @@ -1,15 +1,19 @@ #![allow(unused_variables, dead_code)] -use std::{collections::BTreeMap, rc::Rc, time::{Duration, Instant}}; -use tracing::{debug, error, info}; -use request_density_tracker::cached_density_map::CachedDensityMap; use crate::ring::{Distance, Location}; +use request_density_tracker::cached_density_map::CachedDensityMap; +use std::{ + collections::BTreeMap, + rc::Rc, + time::{Duration, Instant}, +}; +use tracing::{debug, error, info}; use self::{request_density_tracker::DensityMapError, small_world_rand::random_link_distance}; +mod connection_evaluator; mod request_density_tracker; mod small_world_rand; -mod connection_evaluator; const SLOW_CONNECTION_EVALUATOR_WINDOW_DURATION: Duration = Duration::from_secs(5 * 60); const FAST_CONNECTION_EVALUATOR_WINDOW_DURATION: Duration = Duration::from_secs(60); @@ -20,17 +24,17 @@ const RANDOM_CLOSEST_DISTANCE: f64 = 1.0 / 1000.0; /// The goal of `TopologyManager` is to select new connections such that the /// distribution of connections is as close as possible to the /// distribution of outbound requests. -/// +/// /// This is done by maintaining a `RequestDensityTracker` which tracks the /// distribution of requests in the network. The `TopologyManager` uses this /// tracker to create a `DensityMap` which is used to evaluate the density of /// requests at a given location. -/// +/// /// The `TopologyManager` uses the density map to select the best candidate /// location, which is assumed to be close to peer connections that are /// currently receiving a lot of requests. This should have the effect of /// "balancing" out requests over time. -/// +/// /// The `TopologyManager` also uses a `ConnectionEvaluator` to evaluate whether /// a given connection is better than all other connections within a predefined /// time window. The goal of this is to select the best connections over time @@ -45,19 +49,29 @@ pub(crate) struct TopologyManager { impl TopologyManager { /// Create a new TopologyManager specifying the peer's own Location - pub(crate) fn new(this_peer_location : Location) -> Self { + pub(crate) fn new(this_peer_location: Location) -> Self { info!("Creating a new TopologyManager instance"); TopologyManager { - slow_connection_evaluator: connection_evaluator::ConnectionEvaluator::new(SLOW_CONNECTION_EVALUATOR_WINDOW_DURATION), - fast_connection_evaluator: connection_evaluator::ConnectionEvaluator::new(FAST_CONNECTION_EVALUATOR_WINDOW_DURATION), - request_density_tracker: request_density_tracker::RequestDensityTracker::new(REQUEST_DENSITY_TRACKER_WINDOW_SIZE), + slow_connection_evaluator: connection_evaluator::ConnectionEvaluator::new( + SLOW_CONNECTION_EVALUATOR_WINDOW_DURATION, + ), + fast_connection_evaluator: connection_evaluator::ConnectionEvaluator::new( + FAST_CONNECTION_EVALUATOR_WINDOW_DURATION, + ), + request_density_tracker: request_density_tracker::RequestDensityTracker::new( + REQUEST_DENSITY_TRACKER_WINDOW_SIZE, + ), cached_density_map: CachedDensityMap::new(REGENERATE_DENSITY_MAP_INTERVAL), this_peer_location, } } /// Record a request and the location it's targeting - pub(crate) fn record_request(&mut self, requested_location: Location, request_type : RequestType) { + pub(crate) fn record_request( + &mut self, + requested_location: Location, + request_type: RequestType, + ) { debug!("Recording request for location: {:?}", requested_location); self.request_density_tracker.sample(requested_location); } @@ -65,45 +79,73 @@ impl TopologyManager { /// Decide whether to accept a connection from a new candidate peer based on its location /// and current neighbors and request density, along with how it compares to other /// recent candidates. - pub(crate) fn evaluate_new_connection(&mut self, current_neighbors: &BTreeMap, candidate_location: Location, acquisition_strategy : AcquisitionStrategy) -> Result { - self.evaluate_new_connection_with_current_time(current_neighbors, candidate_location, acquisition_strategy, Instant::now()) + pub(crate) fn evaluate_new_connection( + &mut self, + current_neighbors: &BTreeMap, + candidate_location: Location, + acquisition_strategy: AcquisitionStrategy, + ) -> Result { + self.evaluate_new_connection_with_current_time( + current_neighbors, + candidate_location, + acquisition_strategy, + Instant::now(), + ) } - fn evaluate_new_connection_with_current_time(&mut self, current_neighbors: &BTreeMap, candidate_location: Location, acquisition_strategy : AcquisitionStrategy, current_time : Instant) -> Result { - debug!("Evaluating new connection for candidate location: {:?}", candidate_location); + fn evaluate_new_connection_with_current_time( + &mut self, + current_neighbors: &BTreeMap, + candidate_location: Location, + acquisition_strategy: AcquisitionStrategy, + current_time: Instant, + ) -> Result { + debug!( + "Evaluating new connection for candidate location: {:?}", + candidate_location + ); let density_map = self.get_or_create_density_map(current_neighbors)?; let score = density_map.get_density_at(candidate_location)?; let accept = match acquisition_strategy { AcquisitionStrategy::Slow => { - self.fast_connection_evaluator.record_only_with_current_time(score, current_time); - self.slow_connection_evaluator.record_and_eval_with_current_time(score, current_time) - }, + self.fast_connection_evaluator + .record_only_with_current_time(score, current_time); + self.slow_connection_evaluator + .record_and_eval_with_current_time(score, current_time) + } AcquisitionStrategy::Fast => { - self.slow_connection_evaluator.record_only_with_current_time(score, current_time); - self.fast_connection_evaluator.record_and_eval_with_current_time(score, current_time) - }, + self.slow_connection_evaluator + .record_only_with_current_time(score, current_time); + self.fast_connection_evaluator + .record_and_eval_with_current_time(score, current_time) + } }; - + Ok(accept) } /// Get the ideal location for a new connection based on current neighbors and request density - pub(crate) fn get_best_candidate_location(&mut self, current_neighbors: &BTreeMap) -> Result { + pub(crate) fn get_best_candidate_location( + &mut self, + current_neighbors: &BTreeMap, + ) -> Result { debug!("Retrieving best candidate location"); let density_map = self.get_or_create_density_map(current_neighbors)?; - + let best_location = match density_map.get_max_density() { Ok(location) => { debug!("Max density found at location: {:?}", location); location - }, + } Err(_) => { - error!("An error occurred while getting max density, falling back to random location"); + error!( + "An error occurred while getting max density, falling back to random location" + ); self.random_location() - }, + } }; - + Ok(best_location) } @@ -117,13 +159,17 @@ impl TopologyManager { } else { self.this_peer_location.as_f64() + distance.as_f64() }; - let location_f64 = location_f64.rem_euclid(1.0); // Ensure result is in [0.0, 1.0) + let location_f64 = location_f64.rem_euclid(1.0); // Ensure result is in [0.0, 1.0) Location::new(location_f64) - } + } - fn get_or_create_density_map(&mut self, current_neighbors: &BTreeMap) -> Result, DensityMapError> { + fn get_or_create_density_map( + &mut self, + current_neighbors: &BTreeMap, + ) -> Result, DensityMapError> { debug!("Getting or creating density map"); - self.cached_density_map.get_or_create(&self.request_density_tracker, current_neighbors) + self.cached_density_map + .get_or_create(&self.request_density_tracker, current_neighbors) } } @@ -131,7 +177,7 @@ pub(crate) enum RequestType { Get, Put, Join, - Subscribe + Subscribe, } pub(crate) enum AcquisitionStrategy { @@ -144,8 +190,8 @@ pub(crate) enum AcquisitionStrategy { #[cfg(test)] mod tests { - use crate::ring::Location; use super::TopologyManager; + use crate::ring::Location; #[test] fn test_topology_manager() { @@ -165,7 +211,9 @@ mod tests { requests.push(requested_location); } - let best_candidate_location = topology_manager.get_best_candidate_location(¤t_neighbors).unwrap(); + let best_candidate_location = topology_manager + .get_best_candidate_location(¤t_neighbors) + .unwrap(); // Should be half way between 0.3 and 0.4 as that is where the most requests were assert_eq!(best_candidate_location, Location::new(0.35)); @@ -177,7 +225,9 @@ mod tests { let candidate_location = Location::new(i as f64 / 100.0); let score = topology_manager .get_or_create_density_map(¤t_neighbors) - .unwrap().get_density_at(candidate_location).unwrap(); + .unwrap() + .get_density_at(candidate_location) + .unwrap(); if score > best_score { best_score = score; best_location = candidate_location;