Skip to content

Commit

Permalink
tm: cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Nov 6, 2023
1 parent e72341f commit 9b71bda
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 38 deletions.
3 changes: 2 additions & 1 deletion crates/core/src/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ use std::{
convert::TryFrom,
fmt::Display,
hash::Hasher,
ops::Add,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering::SeqCst},
Arc,
},
time::Duration, ops::Add,
time::Duration,
};

use anyhow::bail;
Expand Down
124 changes: 87 additions & 37 deletions crates/core/src/topology/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
#![allow(unused_variables, dead_code)]

use std::{collections::BTreeMap, rc::Rc, time::{Duration, Instant}};
use tracing::{debug, error, info};
use request_density_tracker::cached_density_map::CachedDensityMap;
use crate::ring::{Distance, Location};
use request_density_tracker::cached_density_map::CachedDensityMap;
use std::{
collections::BTreeMap,
rc::Rc,
time::{Duration, Instant},
};
use tracing::{debug, error, info};

use self::{request_density_tracker::DensityMapError, small_world_rand::random_link_distance};

mod connection_evaluator;
mod request_density_tracker;
mod small_world_rand;
mod connection_evaluator;

const SLOW_CONNECTION_EVALUATOR_WINDOW_DURATION: Duration = Duration::from_secs(5 * 60);
const FAST_CONNECTION_EVALUATOR_WINDOW_DURATION: Duration = Duration::from_secs(60);
Expand All @@ -20,17 +24,17 @@ const RANDOM_CLOSEST_DISTANCE: f64 = 1.0 / 1000.0;
/// The goal of `TopologyManager` is to select new connections such that the
/// distribution of connections is as close as possible to the
/// distribution of outbound requests.
///
///
/// This is done by maintaining a `RequestDensityTracker` which tracks the
/// distribution of requests in the network. The `TopologyManager` uses this
/// tracker to create a `DensityMap` which is used to evaluate the density of
/// requests at a given location.
///
///
/// The `TopologyManager` uses the density map to select the best candidate
/// location, which is assumed to be close to peer connections that are
/// currently receiving a lot of requests. This should have the effect of
/// "balancing" out requests over time.
///
///
/// The `TopologyManager` also uses a `ConnectionEvaluator` to evaluate whether
/// a given connection is better than all other connections within a predefined
/// time window. The goal of this is to select the best connections over time
Expand All @@ -45,65 +49,103 @@ pub(crate) struct TopologyManager {

impl TopologyManager {
/// Create a new TopologyManager specifying the peer's own Location
pub(crate) fn new(this_peer_location : Location) -> Self {
pub(crate) fn new(this_peer_location: Location) -> Self {
info!("Creating a new TopologyManager instance");
TopologyManager {
slow_connection_evaluator: connection_evaluator::ConnectionEvaluator::new(SLOW_CONNECTION_EVALUATOR_WINDOW_DURATION),
fast_connection_evaluator: connection_evaluator::ConnectionEvaluator::new(FAST_CONNECTION_EVALUATOR_WINDOW_DURATION),
request_density_tracker: request_density_tracker::RequestDensityTracker::new(REQUEST_DENSITY_TRACKER_WINDOW_SIZE),
slow_connection_evaluator: connection_evaluator::ConnectionEvaluator::new(
SLOW_CONNECTION_EVALUATOR_WINDOW_DURATION,
),
fast_connection_evaluator: connection_evaluator::ConnectionEvaluator::new(
FAST_CONNECTION_EVALUATOR_WINDOW_DURATION,
),
request_density_tracker: request_density_tracker::RequestDensityTracker::new(
REQUEST_DENSITY_TRACKER_WINDOW_SIZE,
),
cached_density_map: CachedDensityMap::new(REGENERATE_DENSITY_MAP_INTERVAL),
this_peer_location,
}
}

/// Record a request and the location it's targeting
pub(crate) fn record_request(&mut self, requested_location: Location, request_type : RequestType) {
pub(crate) fn record_request(
&mut self,
requested_location: Location,
request_type: RequestType,
) {
debug!("Recording request for location: {:?}", requested_location);
self.request_density_tracker.sample(requested_location);
}

/// Decide whether to accept a connection from a new candidate peer based on its location
/// and current neighbors and request density, along with how it compares to other
/// recent candidates.
pub(crate) fn evaluate_new_connection(&mut self, current_neighbors: &BTreeMap<Location, usize>, candidate_location: Location, acquisition_strategy : AcquisitionStrategy) -> Result<bool, DensityMapError> {
self.evaluate_new_connection_with_current_time(current_neighbors, candidate_location, acquisition_strategy, Instant::now())
pub(crate) fn evaluate_new_connection(
&mut self,
current_neighbors: &BTreeMap<Location, usize>,
candidate_location: Location,
acquisition_strategy: AcquisitionStrategy,
) -> Result<bool, DensityMapError> {
self.evaluate_new_connection_with_current_time(
current_neighbors,
candidate_location,
acquisition_strategy,
Instant::now(),
)
}

fn evaluate_new_connection_with_current_time(&mut self, current_neighbors: &BTreeMap<Location, usize>, candidate_location: Location, acquisition_strategy : AcquisitionStrategy, current_time : Instant) -> Result<bool, DensityMapError> {
debug!("Evaluating new connection for candidate location: {:?}", candidate_location);
fn evaluate_new_connection_with_current_time(
&mut self,
current_neighbors: &BTreeMap<Location, usize>,
candidate_location: Location,
acquisition_strategy: AcquisitionStrategy,
current_time: Instant,
) -> Result<bool, DensityMapError> {
debug!(
"Evaluating new connection for candidate location: {:?}",
candidate_location
);
let density_map = self.get_or_create_density_map(current_neighbors)?;
let score = density_map.get_density_at(candidate_location)?;

let accept = match acquisition_strategy {
AcquisitionStrategy::Slow => {
self.fast_connection_evaluator.record_only_with_current_time(score, current_time);
self.slow_connection_evaluator.record_and_eval_with_current_time(score, current_time)
},
self.fast_connection_evaluator
.record_only_with_current_time(score, current_time);
self.slow_connection_evaluator
.record_and_eval_with_current_time(score, current_time)
}
AcquisitionStrategy::Fast => {
self.slow_connection_evaluator.record_only_with_current_time(score, current_time);
self.fast_connection_evaluator.record_and_eval_with_current_time(score, current_time)
},
self.slow_connection_evaluator
.record_only_with_current_time(score, current_time);
self.fast_connection_evaluator
.record_and_eval_with_current_time(score, current_time)
}
};

Ok(accept)
}

/// Get the ideal location for a new connection based on current neighbors and request density
pub(crate) fn get_best_candidate_location(&mut self, current_neighbors: &BTreeMap<Location, usize>) -> Result<Location, DensityMapError> {
pub(crate) fn get_best_candidate_location(
&mut self,
current_neighbors: &BTreeMap<Location, usize>,
) -> Result<Location, DensityMapError> {
debug!("Retrieving best candidate location");
let density_map = self.get_or_create_density_map(current_neighbors)?;

let best_location = match density_map.get_max_density() {
Ok(location) => {
debug!("Max density found at location: {:?}", location);
location
},
}
Err(_) => {
error!("An error occurred while getting max density, falling back to random location");
error!(
"An error occurred while getting max density, falling back to random location"
);
self.random_location()
},
}
};

Ok(best_location)
}

Expand All @@ -117,21 +159,25 @@ impl TopologyManager {
} else {
self.this_peer_location.as_f64() + distance.as_f64()
};
let location_f64 = location_f64.rem_euclid(1.0); // Ensure result is in [0.0, 1.0)
let location_f64 = location_f64.rem_euclid(1.0); // Ensure result is in [0.0, 1.0)
Location::new(location_f64)
}
}

fn get_or_create_density_map(&mut self, current_neighbors: &BTreeMap<Location, usize>) -> Result<Rc<request_density_tracker::DensityMap>, DensityMapError> {
fn get_or_create_density_map(
&mut self,
current_neighbors: &BTreeMap<Location, usize>,
) -> Result<Rc<request_density_tracker::DensityMap>, DensityMapError> {
debug!("Getting or creating density map");
self.cached_density_map.get_or_create(&self.request_density_tracker, current_neighbors)
self.cached_density_map
.get_or_create(&self.request_density_tracker, current_neighbors)
}
}

pub(crate) enum RequestType {
Get,
Put,
Join,
Subscribe
Subscribe,
}

pub(crate) enum AcquisitionStrategy {
Expand All @@ -144,8 +190,8 @@ pub(crate) enum AcquisitionStrategy {

#[cfg(test)]
mod tests {
use crate::ring::Location;
use super::TopologyManager;
use crate::ring::Location;

#[test]
fn test_topology_manager() {
Expand All @@ -165,7 +211,9 @@ mod tests {
requests.push(requested_location);
}

let best_candidate_location = topology_manager.get_best_candidate_location(&current_neighbors).unwrap();
let best_candidate_location = topology_manager
.get_best_candidate_location(&current_neighbors)
.unwrap();
// Should be half way between 0.3 and 0.4 as that is where the most requests were
assert_eq!(best_candidate_location, Location::new(0.35));

Expand All @@ -177,7 +225,9 @@ mod tests {
let candidate_location = Location::new(i as f64 / 100.0);
let score = topology_manager
.get_or_create_density_map(&current_neighbors)
.unwrap().get_density_at(candidate_location).unwrap();
.unwrap()
.get_density_at(candidate_location)
.unwrap();
if score > best_score {
best_score = score;
best_location = candidate_location;
Expand Down

0 comments on commit 9b71bda

Please sign in to comment.