Skip to content

Commit

Permalink
Add support for limiting router to consider N closest peers (#903)
Browse files Browse the repository at this point in the history
* implementation for select closest peers

* refactor and add additional test

* appease code formatter

* appease formatter

* appease formatter

* rename hyperparameter to consider_n_closest_peers

* appease formatter

* allow dead code
  • Loading branch information
kernelkind authored Dec 12, 2023
1 parent e9dfbc6 commit 8ef468f
Showing 1 changed file with 109 additions and 1 deletion.
110 changes: 109 additions & 1 deletion crates/core/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) struct Router {
transfer_rate_estimator: IsotonicEstimator,
failure_estimator: IsotonicEstimator,
mean_transfer_size: Mean,
consider_n_closest_peers: usize,
}

impl Router {
Expand Down Expand Up @@ -106,9 +107,16 @@ impl Router {
EstimatorType::Negative,
),
mean_transfer_size,
consider_n_closest_peers: 20,
}
}

#[allow(dead_code)]
pub fn considering_n_closest_peers(mut self, n: u32) -> Self {
self.consider_n_closest_peers = n as usize;
self
}

pub fn add_event(&mut self, event: RouteEvent) {
match event.outcome {
RouteOutcome::Success {
Expand Down Expand Up @@ -145,6 +153,33 @@ impl Router {
}
}

fn select_closest_peers<'a>(
&self,
peers: impl IntoIterator<Item = &'a PeerKeyLocation>,
target_location: &Location,
) -> Vec<&'a PeerKeyLocation> {
let mut heap =
std::collections::BinaryHeap::with_capacity(self.consider_n_closest_peers + 1);

for peer_location in peers {
if let Some(location) = peer_location.location.as_ref() {
let distance = target_location.distance(location);
heap.push((distance, peer_location));

// Ensure we keep the heap size to specified capacity
if heap.len() > self.consider_n_closest_peers {
heap.pop();
}
}
}

// Convert the heap to a sorted vector
heap.into_sorted_vec()
.into_iter()
.map(|(_, peer_location)| peer_location)
.collect()
}

pub fn select_peer<'a>(
&self,
peers: impl IntoIterator<Item = &'a PeerKeyLocation>,
Expand All @@ -163,7 +198,7 @@ impl Router {
.map(|(peer, _)| peer)
} else {
// Find the peer with the minimum predicted routing outcome time
peers
self.select_closest_peers(peers, &target_location)
.into_iter()
.map(|peer: &PeerKeyLocation| {
let t = self.predict_routing_outcome(peer, target_location).expect(
Expand Down Expand Up @@ -279,6 +314,8 @@ pub enum RouteOutcome {
mod tests {
use rand::Rng;

use crate::ring::Distance;

use super::*;

#[test]
Expand Down Expand Up @@ -393,6 +430,46 @@ mod tests {
}
}

#[test]
fn test_select_closest_peers_size() {
const NUM_PEERS: u32 = 45;
const CAP: u32 = 30;

assert_eq!(
CAP as usize,
Router::new(&[])
.considering_n_closest_peers(CAP)
.select_closest_peers(&create_peers(NUM_PEERS), &Location::random())
.len()
);
}

#[test]
fn test_select_closest_peers_equality() {
const NUM_PEERS: u32 = 100;
const CLOSEST_CAP: u32 = 10;
let peers: Vec<PeerKeyLocation> = create_peers(NUM_PEERS);
let contract_location = Location::random();

let expected_closest = select_closest_peers_vec(CLOSEST_CAP, &peers, &contract_location);

// Create a router with no historical data
let router = Router::new(&[]).considering_n_closest_peers(CLOSEST_CAP);
let asserted_closest: Vec<&PeerKeyLocation> =
router.select_closest_peers(&peers, &contract_location);

let mut expected_iter = expected_closest.iter();
let mut asserted_iter = asserted_closest.iter();

while let (Some(expected_location), Some(asserted_location)) =
(expected_iter.next(), asserted_iter.next())
{
assert_eq!(**expected_location, **asserted_location);
}

assert_eq!(expected_iter.next(), asserted_iter.next());
}

fn simulate_prediction(
random: &mut rand::rngs::ThreadRng,
peer: PeerKeyLocation,
Expand All @@ -413,4 +490,35 @@ mod tests {
expected_total_time: time_to_response_start + transfer_time,
}
}

fn select_closest_peers_vec<'a>(
closest_peers_capacity: u32,
peers: impl IntoIterator<Item = &'a PeerKeyLocation>,
target_location: &Location,
) -> Vec<&'a PeerKeyLocation>
where
PeerKeyLocation: Clone,
{
let mut closest: Vec<&'a PeerKeyLocation> = peers.into_iter().collect();
closest.sort_by_key(|&peer| {
if let Some(location) = peer.location {
target_location.distance(location)
} else {
Distance::new(f64::MAX)
}
});

closest[..closest_peers_capacity as usize].to_vec()
}

fn create_peers(num_peers: u32) -> Vec<PeerKeyLocation> {
let mut peers: Vec<PeerKeyLocation> = vec![];

for _ in 0..num_peers {
let peer = PeerKeyLocation::random();
peers.push(peer);
}

peers
}
}

0 comments on commit 8ef468f

Please sign in to comment.