Skip to content

Commit

Permalink
Merge pull request #895 from freenet/184058031-contract-resource-usage
Browse files Browse the repository at this point in the history
Resource tracking and usage management
  • Loading branch information
sanity authored Nov 13, 2023
2 parents 7ea1067 + be751b4 commit 19c4322
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 110 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod contract;
mod message;
mod node;
mod operations;
mod resource_manager;
mod resources;
mod ring;
mod router;
mod runtime;
Expand Down
193 changes: 152 additions & 41 deletions crates/core/src/resource_manager.rs → crates/core/src/resources.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,75 @@
// FIXME: remove this
#![allow(dead_code)]
#![allow(unused)]
//! # Resource Management
//!
//! The resource management module is responsible for tracking resource usage,
//! ensuring that usage does not exceed specified limits, and ensure that those
//! resources are used to maximize the utility of the network.
//!
//! ## Resources
//!
//! The resource management module tracks usage of the following resources:
//!
//! * Upstream and downstream bandwidth
//! * CPU usage
//!
//! These resources will be tracked in the future:
//!
//! * Memory usage
//! * Storage
//!
//! ## Attribution
//!
//! When used this resource usage is attributed to either:
//!
//! * Remote
//! * A connected peer
//! * Local
//! * A local delegate
//! * The user interface
//!
//! ## Resource allocation for contract subscriptions
//!
//! When one or more peers are subscribed to a contract, the required
//! resources will be allocated as follows:
//!
//! * Upstream bandwidth is allocated to the subscribed peer to which
//! the data is sent
//! * Downstream bandwidth and CPU is split evenly between all subscribed
//! peers for that contract
//!
//! ## Resource limits
//!
//! Resource limits should be set to ensure that the peer does not disrupt the
//! user's experience of their computer. We should choose intelligent defaults
//! but the limits should be user-configurable.
//!
//! ## Code component overview
//!
//! The [ResourceManager] is responsible for tracking resource usage and identifying
//! which peers to remove if/when limits are exceeded. It does this by identifying
//! the peers with the highest usage of the limit-exceeding resource relative to
//! their usefulness. The usefulness of a peer is determined by the number of
//! requests sent to that peer over a certain period of time.
//!
//! A [Meter] is used by the ResourceManager to tracking resource usage over time.
//! Resource usage is reported to the meter, which tracks the usage over a sliding
//! window of time. The meter is responsible for calculating the rate of resource
//! usage along with which peers (or delegates, or UIs) are responsible for that
//! usage.
//!
//! ## Future Improvements
//!
//! * Track non-flow resources like memory and storage
//! * Responses to specific requests will contain information about the resources used
//! by downstream peers to fulfill the request, however how this information is used
//! will require careful consideration.
mod meter;
pub mod rate;
mod running_average;

use std::time::Instant;

use crate::ring::PeerKeyLocation;

use self::meter::{AttributionSource, Meter, ResourceType};
use crate::ring::PeerKeyLocation;
use std::time::Instant;

pub(crate) struct ResourceManager {
limits: Limits,
Expand All @@ -19,7 +79,7 @@ pub(crate) struct ResourceManager {
impl ResourceManager {
pub fn new(limits: Limits) -> Self {
ResourceManager {
meter: Meter::new(),
meter: Meter::new_with_window_size(100),
limits,
}
}
Expand All @@ -30,7 +90,7 @@ impl ResourceManager {

/// Report the use of a resource.
pub(crate) fn report(
&self,
&mut self,
_time: Instant,
attribution: &AttributionSource,
resource: ResourceType,
Expand Down Expand Up @@ -77,25 +137,36 @@ impl ResourceManager {
where
P: IntoIterator<Item = PeerValue>,
{
let total_usage: f64 = self.meter.total_usage(resource_type);
let total_usage = match self.meter.resource_usage_rate(resource_type) {
Some(rate) => rate.per_second(),
None => return vec![], // Or handle the error as appropriate
};

let total_limit: f64 = self.limits.get(resource_type);
if total_usage > total_limit {
let mut candidate_costs = vec![];
for PeerValue { peer, value } in candidates {
let cost = self
if let Some(cost) = self
.meter
.attributed_usage(&AttributionSource::Peer(peer), resource_type);
const MIN_VALUE: f64 = 0.0001;
let cost_per_value = cost / value.max(MIN_VALUE);
candidate_costs.push(CandidateCost {
peer,
total_cost: cost,
cost_per_value,
});
.attributed_usage_rate(&AttributionSource::Peer(peer), resource_type)
{
const MIN_VALUE: f64 = 0.0001;
let cost_per_second = cost.per_second();
let cost_per_value = cost_per_second / value.max(MIN_VALUE);
candidate_costs.push(CandidateCost {
peer,
total_cost: cost_per_second,
cost_per_value,
});
} // Else, you might want to handle the case where cost is None
}
// sort candidate_costs by cost_per_value descending
candidate_costs
.sort_by(|a, b| b.cost_per_value.partial_cmp(&a.cost_per_value).unwrap());

// Sort candidate_costs by cost_per_value descending
candidate_costs.sort_by(|a, b| {
b.cost_per_value
.partial_cmp(&a.cost_per_value)
.unwrap_or(std::cmp::Ordering::Equal)
});

let mut to_delete = vec![];
let excess_usage = total_usage - total_limit;
Expand Down Expand Up @@ -128,8 +199,8 @@ struct CandidateCost {
}

pub struct Limits {
pub max_upstream_bandwidth: Bandwidth,
pub max_downstream_bandwidth: Bandwidth,
pub max_upstream_bandwidth: BytesPerSecond,
pub max_downstream_bandwidth: BytesPerSecond,
pub max_cpu_usage: InstructionsPerSecond,
pub max_memory_usage: f64,
pub max_storage_usage: f64,
Expand All @@ -146,21 +217,21 @@ impl Limits {
}
}

#[derive(Debug, Clone, Copy)]
pub struct Bandwidth(f64);
impl Bandwidth {
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct BytesPerSecond(f64);
impl BytesPerSecond {
pub fn new(bytes_per_second: f64) -> Self {
Bandwidth(bytes_per_second)
BytesPerSecond(bytes_per_second)
}
}

impl From<Bandwidth> for f64 {
fn from(val: Bandwidth) -> Self {
impl From<BytesPerSecond> for f64 {
fn from(val: BytesPerSecond) -> Self {
val.0
}
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct InstructionsPerSecond(f64);
impl InstructionsPerSecond {
pub fn new(cpu_usage: f64) -> Self {
Expand Down Expand Up @@ -190,7 +261,7 @@ impl From<ByteCount> for f64 {

#[cfg(test)]
mod tests {
use crate::resource_manager::{Bandwidth, InstructionsPerSecond, Limits, ResourceManager};
use crate::resources::{BytesPerSecond, InstructionsPerSecond, Limits, ResourceManager};

use super::*;
use std::time::Instant;
Expand All @@ -199,13 +270,13 @@ mod tests {
fn test_resource_manager_report() {
// Create a ResourceManager with arbitrary limits
let limits = Limits {
max_upstream_bandwidth: Bandwidth::new(1000.0),
max_downstream_bandwidth: Bandwidth::new(1000.0),
max_upstream_bandwidth: BytesPerSecond::new(1000.0),
max_downstream_bandwidth: BytesPerSecond::new(1000.0),
max_cpu_usage: InstructionsPerSecond::new(1000.0),
max_memory_usage: 1000.0,
max_storage_usage: 1000.0,
};
let resource_manager = ResourceManager::new(limits);
let mut resource_manager = ResourceManager::new(limits);

// Report some usage and test that the total and attributed usage are updated
let attribution = AttributionSource::Peer(PeerKeyLocation::random());
Expand All @@ -219,13 +290,17 @@ mod tests {
assert_eq!(
resource_manager
.meter
.total_usage(ResourceType::InboundBandwidthBytes),
.resource_usage_rate(ResourceType::InboundBandwidthBytes)
.unwrap()
.per_second(),
100.0
);
assert_eq!(
resource_manager
.meter
.attributed_usage(&attribution, ResourceType::InboundBandwidthBytes),
.attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes)
.unwrap()
.per_second(),
100.0
);
}
Expand All @@ -234,13 +309,13 @@ mod tests {
fn test_resource_manager_should_delete_peers() {
// Create a ResourceManager with arbitrary limits
let limits = Limits {
max_upstream_bandwidth: Bandwidth::new(1000.0),
max_downstream_bandwidth: Bandwidth::new(1000.0),
max_upstream_bandwidth: BytesPerSecond::new(1000.0),
max_downstream_bandwidth: BytesPerSecond::new(1000.0),
max_cpu_usage: InstructionsPerSecond::new(1000.0),
max_memory_usage: 1000.0,
max_storage_usage: 1000.0,
};
let resource_manager = ResourceManager::new(limits);
let mut resource_manager = ResourceManager::new(limits);

// Report some usage
let peer1 = PeerKeyLocation::random();
Expand Down Expand Up @@ -288,9 +363,45 @@ mod tests {

let to_delete =
resource_manager.should_delete_peers(ResourceType::InboundBandwidthBytes, candidates);
assert!(to_delete.len() == 1);
assert_eq!(to_delete.len(), 1);

// Test that the peer with the highest usage is deleted
assert_eq!(to_delete[0], peer1);
}

#[test]
fn test_update_limits() {
let limits = Limits {
max_upstream_bandwidth: BytesPerSecond::new(1000.0),
max_downstream_bandwidth: BytesPerSecond::new(1000.0),
max_cpu_usage: InstructionsPerSecond::new(1000.0),
max_memory_usage: 1000.0,
max_storage_usage: 1000.0,
};
let mut resource_manager = ResourceManager::new(limits);

let new_limits = Limits {
max_upstream_bandwidth: BytesPerSecond::new(2000.0),
max_downstream_bandwidth: BytesPerSecond::new(2000.0),
max_cpu_usage: InstructionsPerSecond::new(2000.0),
max_memory_usage: 2000.0,
max_storage_usage: 2000.0,
};
resource_manager.update_limits(new_limits);

assert_eq!(
resource_manager.limits.max_upstream_bandwidth,
BytesPerSecond::new(2000.0)
);
assert_eq!(
resource_manager.limits.max_downstream_bandwidth,
BytesPerSecond::new(2000.0)
);
assert_eq!(
resource_manager.limits.max_cpu_usage,
InstructionsPerSecond::new(2000.0)
);
assert_eq!(resource_manager.limits.max_memory_usage, 2000.0);
assert_eq!(resource_manager.limits.max_storage_usage, 2000.0);
}
}
Loading

0 comments on commit 19c4322

Please sign in to comment.