Skip to content

Commit

Permalink
src: stats: Propagate new structs names, rename some variables
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 16, 2024
1 parent 5cfaf50 commit 32d14b8
Showing 1 changed file with 62 additions and 46 deletions.
108 changes: 62 additions & 46 deletions src/stats/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,37 @@ use tracing::*;
use crate::{
hub::Hub,
stats::{
driver::{DriverStatsInfo, DriverStatsInfoInner},
DriverStats, DriverStatsInner, StatsCommand,
driver::{AccumulatedDriverStats, AccumulatedStatsInner},
DriverStats, DriversStats, StatsCommand, StatsInner,
},
};

pub struct StatsActor {
hub: Hub,
start_time: Arc<RwLock<u64>>,
update_period: Arc<RwLock<tokio::time::Duration>>,
last_raw: Arc<RwLock<Vec<(String, DriverStatsInfo)>>>,
driver_stats: Arc<RwLock<Vec<(String, DriverStats)>>>,
last_accumulated_drivers_stats: Arc<RwLock<Vec<(String, AccumulatedDriverStats)>>>,
drivers_stats: Arc<RwLock<DriversStats>>,
}

impl StatsActor {
pub async fn start(mut self, mut receiver: mpsc::Receiver<StatsCommand>) {
let task = tokio::spawn({
let drivers_stats_task = tokio::spawn({
let hub = self.hub.clone();
let update_period = Arc::clone(&self.update_period);
let last_raw = Arc::clone(&self.last_raw);
let driver_stats = Arc::clone(&self.driver_stats);
let last_accumulated_drivers_stats = Arc::clone(&self.last_accumulated_drivers_stats);
let drivers_stats = Arc::clone(&self.drivers_stats);
let start_time = Arc::clone(&self.start_time);

async move {
loop {
update_driver_stats(&hub, &last_raw, &driver_stats, &start_time).await;
update_driver_stats(
&hub,
&last_accumulated_drivers_stats,
&drivers_stats,
&start_time,
)
.await;

tokio::time::sleep(*update_period.read().await).await;
}
Expand All @@ -58,28 +64,28 @@ impl StatsActor {
}
}

task.abort();
drivers_stats_task.abort();
}

#[instrument(level = "debug", skip(hub))]
pub async fn new(hub: Hub, update_period: tokio::time::Duration) -> Self {
let update_period = Arc::new(RwLock::new(update_period));
let last_raw = Arc::new(RwLock::new(Vec::new()));
let driver_stats = Arc::new(RwLock::new(Vec::new()));
let last_accumulated_drivers_stats = Arc::new(RwLock::new(Vec::new()));
let drivers_stats = Arc::new(RwLock::new(Vec::new()));
let start_time = Arc::new(RwLock::new(chrono::Utc::now().timestamp_micros() as u64));

Self {
hub,
update_period,
last_raw,
driver_stats,
last_accumulated_drivers_stats,
drivers_stats,
start_time,
}
}

#[instrument(level = "debug", skip(self))]
pub async fn drivers_stats(&mut self) -> Result<Vec<(String, DriverStats)>> {
let drivers_stats = self.driver_stats.read().await.clone();
pub async fn drivers_stats(&mut self) -> Result<DriversStats> {
let drivers_stats = self.drivers_stats.read().await.clone();

Ok(drivers_stats)
}
Expand All @@ -93,13 +99,14 @@ impl StatsActor {

#[instrument(level = "debug", skip(self))]
pub async fn reset(&mut self) -> Result<()> {
// note: hold the driver_stats locked until the hub clear each driver stats to minimize weird states
let mut driver_stats = self.driver_stats.write().await;
// note: hold the guards until the hub clear each driver stats to minimize weird states
let mut driver_stats = self.drivers_stats.write().await;

if let Err(error) = self.hub.reset_all_stats().await {
error!("Failed resetting driver stats: {error:?}");
error!("Failed resetting stats: {error:?}");
}
*self.start_time.write().await = chrono::Utc::now().timestamp_micros() as u64;
self.last_raw.write().await.clear();
self.last_accumulated_drivers_stats.write().await.clear();
driver_stats.clear();

Ok(())
Expand All @@ -109,30 +116,35 @@ impl StatsActor {
#[instrument(level = "debug", skip_all)]
async fn update_driver_stats(
hub: &Hub,
last_raw: &Arc<RwLock<Vec<(String, DriverStatsInfo)>>>,
driver_stats: &Arc<RwLock<Vec<(String, DriverStats)>>>,
last_accumulated_drivers_stats: &Arc<RwLock<Vec<(String, AccumulatedDriverStats)>>>,
driver_stats: &Arc<RwLock<DriversStats>>,
start_time: &Arc<RwLock<u64>>,
) {
let last_raw_stats: Vec<(String, DriverStatsInfo)> = last_raw.read().await.clone();
let current_raw_stats: Vec<(String, DriverStatsInfo)> = hub.stats().await.unwrap();
let last_stats = last_accumulated_drivers_stats.read().await.clone();
let current_stats = hub.drivers_stats().await.unwrap();

let last_map: HashMap<_, _> = last_raw_stats.into_iter().collect();
let current_map: HashMap<_, _> = current_raw_stats
let last_map: HashMap<_, _> = last_stats.into_iter().collect();
let current_map: HashMap<_, _> = current_stats
.iter()
.map(|(name, raw)| (name.clone(), raw.clone()))
.collect();

let merged_keys: HashSet<String> = last_map.keys().chain(current_map.keys()).cloned().collect();

let merged_stats: Vec<(String, (Option<DriverStatsInfo>, Option<DriverStatsInfo>))> =
merged_keys
.into_iter()
.map(|name| {
let last = last_map.get(&name).cloned();
let current = current_map.get(&name).cloned();
(name, (last, current))
})
.collect();
let merged_stats: Vec<(
String,
(
Option<AccumulatedDriverStats>,
Option<AccumulatedDriverStats>,
),
)> = merged_keys
.into_iter()
.map(|name| {
let last = last_map.get(&name).cloned();
let current = current_map.get(&name).cloned();
(name, (last, current))
})
.collect();

let mut new_driver_stats = Vec::new();

Expand Down Expand Up @@ -164,18 +176,18 @@ async fn update_driver_stats(
trace!("{new_driver_stats:#?}");

*driver_stats.write().await = new_driver_stats;
*last_raw.write().await = current_raw_stats;
*last_accumulated_drivers_stats.write().await = current_stats;
}

/// Function to calculate the driver stats for either input or output, with proper averages
#[instrument(level = "debug")]
fn calculate_driver_stats(
last_stats: Option<DriverStatsInfoInner>,
current_stats: Option<DriverStatsInfoInner>,
last_stats: Option<AccumulatedStatsInner>,
current_stats: Option<AccumulatedStatsInner>,
start_time: u64,
) -> Option<DriverStatsInner> {
) -> Option<StatsInner> {
if let Some(current_stats) = current_stats {
let time_diff = time_diff(last_stats.as_ref(), &current_stats);
let time_diff = accumulated_driver_stats_time_diff(last_stats.as_ref(), &current_stats);
let total_time = total_time_since_start(start_time, &current_stats);

let diff_messages = current_stats.messages as u64
Expand All @@ -199,7 +211,7 @@ fn calculate_driver_stats(
);
let jitter = (delay - last_delay).abs();

Some(DriverStatsInner {
Some(StatsInner {
last_message_time: current_stats.last_update,
total_bytes,
bytes_per_second,
Expand All @@ -217,24 +229,28 @@ fn calculate_driver_stats(

/// Function to calculate the total time since the start (in seconds)
#[instrument(level = "debug")]
fn total_time_since_start(start_time: u64, current_stats: &DriverStatsInfoInner) -> f64 {
(current_stats.last_update as f64 - start_time as f64) / 1_000_000.0
fn total_time_since_start(start_time: u64, current_stats: &AccumulatedStatsInner) -> f64 {
calculate_time_diff(start_time, current_stats.last_update)
}

/// Function to calculate the time difference (in seconds)
#[instrument(level = "debug")]
fn time_diff(
last_stats: Option<&DriverStatsInfoInner>,
current_stats: &DriverStatsInfoInner,
fn accumulated_driver_stats_time_diff(
last_stats: Option<&AccumulatedStatsInner>,
current_stats: &AccumulatedStatsInner,
) -> f64 {
if let Some(last_stats) = last_stats {
// Microseconds to seconds
(current_stats.last_update as f64 - last_stats.last_update as f64) / 1_000_000.0
calculate_time_diff(last_stats.last_update, current_stats.last_update)
} else {
f64::INFINITY
}
}

fn calculate_time_diff(last_time: u64, current_time: u64) -> f64 {
(current_time as f64 - last_time as f64) / 1_000_000.0
}

#[instrument(level = "debug")]
fn divide_safe(numerator: f64, denominator: f64) -> f64 {
if denominator > 0.0 {
Expand Down

0 comments on commit 32d14b8

Please sign in to comment.