Resource tracking and usage management #895

Merged
merged 16 commits into from
Nov 13, 2023
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
@@ -4,7 +4,7 @@ mod contract;
mod message;
mod node;
mod operations;
mod resource_manager;
mod resources;
mod ring;
mod router;
mod runtime;
193 changes: 152 additions & 41 deletions crates/core/src/resource_manager.rs → crates/core/src/resources.rs
@@ -1,15 +1,75 @@
// FIXME: remove this
#![allow(dead_code)]
#![allow(unused)]
//! # Resource Management
//!
//! The resource management module is responsible for tracking resource usage,
//! ensuring that usage does not exceed specified limits, and ensuring that those
//! resources are used to maximize the utility of the network.
//!
//! ## Resources
//!
//! The resource management module tracks usage of the following resources:
//!
//! * Upstream and downstream bandwidth
//! * CPU usage
//!
//! These resources will be tracked in the future:
//!
//! * Memory usage
//! * Storage
//!
//! ## Attribution
//!
//! Resource usage is attributed to one of the following sources:
//!
//! * Remote
//!   * A connected peer
//! * Local
//!   * A local delegate
//!   * The user interface
//!
//! ## Resource allocation for contract subscriptions
//!
//! When one or more peers are subscribed to a contract, the required
//! resources will be allocated as follows:
//!
//! * Upstream bandwidth is allocated to the subscribed peer to which
//!   the data is sent
//! * Downstream bandwidth and CPU are split evenly between all subscribed
//!   peers for that contract
//!
//! ## Resource limits
//!
//! Resource limits should be set to ensure that the peer does not disrupt the
//! user's experience of their computer. We should choose sensible defaults,
//! but the limits should remain user-configurable.
//!
//! ## Code component overview
//!
//! The [ResourceManager] is responsible for tracking resource usage and identifying
//! which peers to remove if/when limits are exceeded. It does this by identifying
//! the peers with the highest usage of the limit-exceeding resource relative to
//! their usefulness. The usefulness of a peer is determined by the number of
//! requests sent to that peer over a certain period of time.
//!
//! A [Meter] is used by the ResourceManager to track resource usage over time.
//! Resource usage is reported to the meter, which tracks the usage over a sliding
//! window of time. The meter is responsible for calculating the rate of resource
//! usage along with which peers (or delegates, or UIs) are responsible for that
//! usage.
//!
//! ## Future Improvements
//!
//! * Track non-flow resources like memory and storage
//! * Responses to specific requests will contain information about the resources used
//!   by downstream peers to fulfill the request. How this information is used
//!   will require careful consideration.

mod meter;
pub mod rate;
mod running_average;

use std::time::Instant;

use crate::ring::PeerKeyLocation;

use self::meter::{AttributionSource, Meter, ResourceType};
use crate::ring::PeerKeyLocation;
use std::time::Instant;

pub(crate) struct ResourceManager {
limits: Limits,
@@ -19,7 +79,7 @@ pub(crate) struct ResourceManager {
impl ResourceManager {
pub fn new(limits: Limits) -> Self {
ResourceManager {
meter: Meter::new(),
meter: Meter::new_with_window_size(100),
limits,
}
}
@@ -30,7 +90,7 @@ impl ResourceManager {

/// Report the use of a resource.
pub(crate) fn report(
&self,
&mut self,
_time: Instant,
attribution: &AttributionSource,
resource: ResourceType,
@@ -77,25 +137,36 @@ impl ResourceManager {
where
P: IntoIterator<Item = PeerValue>,
{
let total_usage: f64 = self.meter.total_usage(resource_type);
let total_usage = match self.meter.resource_usage_rate(resource_type) {
Some(rate) => rate.per_second(),
None => return vec![], // Or handle the error as appropriate
};

let total_limit: f64 = self.limits.get(resource_type);
if total_usage > total_limit {
let mut candidate_costs = vec![];
for PeerValue { peer, value } in candidates {
let cost = self
if let Some(cost) = self
.meter
.attributed_usage(&AttributionSource::Peer(peer), resource_type);
const MIN_VALUE: f64 = 0.0001;
let cost_per_value = cost / value.max(MIN_VALUE);
candidate_costs.push(CandidateCost {
peer,
total_cost: cost,
cost_per_value,
});
.attributed_usage_rate(&AttributionSource::Peer(peer), resource_type)
{
const MIN_VALUE: f64 = 0.0001;
let cost_per_second = cost.per_second();
let cost_per_value = cost_per_second / value.max(MIN_VALUE);
candidate_costs.push(CandidateCost {
peer,
total_cost: cost_per_second,
cost_per_value,
});
} // Else, you might want to handle the case where cost is None
}
// sort candidate_costs by cost_per_value descending
candidate_costs
.sort_by(|a, b| b.cost_per_value.partial_cmp(&a.cost_per_value).unwrap());

// Sort candidate_costs by cost_per_value descending
candidate_costs.sort_by(|a, b| {
b.cost_per_value
.partial_cmp(&a.cost_per_value)
.unwrap_or(std::cmp::Ordering::Equal)
});

let mut to_delete = vec![];
let excess_usage = total_usage - total_limit;
@@ -128,8 +199,8 @@ struct CandidateCost {
}

pub struct Limits {
pub max_upstream_bandwidth: Bandwidth,
pub max_downstream_bandwidth: Bandwidth,
pub max_upstream_bandwidth: BytesPerSecond,
pub max_downstream_bandwidth: BytesPerSecond,
pub max_cpu_usage: InstructionsPerSecond,
pub max_memory_usage: f64,
pub max_storage_usage: f64,
@@ -146,21 +217,21 @@ impl Limits {
}
}

#[derive(Debug, Clone, Copy)]
pub struct Bandwidth(f64);
impl Bandwidth {
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct BytesPerSecond(f64);
impl BytesPerSecond {
pub fn new(bytes_per_second: f64) -> Self {
Bandwidth(bytes_per_second)
BytesPerSecond(bytes_per_second)
}
}

impl From<Bandwidth> for f64 {
fn from(val: Bandwidth) -> Self {
impl From<BytesPerSecond> for f64 {
fn from(val: BytesPerSecond) -> Self {
val.0
}
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct InstructionsPerSecond(f64);
impl InstructionsPerSecond {
pub fn new(cpu_usage: f64) -> Self {
@@ -190,7 +261,7 @@ impl From<ByteCount> for f64 {

#[cfg(test)]
mod tests {
use crate::resource_manager::{Bandwidth, InstructionsPerSecond, Limits, ResourceManager};
use crate::resources::{BytesPerSecond, InstructionsPerSecond, Limits, ResourceManager};

use super::*;
use std::time::Instant;
@@ -199,13 +270,13 @@ mod tests {
fn test_resource_manager_report() {
// Create a ResourceManager with arbitrary limits
let limits = Limits {
max_upstream_bandwidth: Bandwidth::new(1000.0),
max_downstream_bandwidth: Bandwidth::new(1000.0),
max_upstream_bandwidth: BytesPerSecond::new(1000.0),
max_downstream_bandwidth: BytesPerSecond::new(1000.0),
max_cpu_usage: InstructionsPerSecond::new(1000.0),
max_memory_usage: 1000.0,
max_storage_usage: 1000.0,
};
let resource_manager = ResourceManager::new(limits);
let mut resource_manager = ResourceManager::new(limits);

// Report some usage and test that the total and attributed usage are updated
let attribution = AttributionSource::Peer(PeerKeyLocation::random());
@@ -219,13 +290,17 @@ mod tests {
assert_eq!(
resource_manager
.meter
.total_usage(ResourceType::InboundBandwidthBytes),
.resource_usage_rate(ResourceType::InboundBandwidthBytes)
.unwrap()
.per_second(),
100.0
);
assert_eq!(
resource_manager
.meter
.attributed_usage(&attribution, ResourceType::InboundBandwidthBytes),
.attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes)
.unwrap()
.per_second(),
100.0
);
}
@@ -234,13 +309,13 @@ mod tests {
fn test_resource_manager_should_delete_peers() {
// Create a ResourceManager with arbitrary limits
let limits = Limits {
max_upstream_bandwidth: Bandwidth::new(1000.0),
max_downstream_bandwidth: Bandwidth::new(1000.0),
max_upstream_bandwidth: BytesPerSecond::new(1000.0),
max_downstream_bandwidth: BytesPerSecond::new(1000.0),
max_cpu_usage: InstructionsPerSecond::new(1000.0),
max_memory_usage: 1000.0,
max_storage_usage: 1000.0,
};
let resource_manager = ResourceManager::new(limits);
let mut resource_manager = ResourceManager::new(limits);

// Report some usage
let peer1 = PeerKeyLocation::random();
@@ -288,9 +363,45 @@ mod tests {

let to_delete =
resource_manager.should_delete_peers(ResourceType::InboundBandwidthBytes, candidates);
assert!(to_delete.len() == 1);
assert_eq!(to_delete.len(), 1);

// Test that the peer with the highest usage is deleted
assert_eq!(to_delete[0], peer1);
}

#[test]
fn test_update_limits() {
let limits = Limits {
max_upstream_bandwidth: BytesPerSecond::new(1000.0),
max_downstream_bandwidth: BytesPerSecond::new(1000.0),
max_cpu_usage: InstructionsPerSecond::new(1000.0),
max_memory_usage: 1000.0,
max_storage_usage: 1000.0,
};
let mut resource_manager = ResourceManager::new(limits);

let new_limits = Limits {
max_upstream_bandwidth: BytesPerSecond::new(2000.0),
max_downstream_bandwidth: BytesPerSecond::new(2000.0),
max_cpu_usage: InstructionsPerSecond::new(2000.0),
max_memory_usage: 2000.0,
max_storage_usage: 2000.0,
};
resource_manager.update_limits(new_limits);

assert_eq!(
resource_manager.limits.max_upstream_bandwidth,
BytesPerSecond::new(2000.0)
);
assert_eq!(
resource_manager.limits.max_downstream_bandwidth,
BytesPerSecond::new(2000.0)
);
assert_eq!(
resource_manager.limits.max_cpu_usage,
InstructionsPerSecond::new(2000.0)
);
assert_eq!(resource_manager.limits.max_memory_usage, 2000.0);
assert_eq!(resource_manager.limits.max_storage_usage, 2000.0);
}
}
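
The tail of `should_delete_peers` is collapsed in the diff above, so the full selection loop is not visible here. As a rough sketch of the approach the module docs describe (rank peers by resource cost relative to usefulness, then remove the worst offenders until the excess is covered), something like the following could work. The `Candidate` struct, the `select_peers_to_drop` function, and the `u32` peer ids are hypothetical stand-ins for illustration, not types from this PR; only `MIN_VALUE` and the `partial_cmp` fallback mirror the visible hunk.

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for a peer's measured cost and usefulness.
#[derive(Debug, Clone, Copy)]
pub struct Candidate {
    /// Illustrative id; the PR itself uses `PeerKeyLocation`.
    pub peer: u32,
    /// Attributed usage rate of the resource that exceeded its limit.
    pub cost_per_second: f64,
    /// Usefulness of the peer, e.g. requests sent to it recently.
    pub value: f64,
}

/// Returns the peers to drop so that usage falls back to or below `limit`.
/// Assumption: dropping a peer frees exactly its attributed usage rate.
pub fn select_peers_to_drop(total_usage: f64, limit: f64, candidates: &[Candidate]) -> Vec<u32> {
    if total_usage <= limit {
        return Vec::new();
    }

    // Floor on value so a peer with zero usefulness does not divide by zero.
    const MIN_VALUE: f64 = 0.0001;

    let mut ranked: Vec<(Candidate, f64)> = candidates
        .iter()
        .map(|c| (*c, c.cost_per_second / c.value.max(MIN_VALUE)))
        .collect();

    // Highest cost per unit of value first; incomparable values (NaN)
    // are treated as equal instead of panicking.
    ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal));

    let mut excess = total_usage - limit;
    let mut to_drop = Vec::new();
    for (candidate, _) in ranked {
        if excess <= 0.0 {
            break;
        }
        to_drop.push(candidate.peer);
        excess -= candidate.cost_per_second;
    }
    to_drop
}
```

Ranking by cost per unit of value rather than by raw cost means a heavy but useful peer can survive while a cheaper, rarely used one is removed first. Whether that trade-off matches the merged implementation cannot be confirmed from the visible hunks.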