diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index fa71d6a6d..665f70808 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -4,7 +4,7 @@ mod contract; mod message; mod node; mod operations; -mod resource_manager; +mod resources; mod ring; mod router; mod runtime; diff --git a/crates/core/src/resource_manager.rs b/crates/core/src/resources.rs similarity index 55% rename from crates/core/src/resource_manager.rs rename to crates/core/src/resources.rs index 566b4cd1b..e2913c88f 100644 --- a/crates/core/src/resource_manager.rs +++ b/crates/core/src/resources.rs @@ -1,15 +1,75 @@ -// FIXME: remove this -#![allow(dead_code)] -#![allow(unused)] +//! # Resource Management +//! +//! The resource management module is responsible for tracking resource usage, +//! ensuring that usage does not exceed specified limits, and ensure that those +//! resources are used to maximize the utility of the network. +//! +//! ## Resources +//! +//! The resource management module tracks usage of the following resources: +//! +//! * Upstream and downstream bandwidth +//! * CPU usage +//! +//! These resources will be tracked in the future: +//! +//! * Memory usage +//! * Storage +//! +//! ## Attribution +//! +//! When used this resource usage is attributed to either: +//! +//! * Remote +//! * A connected peer +//! * Local +//! * A local delegate +//! * The user interface +//! +//! ## Resource allocation for contract subscriptions +//! +//! When one or more peers are subscribed to a contract, the required +//! resources will be allocated as follows: +//! +//! * Upstream bandwidth is allocated to the subscribed peer to which +//! the data is sent +//! * Downstream bandwidth and CPU is split evenly between all subscribed +//! peers for that contract +//! +//! ## Resource limits +//! +//! Resource limits should be set to ensure that the peer does not disrupt the +//! user's experience of their computer. We should choose intelligent defaults +//! but the limits should be user-configurable. +//! +//! ## Code component overview +//! +//! The [ResourceManager] is responsible for tracking resource usage and identifying +//! which peers to remove if/when limits are exceeded. It does this by identifying +//! the peers with the highest usage of the limit-exceeding resource relative to +//! their usefulness. The usefulness of a peer is determined by the number of +//! requests sent to that peer over a certain period of time. +//! +//! A [Meter] is used by the ResourceManager to tracking resource usage over time. +//! Resource usage is reported to the meter, which tracks the usage over a sliding +//! window of time. The meter is responsible for calculating the rate of resource +//! usage along with which peers (or delegates, or UIs) are responsible for that +//! usage. +//! +//! ## Future Improvements +//! +//! * Track non-flow resources like memory and storage +//! * Responses to specific requests will contain information about the resources used +//! by downstream peers to fulfill the request, however how this information is used +//! will require careful consideration. mod meter; +pub mod rate; mod running_average; -use std::time::Instant; - -use crate::ring::PeerKeyLocation; - use self::meter::{AttributionSource, Meter, ResourceType}; +use crate::ring::PeerKeyLocation; +use std::time::Instant; pub(crate) struct ResourceManager { limits: Limits, @@ -19,7 +79,7 @@ pub(crate) struct ResourceManager { impl ResourceManager { pub fn new(limits: Limits) -> Self { ResourceManager { - meter: Meter::new(), + meter: Meter::new_with_window_size(100), limits, } } @@ -30,7 +90,7 @@ impl ResourceManager { /// Report the use of a resource. pub(crate) fn report( - &self, + &mut self, _time: Instant, attribution: &AttributionSource, resource: ResourceType, @@ -77,25 +137,36 @@ impl ResourceManager { where P: IntoIterator, { - let total_usage: f64 = self.meter.total_usage(resource_type); + let total_usage = match self.meter.resource_usage_rate(resource_type) { + Some(rate) => rate.per_second(), + None => return vec![], // Or handle the error as appropriate + }; + let total_limit: f64 = self.limits.get(resource_type); if total_usage > total_limit { let mut candidate_costs = vec![]; for PeerValue { peer, value } in candidates { - let cost = self + if let Some(cost) = self .meter - .attributed_usage(&AttributionSource::Peer(peer), resource_type); - const MIN_VALUE: f64 = 0.0001; - let cost_per_value = cost / value.max(MIN_VALUE); - candidate_costs.push(CandidateCost { - peer, - total_cost: cost, - cost_per_value, - }); + .attributed_usage_rate(&AttributionSource::Peer(peer), resource_type) + { + const MIN_VALUE: f64 = 0.0001; + let cost_per_second = cost.per_second(); + let cost_per_value = cost_per_second / value.max(MIN_VALUE); + candidate_costs.push(CandidateCost { + peer, + total_cost: cost_per_second, + cost_per_value, + }); + } // Else, you might want to handle the case where cost is None } - // sort candidate_costs by cost_per_value descending - candidate_costs - .sort_by(|a, b| b.cost_per_value.partial_cmp(&a.cost_per_value).unwrap()); + + // Sort candidate_costs by cost_per_value descending + candidate_costs.sort_by(|a, b| { + b.cost_per_value + .partial_cmp(&a.cost_per_value) + .unwrap_or(std::cmp::Ordering::Equal) + }); let mut to_delete = vec![]; let excess_usage = total_usage - total_limit; @@ -128,8 +199,8 @@ struct CandidateCost { } pub struct Limits { - pub max_upstream_bandwidth: Bandwidth, - pub max_downstream_bandwidth: Bandwidth, + pub max_upstream_bandwidth: BytesPerSecond, + pub max_downstream_bandwidth: BytesPerSecond, pub max_cpu_usage: InstructionsPerSecond, pub max_memory_usage: f64, pub max_storage_usage: f64, @@ -146,21 +217,21 @@ impl Limits { } } -#[derive(Debug, Clone, Copy)] -pub struct Bandwidth(f64); -impl Bandwidth { +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct BytesPerSecond(f64); +impl BytesPerSecond { pub fn new(bytes_per_second: f64) -> Self { - Bandwidth(bytes_per_second) + BytesPerSecond(bytes_per_second) } } -impl From for f64 { - fn from(val: Bandwidth) -> Self { +impl From for f64 { + fn from(val: BytesPerSecond) -> Self { val.0 } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub struct InstructionsPerSecond(f64); impl InstructionsPerSecond { pub fn new(cpu_usage: f64) -> Self { @@ -190,7 +261,7 @@ impl From for f64 { #[cfg(test)] mod tests { - use crate::resource_manager::{Bandwidth, InstructionsPerSecond, Limits, ResourceManager}; + use crate::resources::{BytesPerSecond, InstructionsPerSecond, Limits, ResourceManager}; use super::*; use std::time::Instant; @@ -199,13 +270,13 @@ mod tests { fn test_resource_manager_report() { // Create a ResourceManager with arbitrary limits let limits = Limits { - max_upstream_bandwidth: Bandwidth::new(1000.0), - max_downstream_bandwidth: Bandwidth::new(1000.0), + max_upstream_bandwidth: BytesPerSecond::new(1000.0), + max_downstream_bandwidth: BytesPerSecond::new(1000.0), max_cpu_usage: InstructionsPerSecond::new(1000.0), max_memory_usage: 1000.0, max_storage_usage: 1000.0, }; - let resource_manager = ResourceManager::new(limits); + let mut resource_manager = ResourceManager::new(limits); // Report some usage and test that the total and attributed usage are updated let attribution = AttributionSource::Peer(PeerKeyLocation::random()); @@ -219,13 +290,17 @@ mod tests { assert_eq!( resource_manager .meter - .total_usage(ResourceType::InboundBandwidthBytes), + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); assert_eq!( resource_manager .meter - .attributed_usage(&attribution, ResourceType::InboundBandwidthBytes), + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); } @@ -234,13 +309,13 @@ mod tests { fn test_resource_manager_should_delete_peers() { // Create a ResourceManager with arbitrary limits let limits = Limits { - max_upstream_bandwidth: Bandwidth::new(1000.0), - max_downstream_bandwidth: Bandwidth::new(1000.0), + max_upstream_bandwidth: BytesPerSecond::new(1000.0), + max_downstream_bandwidth: BytesPerSecond::new(1000.0), max_cpu_usage: InstructionsPerSecond::new(1000.0), max_memory_usage: 1000.0, max_storage_usage: 1000.0, }; - let resource_manager = ResourceManager::new(limits); + let mut resource_manager = ResourceManager::new(limits); // Report some usage let peer1 = PeerKeyLocation::random(); @@ -288,9 +363,45 @@ mod tests { let to_delete = resource_manager.should_delete_peers(ResourceType::InboundBandwidthBytes, candidates); - assert!(to_delete.len() == 1); + assert_eq!(to_delete.len(), 1); // Test that the peer with the highest usage is deleted assert_eq!(to_delete[0], peer1); } + + #[test] + fn test_update_limits() { + let limits = Limits { + max_upstream_bandwidth: BytesPerSecond::new(1000.0), + max_downstream_bandwidth: BytesPerSecond::new(1000.0), + max_cpu_usage: InstructionsPerSecond::new(1000.0), + max_memory_usage: 1000.0, + max_storage_usage: 1000.0, + }; + let mut resource_manager = ResourceManager::new(limits); + + let new_limits = Limits { + max_upstream_bandwidth: BytesPerSecond::new(2000.0), + max_downstream_bandwidth: BytesPerSecond::new(2000.0), + max_cpu_usage: InstructionsPerSecond::new(2000.0), + max_memory_usage: 2000.0, + max_storage_usage: 2000.0, + }; + resource_manager.update_limits(new_limits); + + assert_eq!( + resource_manager.limits.max_upstream_bandwidth, + BytesPerSecond::new(2000.0) + ); + assert_eq!( + resource_manager.limits.max_downstream_bandwidth, + BytesPerSecond::new(2000.0) + ); + assert_eq!( + resource_manager.limits.max_cpu_usage, + InstructionsPerSecond::new(2000.0) + ); + assert_eq!(resource_manager.limits.max_memory_usage, 2000.0); + assert_eq!(resource_manager.limits.max_storage_usage, 2000.0); + } } diff --git a/crates/core/src/resource_manager/meter.rs b/crates/core/src/resources/meter.rs similarity index 55% rename from crates/core/src/resource_manager/meter.rs rename to crates/core/src/resources/meter.rs index af6298765..27307011d 100644 --- a/crates/core/src/resource_manager/meter.rs +++ b/crates/core/src/resources/meter.rs @@ -1,5 +1,8 @@ use std::hash::Hash; +use std::time::Duration; +use crate::resources::rate::Rate; +use crate::resources::{BytesPerSecond, InstructionsPerSecond}; use dashmap::DashMap; use freenet_stdlib::prelude::*; @@ -7,40 +10,41 @@ use crate::ring::PeerKeyLocation; use super::running_average::RunningAverage; -const RUNNING_AVERAGE_WINDOW_SIZE: usize = 20; - /// A structure that keeps track of the usage of dynamic resources which are consumed over time. -/// It provides methods to report and query resource usage, both total and attributed to specific sources. +/// It provides methods to report and query resource usage, both total and attributed to specific +/// sources. pub(super) struct Meter { totals_by_resource: ResourceTotals, attribution_meters: AttributionMeters, + running_average_window_size: usize, } impl Meter { /// Creates a new `Meter`. - pub fn new() -> Self { + pub fn new_with_window_size(running_average_window_size: usize) -> Self { Meter { totals_by_resource: ResourceTotals::new(), attribution_meters: AttributionMeters::new(), + running_average_window_size, } } - pub fn total_usage(&self, resource: ResourceType) -> f64 { + pub fn resource_usage_rate(&self, resource: ResourceType) -> Option { // Try to get a mutable reference to the Meter for the given resource match self.totals_by_resource.map.get_mut(&resource) { Some(meter) => { // Get the current measurement value - meter.per_second_measurement() + meter.get_rate() } - None => 0.0, // No meter found for the given resource + None => None, // No meter found for the given resource } } - pub(crate) fn attributed_usage( + pub(crate) fn attributed_usage_rate( &self, attribution: &AttributionSource, resource: ResourceType, - ) -> f64 { + ) -> Option { // Try to get a mutable reference to the AttributionMeters for the given attribution match self.attribution_meters.get_mut(attribution) { Some(attribution_meters) => { @@ -48,12 +52,29 @@ impl Meter { match attribution_meters.map.get_mut(&resource) { Some(meter) => { // Get the current measurement value - meter.per_second_measurement() + meter.get_rate() } - None => 0.0, // No meter found for the given resource + None => Some(Rate::new(0.0, Duration::from_secs(1))), // No meter found for the given resource } } - None => 0.0, // No AttributionMeters found for the given attribution + None => None, // No AttributionMeters found for the given attribution + } + } + + /// Report the use of a resource with multiple attribution sources, splitting the usage + /// evenly between the sources. + /// This should be done in the lowest-level functions that consume the resource, taking + /// an AttributionMeter as a parameter. This will be useful for contracts with multiple + /// subscribers - where the responsibility should be split evenly among the subscribers. + pub(crate) fn report_split( + &mut self, + attributions: &[AttributionSource], + resource: ResourceType, + value: f64, + ) { + let split_value = value / attributions.len() as f64; + for attribution in attributions { + self.report(attribution, resource, split_value); } } @@ -61,7 +82,7 @@ impl Meter { /// functions that consume the resource, taking an AttributionMeter /// as a parameter. pub(crate) fn report( - &self, + &mut self, attribution: &AttributionSource, resource: ResourceType, value: f64, @@ -71,7 +92,7 @@ impl Meter { .totals_by_resource .map .entry(resource) - .or_insert_with(|| RunningAverage::new(RUNNING_AVERAGE_WINDOW_SIZE)); + .or_insert_with(|| RunningAverage::new(self.running_average_window_size)); total_value.insert(value); // Report the usage for a specific attribution @@ -82,7 +103,7 @@ impl Meter { let mut resource_value = resource_map .map .entry(resource) - .or_insert_with(|| RunningAverage::new(RUNNING_AVERAGE_WINDOW_SIZE)); + .or_insert_with(|| RunningAverage::new(self.running_average_window_size)); resource_value.insert(value); } } @@ -90,7 +111,6 @@ impl Meter { #[derive(Eq, Hash, PartialEq, Clone, Debug)] pub(crate) enum AttributionSource { Peer(PeerKeyLocation), - RelayedContract(ContractInstanceId), Delegate(DelegateKey), } @@ -120,12 +140,13 @@ impl ResourceTotals { #[cfg(test)] mod tests { use crate::DynError; + use std::time::Duration; use super::*; #[test] fn test_meter() { - let meter = Meter::new(); + let meter = Meter::new_with_window_size(100); // Test that the new Meter has empty totals_by_resource and attribution_meters assert!(meter.totals_by_resource.map.is_empty()); @@ -134,99 +155,132 @@ mod tests { #[test] fn test_meter_total_usage() { - let meter = Meter::new(); + let mut meter = Meter::new_with_window_size(100); // Test that the total usage is 0.0 for all resources - assert_eq!(meter.total_usage(ResourceType::InboundBandwidthBytes), 0.0); - assert_eq!(meter.total_usage(ResourceType::OutboundBandwidthBytes), 0.0); - assert_eq!(meter.total_usage(ResourceType::CpuInstructions), 0.0); + assert!(meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .is_none()); + assert!(meter + .resource_usage_rate(ResourceType::OutboundBandwidthBytes) + .is_none()); + assert!(meter + .resource_usage_rate(ResourceType::CpuInstructions) + .is_none()); // Report some usage and test that the total usage is updated let attribution = AttributionSource::Peer(PeerKeyLocation::random()); meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0); assert_eq!( - meter.total_usage(ResourceType::InboundBandwidthBytes), + meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); } #[test] fn test_meter_attributed_usage() { - let meter = Meter::new(); + let mut meter = Meter::new_with_window_size(100); // Test that the attributed usage is 0.0 for all resources let attribution = AttributionSource::Peer(PeerKeyLocation::random()); - assert_eq!( - meter.attributed_usage(&attribution, ResourceType::InboundBandwidthBytes), - 0.0 - ); - assert_eq!( - meter.attributed_usage(&attribution, ResourceType::OutboundBandwidthBytes), - 0.0 - ); - assert_eq!( - meter.attributed_usage(&attribution, ResourceType::CpuInstructions), - 0.0 - ); + assert!(meter + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .is_none()); + assert!(meter + .attributed_usage_rate(&attribution, ResourceType::OutboundBandwidthBytes) + .is_none()); + assert!(meter + .attributed_usage_rate(&attribution, ResourceType::CpuInstructions) + .is_none()); // Report some usage and test that the attributed usage is updated meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::InboundBandwidthBytes), + meter + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); } #[test] fn test_meter_report() -> Result<(), DynError> { - let meter = Meter::new(); + let mut meter = Meter::new_with_window_size(100); // Report some usage and test that the total and attributed usage are updated let attribution = AttributionSource::Peer(PeerKeyLocation::random()); meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0); assert_eq!( - meter.total_usage(ResourceType::InboundBandwidthBytes), + meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::InboundBandwidthBytes), + meter + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); // Report more usage and test that the total and attributed usage are updated meter.report(&attribution, ResourceType::InboundBandwidthBytes, 200.0); assert_eq!( - meter.total_usage(ResourceType::InboundBandwidthBytes), + meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 300.0 ); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::InboundBandwidthBytes), + meter + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 300.0 ); // Report usage for a different resource and test that the total and attributed usage are updated meter.report(&attribution, ResourceType::CpuInstructions, 50.0); - assert_eq!(meter.total_usage(ResourceType::CpuInstructions), 50.0); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::CpuInstructions), + meter + .resource_usage_rate(ResourceType::CpuInstructions) + .unwrap() + .per_second(), + 50.0 + ); + assert_eq!( + meter + .attributed_usage_rate(&attribution, ResourceType::CpuInstructions) + .unwrap() + .per_second(), 50.0 ); - let bytes = crate::util::test::random_bytes_1kb(); - let mut gen = arbitrary::Unstructured::new(&bytes); // Report usage for a different attribution and test that the total and attributed usage are updated - let other_attribution = - AttributionSource::RelayedContract(gen.arbitrary::()?); + let other_attribution = AttributionSource::Peer(PeerKeyLocation::random()); meter.report( &other_attribution, ResourceType::InboundBandwidthBytes, 150.0, ); assert_eq!( - meter.total_usage(ResourceType::InboundBandwidthBytes), + meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 450.0 ); assert_eq!( - meter.attributed_usage(&other_attribution, ResourceType::InboundBandwidthBytes), + meter + .attributed_usage_rate(&other_attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 150.0 ); Ok(()) diff --git a/crates/core/src/resources/rate.rs b/crates/core/src/resources/rate.rs new file mode 100644 index 000000000..09cb201c8 --- /dev/null +++ b/crates/core/src/resources/rate.rs @@ -0,0 +1,29 @@ +use std::time::Duration; + +#[derive(Debug, PartialEq)] +pub struct Rate { + value: f64, +} + +impl Rate { + pub fn new(value: f64, divisor: Duration) -> Self { + Rate { + value: value / divisor.as_secs_f64(), + } + } + + pub fn per_second(&self) -> f64 { + self.value + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_rate() { + let rate = Rate::new(100.0, Duration::from_secs(2)); + assert_eq!(rate.per_second(), 50.0); + } +} diff --git a/crates/core/src/resource_manager/running_average.rs b/crates/core/src/resources/running_average.rs similarity index 72% rename from crates/core/src/resource_manager/running_average.rs rename to crates/core/src/resources/running_average.rs index dc555d3aa..8bc247186 100644 --- a/crates/core/src/resource_manager/running_average.rs +++ b/crates/core/src/resources/running_average.rs @@ -1,3 +1,5 @@ +use crate::resources::rate::Rate; +use std::time::Duration; use std::{collections::VecDeque, time::Instant}; #[derive(Clone, Debug)] @@ -37,25 +39,19 @@ impl RunningAverage { } } - pub fn per_second_measurement(&self) -> f64 { - self.per_second_measurement_with_time(Instant::now()) + pub fn get_rate(&self) -> Option { + self.get_rate_at_time(Instant::now()) } - pub fn per_second_measurement_with_time(&self, now: Instant) -> f64 { + fn get_rate_at_time(&self, now: Instant) -> Option { if self.samples.is_empty() { - return 0.0; + return None; } - if self.samples.len() == 1 { - return self.samples[0].1; // or return a custom value - } - let oldest_sample = self.samples.front().unwrap().0; - let sample_duration = now - oldest_sample; - let sample_duration_secs = sample_duration.as_secs_f64(); - // Define a minimum time window (e.g., 1 second) - const MIN_TIME_WINDOW_SECS: f64 = 1.0; - // Use the maximum of the actual sample duration and the minimum time window as the divisor - let divisor = sample_duration_secs.max(MIN_TIME_WINDOW_SECS); - self.sum_samples / divisor + let oldest_sample_time = self.samples.front().unwrap().0; + let sample_duration = now - oldest_sample_time; + const MINIMUM_TIME_WINDOW: Duration = Duration::from_secs(1); + let divisor = sample_duration.max(MINIMUM_TIME_WINDOW); + Some(Rate::new(self.sum_samples, divisor)) } pub fn total_sample_count(&self) -> usize { @@ -100,12 +96,15 @@ mod tests { let now = Instant::now(); // Test with no samples - assert_eq!(running_avg.per_second_measurement_with_time(now), 0.0); + assert!(running_avg.get_rate_at_time(now).is_none()); // Test with one sample running_avg.insert_with_time(now, 2.0); assert_eq!( - running_avg.per_second_measurement_with_time(now + Duration::from_secs(1)), + running_avg + .get_rate_at_time(now + Duration::from_secs(1)) + .unwrap() + .per_second(), 2.0 ); @@ -113,14 +112,20 @@ mod tests { running_avg.insert_with_time(now + Duration::from_secs(1), 4.0); running_avg.insert_with_time(now + Duration::from_secs(2), 6.0); assert_eq!( - running_avg.per_second_measurement_with_time(now + Duration::from_secs(3)), + running_avg + .get_rate_at_time(now + Duration::from_secs(3)) + .unwrap() + .per_second(), 4.0 ); // Test with max_samples exceeded running_avg.insert_with_time(now + Duration::from_secs(3), 8.0); assert_eq!( - running_avg.per_second_measurement_with_time(now + Duration::from_secs(4)), + running_avg + .get_rate_at_time(now + Duration::from_secs(4)) + .unwrap() + .per_second(), 6.0 ); }