From a40035b183261d86a31a4c4ad6fb94e854ce8181 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 8 Nov 2023 20:00:28 -0600 Subject: [PATCH 01/12] create branch --- crates/core/src/resource_manager/meter.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/src/resource_manager/meter.rs b/crates/core/src/resource_manager/meter.rs index af6298765..ac2397ff4 100644 --- a/crates/core/src/resource_manager/meter.rs +++ b/crates/core/src/resource_manager/meter.rs @@ -10,7 +10,8 @@ use super::running_average::RunningAverage; const RUNNING_AVERAGE_WINDOW_SIZE: usize = 20; /// A structure that keeps track of the usage of dynamic resources which are consumed over time. -/// It provides methods to report and query resource usage, both total and attributed to specific sources. +/// It provides methods to report and query resource usage, both total and attributed to specific +/// sources. pub(super) struct Meter { totals_by_resource: ResourceTotals, attribution_meters: AttributionMeters, From dd4b7ebba7c746e2a4f26cef73daec5e503e95cd Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Thu, 9 Nov 2023 09:13:09 -0600 Subject: [PATCH 02/12] refactor module --- crates/core/src/lib.rs | 2 +- crates/core/src/{resource_manager.rs => resources.rs} | 2 +- crates/core/src/{resource_manager => resources}/meter.rs | 0 .../core/src/{resource_manager => resources}/running_average.rs | 0 4 files changed, 2 insertions(+), 2 deletions(-) rename crates/core/src/{resource_manager.rs => resources.rs} (98%) rename crates/core/src/{resource_manager => resources}/meter.rs (100%) rename crates/core/src/{resource_manager => resources}/running_average.rs (100%) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index fa71d6a6d..665f70808 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -4,7 +4,7 @@ mod contract; mod message; mod node; mod operations; -mod resource_manager; +mod resources; mod ring; mod router; mod runtime; diff --git a/crates/core/src/resource_manager.rs b/crates/core/src/resources.rs similarity index 98% rename from crates/core/src/resource_manager.rs rename to crates/core/src/resources.rs index 566b4cd1b..e7a138c7d 100644 --- a/crates/core/src/resource_manager.rs +++ b/crates/core/src/resources.rs @@ -190,7 +190,7 @@ impl From for f64 { #[cfg(test)] mod tests { - use crate::resource_manager::{Bandwidth, InstructionsPerSecond, Limits, ResourceManager}; + use crate::resources::{Bandwidth, InstructionsPerSecond, Limits, ResourceManager}; use super::*; use std::time::Instant; diff --git a/crates/core/src/resource_manager/meter.rs b/crates/core/src/resources/meter.rs similarity index 100% rename from crates/core/src/resource_manager/meter.rs rename to crates/core/src/resources/meter.rs diff --git a/crates/core/src/resource_manager/running_average.rs b/crates/core/src/resources/running_average.rs similarity index 100% rename from crates/core/src/resource_manager/running_average.rs rename to crates/core/src/resources/running_average.rs From 83fdcae8f9742cd32ffb798a56689b8b04e98254 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 10 Nov 2023 10:26:43 -0600 Subject: [PATCH 03/12] wip --- crates/core/src/resource_manager.rs | 9 +-- crates/core/src/resource_manager/meter.rs | 72 ++++++++++++------- crates/core/src/resource_manager/rate.rs | 18 +++++ .../src/resource_manager/running_average.rs | 39 +++++----- 4 files changed, 86 insertions(+), 52 deletions(-) create mode 100644 crates/core/src/resource_manager/rate.rs diff --git a/crates/core/src/resource_manager.rs b/crates/core/src/resource_manager.rs index 566b4cd1b..05f1224b9 100644 --- a/crates/core/src/resource_manager.rs +++ b/crates/core/src/resource_manager.rs @@ -4,6 +4,7 @@ mod meter; mod running_average; +pub mod rate; use std::time::Instant; @@ -77,14 +78,14 @@ impl ResourceManager { where P: IntoIterator, { - let total_usage: f64 = self.meter.total_usage(resource_type); + let total_usage: f64 = self.meter.resource_usage_rate(resource_type); let total_limit: f64 = self.limits.get(resource_type); if total_usage > total_limit { let mut candidate_costs = vec![]; for PeerValue { peer, value } in candidates { let cost = self .meter - .attributed_usage(&AttributionSource::Peer(peer), resource_type); + .attributed_usage_rate(&AttributionSource::Peer(peer), resource_type); const MIN_VALUE: f64 = 0.0001; let cost_per_value = cost / value.max(MIN_VALUE); candidate_costs.push(CandidateCost { @@ -219,13 +220,13 @@ mod tests { assert_eq!( resource_manager .meter - .total_usage(ResourceType::InboundBandwidthBytes), + .resource_usage_rate(ResourceType::InboundBandwidthBytes), 100.0 ); assert_eq!( resource_manager .meter - .attributed_usage(&attribution, ResourceType::InboundBandwidthBytes), + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes), 100.0 ); } diff --git a/crates/core/src/resource_manager/meter.rs b/crates/core/src/resource_manager/meter.rs index ac2397ff4..f1dc7bcd2 100644 --- a/crates/core/src/resource_manager/meter.rs +++ b/crates/core/src/resource_manager/meter.rs @@ -2,12 +2,12 @@ use std::hash::Hash; use dashmap::DashMap; use freenet_stdlib::prelude::*; +use crate::resource_manager::rate::Rate; use crate::ring::PeerKeyLocation; use super::running_average::RunningAverage; -const RUNNING_AVERAGE_WINDOW_SIZE: usize = 20; /// A structure that keeps track of the usage of dynamic resources which are consumed over time. /// It provides methods to report and query resource usage, both total and attributed to specific @@ -15,33 +15,35 @@ const RUNNING_AVERAGE_WINDOW_SIZE: usize = 20; pub(super) struct Meter { totals_by_resource: ResourceTotals, attribution_meters: AttributionMeters, + running_average_window_size: usize, } impl Meter { /// Creates a new `Meter`. - pub fn new() -> Self { + pub fn new_with_window_size(running_average_window_size : usize) -> Self { Meter { totals_by_resource: ResourceTotals::new(), attribution_meters: AttributionMeters::new(), + running_average_window_size, } } - pub fn total_usage(&self, resource: ResourceType) -> f64 { + pub fn resource_usage_rate(&self, resource: ResourceType) -> Option { // Try to get a mutable reference to the Meter for the given resource match self.totals_by_resource.map.get_mut(&resource) { Some(meter) => { // Get the current measurement value - meter.per_second_measurement() + meter.get_rate() } - None => 0.0, // No meter found for the given resource + None => None, // No meter found for the given resource } } - pub(crate) fn attributed_usage( + pub(crate) fn attributed_usage_rate( &self, attribution: &AttributionSource, resource: ResourceType, - ) -> f64 { + ) -> Option { // Try to get a mutable reference to the AttributionMeters for the given attribution match self.attribution_meters.get_mut(attribution) { Some(attribution_meters) => { @@ -49,12 +51,28 @@ impl Meter { match attribution_meters.map.get_mut(&resource) { Some(meter) => { // Get the current measurement value - meter.per_second_measurement() + meter.get_rate() } None => 0.0, // No meter found for the given resource } } - None => 0.0, // No AttributionMeters found for the given attribution + None => None, // No AttributionMeters found for the given attribution + } + } + + /// Report the use of a resource with multiple attribution sources, splitting the usage + /// evenly between the sources. + /// This should be done in the lowest-level functions that consume the resource, taking + /// an AttributionMeter as a parameter. + pub(crate) fn report_split( + &self, + attributions: &[AttributionSource], + resource: ResourceType, + value: f64, + ) { + let split_value = value / attributions.len() as f64; + for attribution in attributions { + self.report(attribution, resource, split_value); } } @@ -72,7 +90,7 @@ impl Meter { .totals_by_resource .map .entry(resource) - .or_insert_with(|| RunningAverage::new(RUNNING_AVERAGE_WINDOW_SIZE)); + .or_insert_with(|| RunningAverage::new(self.running_average_window_size)); total_value.insert(value); // Report the usage for a specific attribution @@ -83,7 +101,7 @@ impl Meter { let mut resource_value = resource_map .map .entry(resource) - .or_insert_with(|| RunningAverage::new(RUNNING_AVERAGE_WINDOW_SIZE)); + .or_insert_with(|| RunningAverage::new(self.running_average_window_size)); resource_value.insert(value); } } @@ -138,15 +156,15 @@ mod tests { let meter = Meter::new(); // Test that the total usage is 0.0 for all resources - assert_eq!(meter.total_usage(ResourceType::InboundBandwidthBytes), 0.0); - assert_eq!(meter.total_usage(ResourceType::OutboundBandwidthBytes), 0.0); - assert_eq!(meter.total_usage(ResourceType::CpuInstructions), 0.0); + assert_eq!(meter.resource_usage_rate(ResourceType::InboundBandwidthBytes), 0.0); + assert_eq!(meter.resource_usage_rate(ResourceType::OutboundBandwidthBytes), 0.0); + assert_eq!(meter.resource_usage_rate(ResourceType::CpuInstructions), 0.0); // Report some usage and test that the total usage is updated let attribution = AttributionSource::Peer(PeerKeyLocation::random()); meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0); assert_eq!( - meter.total_usage(ResourceType::InboundBandwidthBytes), + meter.resource_usage_rate(ResourceType::InboundBandwidthBytes), 100.0 ); } @@ -158,22 +176,22 @@ mod tests { // Test that the attributed usage is 0.0 for all resources let attribution = AttributionSource::Peer(PeerKeyLocation::random()); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::InboundBandwidthBytes), + meter.attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes), 0.0 ); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::OutboundBandwidthBytes), + meter.attributed_usage_rate(&attribution, ResourceType::OutboundBandwidthBytes), 0.0 ); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::CpuInstructions), + meter.attributed_usage_rate(&attribution, ResourceType::CpuInstructions), 0.0 ); // Report some usage and test that the attributed usage is updated meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::InboundBandwidthBytes), + meter.attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes), 100.0 ); } @@ -186,29 +204,29 @@ mod tests { let attribution = AttributionSource::Peer(PeerKeyLocation::random()); meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0); assert_eq!( - meter.total_usage(ResourceType::InboundBandwidthBytes), + meter.resource_usage_rate(ResourceType::InboundBandwidthBytes), 100.0 ); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::InboundBandwidthBytes), + meter.attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes), 100.0 ); // Report more usage and test that the total and attributed usage are updated meter.report(&attribution, ResourceType::InboundBandwidthBytes, 200.0); assert_eq!( - meter.total_usage(ResourceType::InboundBandwidthBytes), + meter.resource_usage_rate(ResourceType::InboundBandwidthBytes), 300.0 ); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::InboundBandwidthBytes), + meter.attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes), 300.0 ); // Report usage for a different resource and test that the total and attributed usage are updated meter.report(&attribution, ResourceType::CpuInstructions, 50.0); - assert_eq!(meter.total_usage(ResourceType::CpuInstructions), 50.0); + assert_eq!(meter.resource_usage_rate(ResourceType::CpuInstructions), 50.0); assert_eq!( - meter.attributed_usage(&attribution, ResourceType::CpuInstructions), + meter.attributed_usage_rate(&attribution, ResourceType::CpuInstructions), 50.0 ); @@ -223,11 +241,11 @@ mod tests { 150.0, ); assert_eq!( - meter.total_usage(ResourceType::InboundBandwidthBytes), + meter.resource_usage_rate(ResourceType::InboundBandwidthBytes), 450.0 ); assert_eq!( - meter.attributed_usage(&other_attribution, ResourceType::InboundBandwidthBytes), + meter.attributed_usage_rate(&other_attribution, ResourceType::InboundBandwidthBytes), 150.0 ); Ok(()) diff --git a/crates/core/src/resource_manager/rate.rs b/crates/core/src/resource_manager/rate.rs new file mode 100644 index 000000000..a1d715b14 --- /dev/null +++ b/crates/core/src/resource_manager/rate.rs @@ -0,0 +1,18 @@ +use std::time::Duration; + +pub struct Rate { + value: f64, +} + +impl Rate { + pub fn new(value: f64, divisor: Duration) -> Self { + Rate { + value: value / divisor.as_secs_f64(), + } + } + + pub fn per_second(&self) -> f64 { + self.value + } + +} diff --git a/crates/core/src/resource_manager/running_average.rs b/crates/core/src/resource_manager/running_average.rs index dc555d3aa..6805defcf 100644 --- a/crates/core/src/resource_manager/running_average.rs +++ b/crates/core/src/resource_manager/running_average.rs @@ -1,4 +1,6 @@ use std::{collections::VecDeque, time::Instant}; +use std::time::Duration; +use crate::resource_manager::rate::Rate; #[derive(Clone, Debug)] pub(super) struct RunningAverage { @@ -9,6 +11,7 @@ pub(super) struct RunningAverage { } impl RunningAverage { + pub fn new(max_samples: usize) -> Self { RunningAverage { max_samples, @@ -37,25 +40,19 @@ impl RunningAverage { } } - pub fn per_second_measurement(&self) -> f64 { - self.per_second_measurement_with_time(Instant::now()) + pub fn get_rate(&self) -> Option { + self.get_rate_at_time(Instant::now()) } - pub fn per_second_measurement_with_time(&self, now: Instant) -> f64 { + fn get_rate_at_time(&self, now: Instant) -> Option { if self.samples.is_empty() { - return 0.0; - } - if self.samples.len() == 1 { - return self.samples[0].1; // or return a custom value + return None; } - let oldest_sample = self.samples.front().unwrap().0; - let sample_duration = now - oldest_sample; - let sample_duration_secs = sample_duration.as_secs_f64(); - // Define a minimum time window (e.g., 1 second) - const MIN_TIME_WINDOW_SECS: f64 = 1.0; - // Use the maximum of the actual sample duration and the minimum time window as the divisor - let divisor = sample_duration_secs.max(MIN_TIME_WINDOW_SECS); - self.sum_samples / divisor + let oldest_sample_time = self.samples.front().unwrap().0; + let sample_duration = now - oldest_sample_time; + const MINIMUM_TIME_WINDOW: Duration = Duration::from_secs(1); + let divisor = sample_duration.max(MINIMUM_TIME_WINDOW); + Some(Rate::new(self.sum_samples, divisor)) } pub fn total_sample_count(&self) -> usize { @@ -72,7 +69,7 @@ mod tests { #[test] fn test_insert() { let max_samples = 3; - let mut running_avg = RunningAverage::new(max_samples); + let mut running_avg = RunningAverage::new_with_max_samples(max_samples); let now = Instant::now(); running_avg.insert_with_time(now, 2.0); @@ -96,16 +93,16 @@ mod tests { #[test] fn test_per_second_measurement() { let max_samples = 3; - let mut running_avg = RunningAverage::new(max_samples); + let mut running_avg = RunningAverage::new_with_max_samples(max_samples); let now = Instant::now(); // Test with no samples - assert_eq!(running_avg.per_second_measurement_with_time(now), 0.0); + assert_eq!(running_avg.get_rate_at_time(now), 0.0); // Test with one sample running_avg.insert_with_time(now, 2.0); assert_eq!( - running_avg.per_second_measurement_with_time(now + Duration::from_secs(1)), + running_avg.get_rate_at_time(now + Duration::from_secs(1)), 2.0 ); @@ -113,14 +110,14 @@ mod tests { running_avg.insert_with_time(now + Duration::from_secs(1), 4.0); running_avg.insert_with_time(now + Duration::from_secs(2), 6.0); assert_eq!( - running_avg.per_second_measurement_with_time(now + Duration::from_secs(3)), + running_avg.get_rate_at_time(now + Duration::from_secs(3)), 4.0 ); // Test with max_samples exceeded running_avg.insert_with_time(now + Duration::from_secs(3), 8.0); assert_eq!( - running_avg.per_second_measurement_with_time(now + Duration::from_secs(4)), + running_avg.get_rate_at_time(now + Duration::from_secs(4)), 6.0 ); } From f7defc3fc34a53c3519d2b1e8e47f92a7a131a88 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 10 Nov 2023 11:27:54 -0600 Subject: [PATCH 04/12] wip --- crates/core/src/resources/meter.rs | 10 +++++----- crates/core/src/resources/running_average.rs | 2 +- crates/core/src/runtime.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/core/src/resources/meter.rs b/crates/core/src/resources/meter.rs index f1dc7bcd2..a17aa570a 100644 --- a/crates/core/src/resources/meter.rs +++ b/crates/core/src/resources/meter.rs @@ -2,7 +2,7 @@ use std::hash::Hash; use dashmap::DashMap; use freenet_stdlib::prelude::*; -use crate::resource_manager::rate::Rate; +use crate::resources::rate::Rate; use crate::ring::PeerKeyLocation; @@ -63,7 +63,8 @@ impl Meter { /// Report the use of a resource with multiple attribution sources, splitting the usage /// evenly between the sources. /// This should be done in the lowest-level functions that consume the resource, taking - /// an AttributionMeter as a parameter. + /// an AttributionMeter as a parameter. This will be useful for contracts with multiple + /// subscribers - where the responsibility should be split evenly among the subscribers. pub(crate) fn report_split( &self, attributions: &[AttributionSource], @@ -109,7 +110,6 @@ impl Meter { #[derive(Eq, Hash, PartialEq, Clone, Debug)] pub(crate) enum AttributionSource { Peer(PeerKeyLocation), - RelayedContract(ContractInstanceId), Delegate(DelegateKey), } @@ -144,7 +144,7 @@ mod tests { #[test] fn test_meter() { - let meter = Meter::new(); + let meter = Meter::new_with_window_size(10); // Test that the new Meter has empty totals_by_resource and attribution_meters assert!(meter.totals_by_resource.map.is_empty()); @@ -234,7 +234,7 @@ mod tests { let mut gen = arbitrary::Unstructured::new(&bytes); // Report usage for a different attribution and test that the total and attributed usage are updated let other_attribution = - AttributionSource::RelayedContract(gen.arbitrary::()?); + AttributionSource::Peer(PeerKeyLocation::random()); meter.report( &other_attribution, ResourceType::InboundBandwidthBytes, diff --git a/crates/core/src/resources/running_average.rs b/crates/core/src/resources/running_average.rs index 6805defcf..44d1e87e3 100644 --- a/crates/core/src/resources/running_average.rs +++ b/crates/core/src/resources/running_average.rs @@ -1,6 +1,6 @@ use std::{collections::VecDeque, time::Instant}; use std::time::Duration; -use crate::resource_manager::rate::Rate; +use crate::resources::rate::Rate; #[derive(Clone, Debug)] pub(super) struct RunningAverage { diff --git a/crates/core/src/runtime.rs b/crates/core/src/runtime.rs index 609bb803d..ac0dba66a 100644 --- a/crates/core/src/runtime.rs +++ b/crates/core/src/runtime.rs @@ -13,7 +13,7 @@ mod wasm_runtime; pub(crate) use contract::ContractRuntimeInterface; pub use contract_store::ContractStore; -pub(crate) use delegate::{DelegateExecError, DelegateRuntimeInterface}; +pub(crate) use ::delegate::{DelegateExecError, DelegateRuntimeInterface}; pub use delegate_store::DelegateStore; pub(crate) use error::ContractError; pub(crate) use error::RuntimeResult; From 51777574632fb1378b014615ee5f7317248005f8 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 10 Nov 2023 11:30:58 -0600 Subject: [PATCH 05/12] undo inadvertant change --- crates/core/src/runtime.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/runtime.rs b/crates/core/src/runtime.rs index ac0dba66a..609bb803d 100644 --- a/crates/core/src/runtime.rs +++ b/crates/core/src/runtime.rs @@ -13,7 +13,7 @@ mod wasm_runtime; pub(crate) use contract::ContractRuntimeInterface; pub use contract_store::ContractStore; -pub(crate) use ::delegate::{DelegateExecError, DelegateRuntimeInterface}; +pub(crate) use delegate::{DelegateExecError, DelegateRuntimeInterface}; pub use delegate_store::DelegateStore; pub(crate) use error::ContractError; pub(crate) use error::RuntimeResult; From 093ef194742135b1ba6cd16f29db4d96813d8f2d Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 10 Nov 2023 12:06:13 -0600 Subject: [PATCH 06/12] tests work, formatting --- crates/core/src/resources.rs | 59 +++++----- crates/core/src/resources/meter.rs | 109 ++++++++++++++----- crates/core/src/resources/rate.rs | 11 ++ crates/core/src/resources/running_average.rs | 26 +++-- 4 files changed, 145 insertions(+), 60 deletions(-) diff --git a/crates/core/src/resources.rs b/crates/core/src/resources.rs index cc954e716..829308ac3 100644 --- a/crates/core/src/resources.rs +++ b/crates/core/src/resources.rs @@ -1,16 +1,10 @@ -// FIXME: remove this -#![allow(dead_code)] -#![allow(unused)] - mod meter; -mod running_average; pub mod rate; - -use std::time::Instant; - -use crate::ring::PeerKeyLocation; +mod running_average; use self::meter::{AttributionSource, Meter, ResourceType}; +use crate::ring::PeerKeyLocation; +use std::time::Instant; pub(crate) struct ResourceManager { limits: Limits, @@ -20,7 +14,7 @@ pub(crate) struct ResourceManager { impl ResourceManager { pub fn new(limits: Limits) -> Self { ResourceManager { - meter: Meter::new(), + meter: Meter::new_with_window_size(100), limits, } } @@ -78,25 +72,36 @@ impl ResourceManager { where P: IntoIterator, { - let total_usage: f64 = self.meter.resource_usage_rate(resource_type); + let total_usage = match self.meter.resource_usage_rate(resource_type) { + Some(rate) => rate.per_second(), + None => return vec![], // Or handle the error as appropriate + }; + let total_limit: f64 = self.limits.get(resource_type); if total_usage > total_limit { let mut candidate_costs = vec![]; for PeerValue { peer, value } in candidates { - let cost = self + if let Some(cost) = self .meter - .attributed_usage_rate(&AttributionSource::Peer(peer), resource_type); - const MIN_VALUE: f64 = 0.0001; - let cost_per_value = cost / value.max(MIN_VALUE); - candidate_costs.push(CandidateCost { - peer, - total_cost: cost, - cost_per_value, - }); + .attributed_usage_rate(&AttributionSource::Peer(peer), resource_type) + { + const MIN_VALUE: f64 = 0.0001; + let cost_per_second = cost.per_second(); + let cost_per_value = cost_per_second / value.max(MIN_VALUE); + candidate_costs.push(CandidateCost { + peer, + total_cost: cost_per_second, + cost_per_value, + }); + } // Else, you might want to handle the case where cost is None } - // sort candidate_costs by cost_per_value descending - candidate_costs - .sort_by(|a, b| b.cost_per_value.partial_cmp(&a.cost_per_value).unwrap()); + + // Sort candidate_costs by cost_per_value descending + candidate_costs.sort_by(|a, b| { + b.cost_per_value + .partial_cmp(&a.cost_per_value) + .unwrap_or(std::cmp::Ordering::Equal) + }); let mut to_delete = vec![]; let excess_usage = total_usage - total_limit; @@ -220,13 +225,17 @@ mod tests { assert_eq!( resource_manager .meter - .resource_usage_rate(ResourceType::InboundBandwidthBytes), + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); assert_eq!( resource_manager .meter - .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes), + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); } diff --git a/crates/core/src/resources/meter.rs b/crates/core/src/resources/meter.rs index a17aa570a..ba0652fc7 100644 --- a/crates/core/src/resources/meter.rs +++ b/crates/core/src/resources/meter.rs @@ -1,14 +1,14 @@ use std::hash::Hash; +use std::time::Duration; +use crate::resources::rate::Rate; use dashmap::DashMap; use freenet_stdlib::prelude::*; -use crate::resources::rate::Rate; use crate::ring::PeerKeyLocation; use super::running_average::RunningAverage; - /// A structure that keeps track of the usage of dynamic resources which are consumed over time. /// It provides methods to report and query resource usage, both total and attributed to specific /// sources. @@ -20,7 +20,7 @@ pub(super) struct Meter { impl Meter { /// Creates a new `Meter`. - pub fn new_with_window_size(running_average_window_size : usize) -> Self { + pub fn new_with_window_size(running_average_window_size: usize) -> Self { Meter { totals_by_resource: ResourceTotals::new(), attribution_meters: AttributionMeters::new(), @@ -53,7 +53,7 @@ impl Meter { // Get the current measurement value meter.get_rate() } - None => 0.0, // No meter found for the given resource + None => Some(Rate::new(0.0, Duration::from_secs(1))), // No meter found for the given resource } } None => None, // No AttributionMeters found for the given attribution @@ -139,12 +139,13 @@ impl ResourceTotals { #[cfg(test)] mod tests { use crate::DynError; + use std::time::Duration; use super::*; #[test] fn test_meter() { - let meter = Meter::new_with_window_size(10); + let meter = Meter::new_with_window_size(100); // Test that the new Meter has empty totals_by_resource and attribution_meters assert!(meter.totals_by_resource.map.is_empty()); @@ -153,99 +154,155 @@ mod tests { #[test] fn test_meter_total_usage() { - let meter = Meter::new(); + let meter = Meter::new_with_window_size(100); // Test that the total usage is 0.0 for all resources - assert_eq!(meter.resource_usage_rate(ResourceType::InboundBandwidthBytes), 0.0); - assert_eq!(meter.resource_usage_rate(ResourceType::OutboundBandwidthBytes), 0.0); - assert_eq!(meter.resource_usage_rate(ResourceType::CpuInstructions), 0.0); + assert_eq!( + meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap(), + Rate::new(0.0, Duration::from_secs(1)) + ); + assert_eq!( + meter + .resource_usage_rate(ResourceType::OutboundBandwidthBytes) + .unwrap(), + Rate::new(0.0, Duration::from_secs(1)) + ); + assert_eq!( + meter + .resource_usage_rate(ResourceType::CpuInstructions) + .unwrap(), + Rate::new(0.0, Duration::from_secs(1)) + ); // Report some usage and test that the total usage is updated let attribution = AttributionSource::Peer(PeerKeyLocation::random()); meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0); assert_eq!( - meter.resource_usage_rate(ResourceType::InboundBandwidthBytes), + meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); } #[test] fn test_meter_attributed_usage() { - let meter = Meter::new(); + let meter = Meter::new_with_window_size(100); // Test that the attributed usage is 0.0 for all resources let attribution = AttributionSource::Peer(PeerKeyLocation::random()); assert_eq!( - meter.attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes), + meter + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 0.0 ); assert_eq!( - meter.attributed_usage_rate(&attribution, ResourceType::OutboundBandwidthBytes), + meter + .attributed_usage_rate(&attribution, ResourceType::OutboundBandwidthBytes) + .unwrap() + .per_second(), 0.0 ); assert_eq!( - meter.attributed_usage_rate(&attribution, ResourceType::CpuInstructions), + meter + .attributed_usage_rate(&attribution, ResourceType::CpuInstructions) + .unwrap() + .per_second(), 0.0 ); // Report some usage and test that the attributed usage is updated meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0); assert_eq!( - meter.attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes), + meter + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); } #[test] fn test_meter_report() -> Result<(), DynError> { - let meter = Meter::new(); + let meter = Meter::new_with_window_size(100); // Report some usage and test that the total and attributed usage are updated let attribution = AttributionSource::Peer(PeerKeyLocation::random()); meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0); assert_eq!( - meter.resource_usage_rate(ResourceType::InboundBandwidthBytes), + meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); assert_eq!( - meter.attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes), + meter + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 100.0 ); // Report more usage and test that the total and attributed usage are updated meter.report(&attribution, ResourceType::InboundBandwidthBytes, 200.0); assert_eq!( - meter.resource_usage_rate(ResourceType::InboundBandwidthBytes), + meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 300.0 ); assert_eq!( - meter.attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes), + meter + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 300.0 ); // Report usage for a different resource and test that the total and attributed usage are updated meter.report(&attribution, ResourceType::CpuInstructions, 50.0); - assert_eq!(meter.resource_usage_rate(ResourceType::CpuInstructions), 50.0); assert_eq!( - meter.attributed_usage_rate(&attribution, ResourceType::CpuInstructions), + meter + .resource_usage_rate(ResourceType::CpuInstructions) + .unwrap() + .per_second(), + 50.0 + ); + assert_eq!( + meter + .attributed_usage_rate(&attribution, ResourceType::CpuInstructions) + .unwrap() + .per_second(), 50.0 ); let bytes = crate::util::test::random_bytes_1kb(); let mut gen = arbitrary::Unstructured::new(&bytes); // Report usage for a different attribution and test that the total and attributed usage are updated - let other_attribution = - AttributionSource::Peer(PeerKeyLocation::random()); + let other_attribution = AttributionSource::Peer(PeerKeyLocation::random()); meter.report( &other_attribution, ResourceType::InboundBandwidthBytes, 150.0, ); assert_eq!( - meter.resource_usage_rate(ResourceType::InboundBandwidthBytes), + meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 450.0 ); assert_eq!( - meter.attributed_usage_rate(&other_attribution, ResourceType::InboundBandwidthBytes), + meter + .attributed_usage_rate(&other_attribution, ResourceType::InboundBandwidthBytes) + .unwrap() + .per_second(), 150.0 ); Ok(()) diff --git a/crates/core/src/resources/rate.rs b/crates/core/src/resources/rate.rs index a1d715b14..09cb201c8 100644 --- a/crates/core/src/resources/rate.rs +++ b/crates/core/src/resources/rate.rs @@ -1,5 +1,6 @@ use std::time::Duration; +#[derive(Debug, PartialEq)] pub struct Rate { value: f64, } @@ -14,5 +15,15 @@ impl Rate { pub fn per_second(&self) -> f64 { self.value } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_rate() { + let rate = Rate::new(100.0, Duration::from_secs(2)); + assert_eq!(rate.per_second(), 50.0); + } } diff --git a/crates/core/src/resources/running_average.rs b/crates/core/src/resources/running_average.rs index 44d1e87e3..8bc247186 100644 --- a/crates/core/src/resources/running_average.rs +++ b/crates/core/src/resources/running_average.rs @@ -1,6 +1,6 @@ -use std::{collections::VecDeque, time::Instant}; -use std::time::Duration; use crate::resources::rate::Rate; +use std::time::Duration; +use std::{collections::VecDeque, time::Instant}; #[derive(Clone, Debug)] pub(super) struct RunningAverage { @@ -11,7 +11,6 @@ pub(super) struct RunningAverage { } impl RunningAverage { - pub fn new(max_samples: usize) -> Self { RunningAverage { max_samples, @@ -69,7 +68,7 @@ mod tests { #[test] fn test_insert() { let max_samples = 3; - let mut running_avg = RunningAverage::new_with_max_samples(max_samples); + let mut running_avg = RunningAverage::new(max_samples); let now = Instant::now(); running_avg.insert_with_time(now, 2.0); @@ -93,16 +92,19 @@ mod tests { #[test] fn test_per_second_measurement() { let max_samples = 3; - let mut running_avg = RunningAverage::new_with_max_samples(max_samples); + let mut running_avg = RunningAverage::new(max_samples); let now = Instant::now(); // Test with no samples - assert_eq!(running_avg.get_rate_at_time(now), 0.0); + assert!(running_avg.get_rate_at_time(now).is_none()); // Test with one sample running_avg.insert_with_time(now, 2.0); assert_eq!( - running_avg.get_rate_at_time(now + Duration::from_secs(1)), + running_avg + .get_rate_at_time(now + Duration::from_secs(1)) + .unwrap() + .per_second(), 2.0 ); @@ -110,14 +112,20 @@ mod tests { running_avg.insert_with_time(now + Duration::from_secs(1), 4.0); running_avg.insert_with_time(now + Duration::from_secs(2), 6.0); assert_eq!( - running_avg.get_rate_at_time(now + Duration::from_secs(3)), + running_avg + .get_rate_at_time(now + Duration::from_secs(3)) + .unwrap() + .per_second(), 4.0 ); // Test with max_samples exceeded running_avg.insert_with_time(now + Duration::from_secs(3), 8.0); assert_eq!( - running_avg.get_rate_at_time(now + Duration::from_secs(4)), + running_avg + .get_rate_at_time(now + Duration::from_secs(4)) + .unwrap() + .per_second(), 6.0 ); } From 3b51096e310327bd92fb4542a5fd13a24b0eb0fe Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 10 Nov 2023 12:11:55 -0600 Subject: [PATCH 07/12] clean up --- crates/core/src/resources.rs | 6 +++--- crates/core/src/resources/meter.rs | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/core/src/resources.rs b/crates/core/src/resources.rs index 829308ac3..18bd940fc 100644 --- a/crates/core/src/resources.rs +++ b/crates/core/src/resources.rs @@ -25,7 +25,7 @@ impl ResourceManager { /// Report the use of a resource. pub(crate) fn report( - &self, + &mut self, _time: Instant, attribution: &AttributionSource, resource: ResourceType, @@ -211,7 +211,7 @@ mod tests { max_memory_usage: 1000.0, max_storage_usage: 1000.0, }; - let resource_manager = ResourceManager::new(limits); + let mut resource_manager = ResourceManager::new(limits); // Report some usage and test that the total and attributed usage are updated let attribution = AttributionSource::Peer(PeerKeyLocation::random()); @@ -250,7 +250,7 @@ mod tests { max_memory_usage: 1000.0, max_storage_usage: 1000.0, }; - let resource_manager = ResourceManager::new(limits); + let mut resource_manager = ResourceManager::new(limits); // Report some usage let peer1 = PeerKeyLocation::random(); diff --git a/crates/core/src/resources/meter.rs b/crates/core/src/resources/meter.rs index ba0652fc7..247d53fca 100644 --- a/crates/core/src/resources/meter.rs +++ b/crates/core/src/resources/meter.rs @@ -66,7 +66,7 @@ impl Meter { /// an AttributionMeter as a parameter. This will be useful for contracts with multiple /// subscribers - where the responsibility should be split evenly among the subscribers. pub(crate) fn report_split( - &self, + &mut self, attributions: &[AttributionSource], resource: ResourceType, value: f64, @@ -81,7 +81,7 @@ impl Meter { /// functions that consume the resource, taking an AttributionMeter /// as a parameter. pub(crate) fn report( - &self, + &mut self, attribution: &AttributionSource, resource: ResourceType, value: f64, @@ -154,7 +154,7 @@ mod tests { #[test] fn test_meter_total_usage() { - let meter = Meter::new_with_window_size(100); + let mut meter = Meter::new_with_window_size(100); // Test that the total usage is 0.0 for all resources assert_eq!( @@ -190,7 +190,7 @@ mod tests { #[test] fn test_meter_attributed_usage() { - let meter = Meter::new_with_window_size(100); + let mut meter = Meter::new_with_window_size(100); // Test that the attributed usage is 0.0 for all resources let attribution = AttributionSource::Peer(PeerKeyLocation::random()); @@ -229,7 +229,7 @@ mod tests { #[test] fn test_meter_report() -> Result<(), DynError> { - let meter = Meter::new_with_window_size(100); + let mut meter = Meter::new_with_window_size(100); // Report some usage and test that the total and attributed usage are updated let attribution = AttributionSource::Peer(PeerKeyLocation::random()); @@ -283,7 +283,7 @@ mod tests { ); let bytes = crate::util::test::random_bytes_1kb(); - let mut gen = arbitrary::Unstructured::new(&bytes); + let gen = arbitrary::Unstructured::new(&bytes); // Report usage for a different attribution and test that the total and attributed usage are updated let other_attribution = AttributionSource::Peer(PeerKeyLocation::random()); meter.report( From 6bb45c0631d5799c21ed577dfac26636d6039d91 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 10 Nov 2023 16:24:24 -0600 Subject: [PATCH 08/12] wip --- crates/core/src/resources.rs | 40 ++++++++++++++++++++++++++++-- crates/core/src/resources/meter.rs | 11 ++++++-- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/crates/core/src/resources.rs b/crates/core/src/resources.rs index 18bd940fc..af45c54a0 100644 --- a/crates/core/src/resources.rs +++ b/crates/core/src/resources.rs @@ -152,7 +152,7 @@ impl Limits { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub struct Bandwidth(f64); impl Bandwidth { pub fn new(bytes_per_second: f64) -> Self { @@ -166,7 +166,7 @@ impl From for f64 { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub struct InstructionsPerSecond(f64); impl InstructionsPerSecond { pub fn new(cpu_usage: f64) -> Self { @@ -303,4 +303,40 @@ mod tests { // Test that the peer with the highest usage is deleted assert_eq!(to_delete[0], peer1); } + + #[test] + fn test_update_limits() { + let limits = Limits { + max_upstream_bandwidth: Bandwidth::new(1000.0), + max_downstream_bandwidth: Bandwidth::new(1000.0), + max_cpu_usage: InstructionsPerSecond::new(1000.0), + max_memory_usage: 1000.0, + max_storage_usage: 1000.0, + }; + let mut resource_manager = ResourceManager::new(limits); + + let new_limits = Limits { + max_upstream_bandwidth: Bandwidth::new(2000.0), + max_downstream_bandwidth: Bandwidth::new(2000.0), + max_cpu_usage: InstructionsPerSecond::new(2000.0), + max_memory_usage: 2000.0, + max_storage_usage: 2000.0, + }; + resource_manager.update_limits(new_limits); + + assert_eq!( + resource_manager.limits.max_upstream_bandwidth, + Bandwidth::new(2000.0) + ); + assert_eq!( + resource_manager.limits.max_downstream_bandwidth, + Bandwidth::new(2000.0) + ); + assert_eq!( + resource_manager.limits.max_cpu_usage, + InstructionsPerSecond::new(2000.0) + ); + assert_eq!(resource_manager.limits.max_memory_usage, 2000.0); + assert_eq!(resource_manager.limits.max_storage_usage, 2000.0); + } } diff --git a/crates/core/src/resources/meter.rs b/crates/core/src/resources/meter.rs index 247d53fca..4f44eb3a7 100644 --- a/crates/core/src/resources/meter.rs +++ b/crates/core/src/resources/meter.rs @@ -4,6 +4,7 @@ use std::time::Duration; use crate::resources::rate::Rate; use dashmap::DashMap; use freenet_stdlib::prelude::*; +use crate::resources::Bandwidth; use crate::ring::PeerKeyLocation; @@ -77,6 +78,14 @@ impl Meter { } } + pub(crate) fn report_inbound_bandwidth(&mut self, attribution : &AttributionSource, bandwidth: Bandwidth) { + self.report(attribution, ResourceType::InboundBandwidthBytes, bandwidth.into()); + } + + pub(crate) fn report_outbound_bandwidth(&mut self, attribution : &AttributionSource, bandwidth: Bandwidth) { + self.report(attribution, ResourceType::OutboundBandwidthBytes, bandwidth.into()); + } + /// Report the use of a resource. This should be done in the lowest-level /// functions that consume the resource, taking an AttributionMeter /// as a parameter. @@ -282,8 +291,6 @@ mod tests { 50.0 ); - let bytes = crate::util::test::random_bytes_1kb(); - let gen = arbitrary::Unstructured::new(&bytes); // Report usage for a different attribution and test that the total and attributed usage are updated let other_attribution = AttributionSource::Peer(PeerKeyLocation::random()); meter.report( From 44d31c2edf428c26501333646607c81aaff511e4 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 10 Nov 2023 16:41:34 -0600 Subject: [PATCH 09/12] wip --- crates/core/src/resources.rs | 38 +++++++++++++++--------------- crates/core/src/resources/meter.rs | 10 +++++--- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/crates/core/src/resources.rs b/crates/core/src/resources.rs index af45c54a0..119535cda 100644 --- a/crates/core/src/resources.rs +++ b/crates/core/src/resources.rs @@ -134,8 +134,8 @@ struct CandidateCost { } pub struct Limits { - pub max_upstream_bandwidth: Bandwidth, - pub max_downstream_bandwidth: Bandwidth, + pub max_upstream_bandwidth: BytesPerSecond, + pub max_downstream_bandwidth: BytesPerSecond, pub max_cpu_usage: InstructionsPerSecond, pub max_memory_usage: f64, pub max_storage_usage: f64, @@ -153,15 +153,15 @@ impl Limits { } #[derive(Debug, Clone, Copy, PartialEq)] -pub struct Bandwidth(f64); -impl Bandwidth { +pub struct BytesPerSecond(f64); +impl BytesPerSecond { pub fn new(bytes_per_second: f64) -> Self { - Bandwidth(bytes_per_second) + BytesPerSecond(bytes_per_second) } } -impl From for f64 { - fn from(val: Bandwidth) -> Self { +impl From for f64 { + fn from(val: BytesPerSecond) -> Self { val.0 } } @@ -196,7 +196,7 @@ impl From for f64 { #[cfg(test)] mod tests { - use crate::resources::{Bandwidth, InstructionsPerSecond, Limits, ResourceManager}; + use crate::resources::{BytesPerSecond, InstructionsPerSecond, Limits, ResourceManager}; use super::*; use std::time::Instant; @@ -205,8 +205,8 @@ mod tests { fn test_resource_manager_report() { // Create a ResourceManager with arbitrary limits let limits = Limits { - max_upstream_bandwidth: Bandwidth::new(1000.0), - max_downstream_bandwidth: Bandwidth::new(1000.0), + max_upstream_bandwidth: BytesPerSecond::new(1000.0), + max_downstream_bandwidth: BytesPerSecond::new(1000.0), max_cpu_usage: InstructionsPerSecond::new(1000.0), max_memory_usage: 1000.0, max_storage_usage: 1000.0, @@ -244,8 +244,8 @@ mod tests { fn test_resource_manager_should_delete_peers() { // Create a ResourceManager with arbitrary limits let limits = Limits { - max_upstream_bandwidth: Bandwidth::new(1000.0), - max_downstream_bandwidth: Bandwidth::new(1000.0), + max_upstream_bandwidth: BytesPerSecond::new(1000.0), + max_downstream_bandwidth: BytesPerSecond::new(1000.0), max_cpu_usage: InstructionsPerSecond::new(1000.0), max_memory_usage: 1000.0, max_storage_usage: 1000.0, @@ -298,7 +298,7 @@ mod tests { let to_delete = resource_manager.should_delete_peers(ResourceType::InboundBandwidthBytes, candidates); - assert!(to_delete.len() == 1); + assert_eq!(to_delete.len(), 1); // Test that the peer with the highest usage is deleted assert_eq!(to_delete[0], peer1); @@ -307,8 +307,8 @@ mod tests { #[test] fn test_update_limits() { let limits = Limits { - max_upstream_bandwidth: Bandwidth::new(1000.0), - max_downstream_bandwidth: Bandwidth::new(1000.0), + max_upstream_bandwidth: BytesPerSecond::new(1000.0), + max_downstream_bandwidth: BytesPerSecond::new(1000.0), max_cpu_usage: InstructionsPerSecond::new(1000.0), max_memory_usage: 1000.0, max_storage_usage: 1000.0, @@ -316,8 +316,8 @@ mod tests { let mut resource_manager = ResourceManager::new(limits); let new_limits = Limits { - max_upstream_bandwidth: Bandwidth::new(2000.0), - max_downstream_bandwidth: Bandwidth::new(2000.0), + max_upstream_bandwidth: BytesPerSecond::new(2000.0), + max_downstream_bandwidth: BytesPerSecond::new(2000.0), max_cpu_usage: InstructionsPerSecond::new(2000.0), max_memory_usage: 2000.0, max_storage_usage: 2000.0, @@ -326,11 +326,11 @@ mod tests { assert_eq!( resource_manager.limits.max_upstream_bandwidth, - Bandwidth::new(2000.0) + BytesPerSecond::new(2000.0) ); assert_eq!( resource_manager.limits.max_downstream_bandwidth, - Bandwidth::new(2000.0) + BytesPerSecond::new(2000.0) ); assert_eq!( resource_manager.limits.max_cpu_usage, diff --git a/crates/core/src/resources/meter.rs b/crates/core/src/resources/meter.rs index 4f44eb3a7..7fb8db777 100644 --- a/crates/core/src/resources/meter.rs +++ b/crates/core/src/resources/meter.rs @@ -4,7 +4,7 @@ use std::time::Duration; use crate::resources::rate::Rate; use dashmap::DashMap; use freenet_stdlib::prelude::*; -use crate::resources::Bandwidth; +use crate::resources::{BytesPerSecond, InstructionsPerSecond}; use crate::ring::PeerKeyLocation; @@ -78,14 +78,18 @@ impl Meter { } } - pub(crate) fn report_inbound_bandwidth(&mut self, attribution : &AttributionSource, bandwidth: Bandwidth) { + pub(crate) fn report_inbound_bandwidth(&mut self, attribution : &AttributionSource, bandwidth: BytesPerSecond) { self.report(attribution, ResourceType::InboundBandwidthBytes, bandwidth.into()); } - pub(crate) fn report_outbound_bandwidth(&mut self, attribution : &AttributionSource, bandwidth: Bandwidth) { + pub(crate) fn report_outbound_bandwidth(&mut self, attribution : &AttributionSource, bandwidth: BytesPerSecond) { self.report(attribution, ResourceType::OutboundBandwidthBytes, bandwidth.into()); } + pub(crate) fn report_cpu_usage(&mut self, attribution : &AttributionSource, cpu_usage: InstructionsPerSecond) { + self.report(attribution, ResourceType::CpuInstructions, cpu_usage.into()); + } + /// Report the use of a resource. This should be done in the lowest-level /// functions that consume the resource, taking an AttributionMeter /// as a parameter. From 825d127a1b48af3ad3b0a8733bef38bb030ea419 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 12 Nov 2023 19:08:59 -0600 Subject: [PATCH 10/12] add module documentation --- crates/core/src/resources.rs | 65 ++++++++++++++++++++++++++++++ crates/core/src/resources/meter.rs | 12 ------ 2 files changed, 65 insertions(+), 12 deletions(-) diff --git a/crates/core/src/resources.rs b/crates/core/src/resources.rs index 119535cda..e2913c88f 100644 --- a/crates/core/src/resources.rs +++ b/crates/core/src/resources.rs @@ -1,3 +1,68 @@ +//! # Resource Management +//! +//! The resource management module is responsible for tracking resource usage, +//! ensuring that usage does not exceed specified limits, and ensure that those +//! resources are used to maximize the utility of the network. +//! +//! ## Resources +//! +//! The resource management module tracks usage of the following resources: +//! +//! * Upstream and downstream bandwidth +//! * CPU usage +//! +//! These resources will be tracked in the future: +//! +//! * Memory usage +//! * Storage +//! +//! ## Attribution +//! +//! When used this resource usage is attributed to either: +//! +//! * Remote +//! * A connected peer +//! * Local +//! * A local delegate +//! * The user interface +//! +//! ## Resource allocation for contract subscriptions +//! +//! When one or more peers are subscribed to a contract, the required +//! resources will be allocated as follows: +//! +//! * Upstream bandwidth is allocated to the subscribed peer to which +//! the data is sent +//! * Downstream bandwidth and CPU is split evenly between all subscribed +//! peers for that contract +//! +//! ## Resource limits +//! +//! Resource limits should be set to ensure that the peer does not disrupt the +//! user's experience of their computer. We should choose intelligent defaults +//! but the limits should be user-configurable. +//! +//! ## Code component overview +//! +//! The [ResourceManager] is responsible for tracking resource usage and identifying +//! which peers to remove if/when limits are exceeded. It does this by identifying +//! the peers with the highest usage of the limit-exceeding resource relative to +//! their usefulness. The usefulness of a peer is determined by the number of +//! requests sent to that peer over a certain period of time. +//! +//! A [Meter] is used by the ResourceManager to tracking resource usage over time. +//! Resource usage is reported to the meter, which tracks the usage over a sliding +//! window of time. The meter is responsible for calculating the rate of resource +//! usage along with which peers (or delegates, or UIs) are responsible for that +//! usage. +//! +//! ## Future Improvements +//! +//! * Track non-flow resources like memory and storage +//! * Responses to specific requests will contain information about the resources used +//! by downstream peers to fulfill the request, however how this information is used +//! will require careful consideration. + mod meter; pub mod rate; mod running_average; diff --git a/crates/core/src/resources/meter.rs b/crates/core/src/resources/meter.rs index 7fb8db777..380f2d30f 100644 --- a/crates/core/src/resources/meter.rs +++ b/crates/core/src/resources/meter.rs @@ -78,18 +78,6 @@ impl Meter { } } - pub(crate) fn report_inbound_bandwidth(&mut self, attribution : &AttributionSource, bandwidth: BytesPerSecond) { - self.report(attribution, ResourceType::InboundBandwidthBytes, bandwidth.into()); - } - - pub(crate) fn report_outbound_bandwidth(&mut self, attribution : &AttributionSource, bandwidth: BytesPerSecond) { - self.report(attribution, ResourceType::OutboundBandwidthBytes, bandwidth.into()); - } - - pub(crate) fn report_cpu_usage(&mut self, attribution : &AttributionSource, cpu_usage: InstructionsPerSecond) { - self.report(attribution, ResourceType::CpuInstructions, cpu_usage.into()); - } - /// Report the use of a resource. This should be done in the lowest-level /// functions that consume the resource, taking an AttributionMeter /// as a parameter. From 239417607e9059b7eecf64c592558d9f4ffd1467 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 12 Nov 2023 19:19:25 -0600 Subject: [PATCH 11/12] fix tests --- crates/core/src/resources/meter.rs | 39 +++++++++--------------------- tests/test-contract-1/Cargo.lock | 4 --- tests/test-delegate-1/Cargo.lock | 4 --- 3 files changed, 12 insertions(+), 35 deletions(-) diff --git a/crates/core/src/resources/meter.rs b/crates/core/src/resources/meter.rs index 380f2d30f..dd8e36303 100644 --- a/crates/core/src/resources/meter.rs +++ b/crates/core/src/resources/meter.rs @@ -158,24 +158,18 @@ mod tests { let mut meter = Meter::new_with_window_size(100); // Test that the total usage is 0.0 for all resources - assert_eq!( + assert!( meter .resource_usage_rate(ResourceType::InboundBandwidthBytes) - .unwrap(), - Rate::new(0.0, Duration::from_secs(1)) - ); - assert_eq!( + .is_none()); + assert!( meter .resource_usage_rate(ResourceType::OutboundBandwidthBytes) - .unwrap(), - Rate::new(0.0, Duration::from_secs(1)) - ); - assert_eq!( + .is_none()); + assert!( meter .resource_usage_rate(ResourceType::CpuInstructions) - .unwrap(), - Rate::new(0.0, Duration::from_secs(1)) - ); + .is_none()); // Report some usage and test that the total usage is updated let attribution = AttributionSource::Peer(PeerKeyLocation::random()); @@ -195,27 +189,18 @@ mod tests { // Test that the attributed usage is 0.0 for all resources let attribution = AttributionSource::Peer(PeerKeyLocation::random()); - assert_eq!( + assert!( meter .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) - .unwrap() - .per_second(), - 0.0 - ); - assert_eq!( + .is_none()); + assert!( meter .attributed_usage_rate(&attribution, ResourceType::OutboundBandwidthBytes) - .unwrap() - .per_second(), - 0.0 - ); - assert_eq!( + .is_none()); + assert!( meter .attributed_usage_rate(&attribution, ResourceType::CpuInstructions) - .unwrap() - .per_second(), - 0.0 - ); + .is_none()); // Report some usage and test that the attributed usage is updated meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0); diff --git a/tests/test-contract-1/Cargo.lock b/tests/test-contract-1/Cargo.lock index fe1ff01f1..104bda63c 100644 --- a/tests/test-contract-1/Cargo.lock +++ b/tests/test-contract-1/Cargo.lock @@ -37,9 +37,6 @@ name = "arrayvec" version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" -dependencies = [ - "serde", -] [[package]] name = "autocfg" @@ -252,7 +249,6 @@ dependencies = [ name = "freenet-stdlib" version = "0.0.8" dependencies = [ - "arrayvec", "bincode", "blake3", "bs58", diff --git a/tests/test-delegate-1/Cargo.lock b/tests/test-delegate-1/Cargo.lock index e4698056d..9f7116812 100644 --- a/tests/test-delegate-1/Cargo.lock +++ b/tests/test-delegate-1/Cargo.lock @@ -37,9 +37,6 @@ name = "arrayvec" version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" -dependencies = [ - "serde", -] [[package]] name = "autocfg" @@ -252,7 +249,6 @@ dependencies = [ name = "freenet-stdlib" version = "0.0.8" dependencies = [ - "arrayvec", "bincode", "blake3", "bs58", From 63ae38656b59809615628545fefd55266e09c050 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 12 Nov 2023 19:22:54 -0600 Subject: [PATCH 12/12] cargo fmt --- crates/core/src/resources/meter.rs | 44 +++++++++++++----------------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/crates/core/src/resources/meter.rs b/crates/core/src/resources/meter.rs index dd8e36303..27307011d 100644 --- a/crates/core/src/resources/meter.rs +++ b/crates/core/src/resources/meter.rs @@ -2,9 +2,9 @@ use std::hash::Hash; use std::time::Duration; use crate::resources::rate::Rate; +use crate::resources::{BytesPerSecond, InstructionsPerSecond}; use dashmap::DashMap; use freenet_stdlib::prelude::*; -use crate::resources::{BytesPerSecond, InstructionsPerSecond}; use crate::ring::PeerKeyLocation; @@ -158,18 +158,15 @@ mod tests { let mut meter = Meter::new_with_window_size(100); // Test that the total usage is 0.0 for all resources - assert!( - meter - .resource_usage_rate(ResourceType::InboundBandwidthBytes) - .is_none()); - assert!( - meter - .resource_usage_rate(ResourceType::OutboundBandwidthBytes) - .is_none()); - assert!( - meter - .resource_usage_rate(ResourceType::CpuInstructions) - .is_none()); + assert!(meter + .resource_usage_rate(ResourceType::InboundBandwidthBytes) + .is_none()); + assert!(meter + .resource_usage_rate(ResourceType::OutboundBandwidthBytes) + .is_none()); + assert!(meter + .resource_usage_rate(ResourceType::CpuInstructions) + .is_none()); // Report some usage and test that the total usage is updated let attribution = AttributionSource::Peer(PeerKeyLocation::random()); @@ -189,18 +186,15 @@ mod tests { // Test that the attributed usage is 0.0 for all resources let attribution = AttributionSource::Peer(PeerKeyLocation::random()); - assert!( - meter - .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) - .is_none()); - assert!( - meter - .attributed_usage_rate(&attribution, ResourceType::OutboundBandwidthBytes) - .is_none()); - assert!( - meter - .attributed_usage_rate(&attribution, ResourceType::CpuInstructions) - .is_none()); + assert!(meter + .attributed_usage_rate(&attribution, ResourceType::InboundBandwidthBytes) + .is_none()); + assert!(meter + .attributed_usage_rate(&attribution, ResourceType::OutboundBandwidthBytes) + .is_none()); + assert!(meter + .attributed_usage_rate(&attribution, ResourceType::CpuInstructions) + .is_none()); // Report some usage and test that the attributed usage is updated meter.report(&attribution, ResourceType::InboundBandwidthBytes, 100.0);