From 05b89763dbcc5e84a62e382fe13d27215eece459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Tue, 10 Sep 2024 11:07:21 -0300 Subject: [PATCH 1/8] src: hub: actor: drivers don't need to be internally mutable --- src/hub/actor.rs | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/src/hub/actor.rs b/src/hub/actor.rs index 07c73615..07f00f49 100644 --- a/src/hub/actor.rs +++ b/src/hub/actor.rs @@ -14,7 +14,7 @@ use crate::{ use super::protocol::HubCommand; pub struct HubActor { - drivers: Arc>>>, + drivers: HashMap>, bcst_sender: broadcast::Sender>, last_driver_id: Arc>, component_id: Arc>, @@ -76,7 +76,7 @@ impl HubActor { }); Self { - drivers: Arc::new(RwLock::new(HashMap::new())), + drivers: HashMap::new(), bcst_sender, last_driver_id: Arc::new(RwLock::new(0)), component_id, @@ -86,14 +86,12 @@ impl HubActor { } #[instrument(level = "debug", skip(self, driver))] - pub async fn add_driver(&self, driver: Arc) -> Result { + pub async fn add_driver(&mut self, driver: Arc) -> Result { let mut last_id = self.last_driver_id.write().await; let id = *last_id; *last_id += 1; - let mut drivers = self.drivers.write().await; - - if drivers.insert(id, driver.clone()).is_some() { + if self.drivers.insert(id, driver.clone()).is_some() { return Err(anyhow!( "Failed addinng driver: id {id:?} is already present" )); @@ -107,16 +105,16 @@ impl HubActor { } #[instrument(level = "debug", skip(self))] - pub async fn remove_driver(&self, id: u64) -> Result<()> { - let mut drivers = self.drivers.write().await; - drivers.remove(&id).context("Driver id {id:?} not found")?; + pub async fn remove_driver(&mut self, id: u64) -> Result<()> { + self.drivers + .remove(&id) + .context("Driver id {id:?} not found")?; Ok(()) } #[instrument(level = "debug", skip(self))] pub async fn drivers(&self) -> HashMap> { - let drivers = self.drivers.read().await; - drivers + self.drivers .iter() .map(|(&id, driver)| (id, driver.info())) .collect() @@ -170,10 +168,8 @@ impl HubActor { #[instrument(level = "debug", skip(self))] pub async fn get_stats(&self) -> Vec<(String, DriverStatsInfo)> { - let drivers = self.drivers.read().await; - - let mut drivers_stats = Vec::with_capacity(drivers.len()); - for (_id, driver) in drivers.iter() { + let mut drivers_stats = Vec::with_capacity(self.drivers.len()); + for (_id, driver) in self.drivers.iter() { let stats = driver.stats().await; let info = driver.info(); let name = info.name().to_owned(); @@ -185,10 +181,8 @@ impl HubActor { } #[instrument(level = "debug", skip(self))] - pub async fn reset_all_stats(&self) -> Result<()> { - let drivers = self.drivers.write().await; - - for (_id, driver) in drivers.iter() { + pub async fn reset_all_stats(&mut self) -> Result<()> { + for (_id, driver) in self.drivers.iter() { driver.reset_stats().await; } From 4dde7dfd48f61cc4b09f946999d535fa78f6bd3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Tue, 10 Sep 2024 11:23:42 -0300 Subject: [PATCH 2/8] src: reduce scope of the clones --- src/drivers/fake.rs | 98 +++++++++++++++++++++----------------- src/drivers/mod.rs | 14 +++--- src/drivers/tcp/client.rs | 3 +- src/drivers/tlog/reader.rs | 28 ++++++----- src/hub/actor.rs | 21 ++++---- 5 files changed, 86 insertions(+), 78 deletions(-) diff --git a/src/drivers/fake.rs b/src/drivers/fake.rs index f733efcb..10afef1c 100644 --- a/src/drivers/fake.rs +++ b/src/drivers/fake.rs @@ -207,34 +207,35 @@ impl Driver for FakeSource { buf.clear(); mavlink::write_v2_msg(&mut buf, header, &data).expect("Failed to write message"); - let hub_sender_cloned = hub_sender.clone(); - read_all_messages("FakeSource", &mut buf, move |message| { - let message = Arc::new(message); - let hub_sender = hub_sender_cloned.clone(); - - async move { - trace!("Fake message created: {message:?}"); - - self.stats - .write() - .await - .update_output(Arc::clone(&message)) - .await; - - for future in self.on_message_output.call_all(Arc::clone(&message)) { - if let Err(error) = future.await { - debug!( - "Dropping message: on_message_input callback returned error: {error:?}" - ); - continue; + read_all_messages("FakeSource", &mut buf, { + let hub_sender = hub_sender.clone(); + move |message| { + let message = Arc::new(message); + let hub_sender = hub_sender.clone(); + + async move { + trace!("Fake message created: {message:?}"); + + self.stats + .write() + .await + .update_output(Arc::clone(&message)) + .await; + + for future in self.on_message_output.call_all(Arc::clone(&message)) { + if let Err(error) = future.await { + debug!( + "Dropping message: on_message_input callback returned error: {error:?}" + ); + continue; + } } - } - if let Err(error) = hub_sender.send(message) { - error!("Failed to send message to hub: {error:?}"); + if let Err(error) = hub_sender.send(message) { + error!("Failed to send message to hub: {error:?}"); + } } - } - }) + }}) .await; tokio::time::sleep(self.period).await; @@ -319,14 +320,17 @@ mod test { let sink_messages = Arc::new(RwLock::new(Vec::>::with_capacity(1000))); // FakeSink and task - let sink_messages_clone = sink_messages.clone(); let sink = FakeSink::builder() - .on_message_input(move |message: Arc| { - let sink_messages = sink_messages_clone.clone(); + .on_message_input({ + let sink_messages = sink_messages.clone(); + + move |message: Arc| { + let sink_messages = sink_messages.clone(); - async move { - sink_messages.write().await.push(message); - Ok(()) + async move { + sink_messages.write().await.push(message); + Ok(()) + } } }) .build(); @@ -337,14 +341,16 @@ mod test { }); // FakeSource and task - let source_messages_clone = source_messages.clone(); let source = FakeSource::builder(message_period) - .on_message_output(move |message: Arc| { - let source_messages = source_messages_clone.clone(); - - async move { - source_messages.write().await.push(message); - Ok(()) + .on_message_output({ + let source_messages = source_messages.clone(); + move |message: Arc| { + let source_messages = source_messages.clone(); + + async move { + source_messages.write().await.push(message); + Ok(()) + } } }) .build(); @@ -355,14 +361,16 @@ mod test { }); // Monitoring task to wait the - let sink_messages_clone = sink_messages.clone(); - let sink_monitor_task = tokio::spawn(async move { - loop { - if sink_messages_clone.read().await.len() >= number_of_messages { - break; - } + let sink_monitor_task = tokio::spawn({ + let sink_messages = sink_messages.clone(); + async move { + loop { + if sink_messages.read().await.len() >= number_of_messages { + break; + } - tokio::time::sleep(std::time::Duration::from_millis(1)).await; + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + } } }); let _ = tokio::time::timeout(timeout_time, sink_monitor_task) diff --git a/src/drivers/mod.rs b/src/drivers/mod.rs index 1c28f567..953a85ad 100644 --- a/src/drivers/mod.rs +++ b/src/drivers/mod.rs @@ -347,15 +347,17 @@ mod tests { let (sender, _receiver) = tokio::sync::broadcast::channel(1); let called = Arc::new(RwLock::new(false)); - let called_cloned = called.clone(); let driver = ExampleDriver::new() - .on_message_input(move |_msg| { - let called = called_cloned.clone(); + .on_message_input({ + let called = called.clone(); + move |_msg| { + let called = called.clone(); - async move { - *called.write().await = true; + async move { + *called.write().await = true; - Err(anyhow!("Finished from callback")) + Err(anyhow!("Finished from callback")) + } } }) .build(); diff --git a/src/drivers/tcp/client.rs b/src/drivers/tcp/client.rs index c64d8cce..f570402c 100644 --- a/src/drivers/tcp/client.rs +++ b/src/drivers/tcp/client.rs @@ -80,10 +80,9 @@ impl Driver for TcpClient { debug!("TcpClient successfully connected to {server_addr:?}"); let hub_receiver = hub_sender.subscribe(); - let hub_sender_cloned = Arc::clone(&hub_sender); tokio::select! { - result = tcp_receive_task(read, server_addr, hub_sender_cloned, &self.on_message_input, &self.stats) => { + result = tcp_receive_task(read, server_addr, Arc::clone(&hub_sender), &self.on_message_input, &self.stats) => { if let Err(e) = result { error!("Error in TCP receive task: {e:?}"); } diff --git a/src/drivers/tlog/reader.rs b/src/drivers/tlog/reader.rs index ffdeaba1..3150849b 100644 --- a/src/drivers/tlog/reader.rs +++ b/src/drivers/tlog/reader.rs @@ -233,25 +233,27 @@ mod tests { let messages_received_per_id = Arc::new(RwLock::new(BTreeMap::>>::new())); - let messages_received_cloned = messages_received_per_id.clone(); let tlog_file = PathBuf::from_str("tests/files/00025-2024-04-22_18-49-07.tlog").unwrap(); let driver = TlogReader::builder(tlog_file.clone()) - .on_message_input(move |message: Arc| { - let messages_received = messages_received_cloned.clone(); - - async move { - let message_id = message.message_id(); + .on_message_input({ + let messages_received_per_id = messages_received_per_id.clone(); + move |message: Arc| { + let messages_received = messages_received_per_id.clone(); + + async move { + let message_id = message.message_id(); + + let mut messages_received = messages_received.write().await; + if let Some(samples) = messages_received.get_mut(&message_id) { + samples.push(message); + } else { + messages_received.insert(message_id, Vec::from([message])); + } - let mut messages_received = messages_received.write().await; - if let Some(samples) = messages_received.get_mut(&message_id) { - samples.push(message); - } else { - messages_received.insert(message_id, Vec::from([message])); + Ok(()) } - - Ok(()) } }) .build(); diff --git a/src/hub/actor.rs b/src/hub/actor.rs index 07f00f49..3daf3cb7 100644 --- a/src/hub/actor.rs +++ b/src/hub/actor.rs @@ -61,18 +61,15 @@ impl HubActor { ) -> Self { let (bcst_sender, _) = broadcast::channel(buffer_size); - let bcst_sender_cloned = bcst_sender.clone(); - let component_id_cloned = component_id.clone(); - let system_id_cloned = system_id.clone(); - let frequency_cloned = frequency.clone(); - let heartbeat_task = tokio::spawn(async move { - Self::heartbeat_task( - bcst_sender_cloned, - component_id_cloned, - system_id_cloned, - frequency_cloned, - ) - .await + let heartbeat_task = tokio::spawn({ + let bcst_sender = bcst_sender.clone(); + let component_id = component_id.clone(); + let system_id = system_id.clone(); + let frequency = frequency.clone(); + + Self::heartbeat_task(bcst_sender, component_id, system_id, frequency) + }); + }); Self { From a0f0ca96091879f6d870fd80dbd2deba7b3ba86c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Mon, 16 Sep 2024 13:18:06 -0300 Subject: [PATCH 3/8] src: Rename functions and structs related to driver stats --- src/drivers/fake.rs | 22 +++++++++++----------- src/drivers/mod.rs | 16 ++++++++-------- src/drivers/serial/mod.rs | 12 ++++++------ src/drivers/tcp/client.rs | 12 ++++++------ src/drivers/tcp/mod.rs | 6 +++--- src/drivers/tcp/server.rs | 14 +++++++------- src/drivers/tlog/reader.rs | 12 ++++++------ src/drivers/tlog/writer.rs | 12 ++++++------ src/drivers/udp/client.rs | 12 ++++++------ src/drivers/udp/server.rs | 12 ++++++------ 10 files changed, 65 insertions(+), 65 deletions(-) diff --git a/src/drivers/fake.rs b/src/drivers/fake.rs index 10afef1c..7e744eeb 100644 --- a/src/drivers/fake.rs +++ b/src/drivers/fake.rs @@ -8,13 +8,13 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct FakeSink { on_message_input: Callbacks>, print: bool, - stats: Arc>, + stats: Arc>, } impl FakeSink { @@ -22,7 +22,7 @@ impl FakeSink { FakeSinkBuilder(Self { on_message_input: Callbacks::new(), print: false, - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } } @@ -93,13 +93,13 @@ impl Driver for FakeSink { } #[async_trait::async_trait] -impl DriverStats for FakeSink { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for FakeSink { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } @@ -146,7 +146,7 @@ impl DriverInfo for FakeSinkInfo { pub struct FakeSource { period: std::time::Duration, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } impl FakeSource { @@ -154,7 +154,7 @@ impl FakeSource { FakeSourceBuilder(Self { period, on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } } @@ -248,13 +248,13 @@ impl Driver for FakeSource { } #[async_trait::async_trait] -impl DriverStats for FakeSource { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for FakeSource { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/mod.rs b/src/drivers/mod.rs index 953a85ad..afe2e1d8 100644 --- a/src/drivers/mod.rs +++ b/src/drivers/mod.rs @@ -12,7 +12,7 @@ use tokio::sync::broadcast; use tracing::*; use url::Url; -use crate::{protocol::Protocol, stats::driver::DriverStats}; +use crate::{protocol::Protocol, stats::driver::AccumulatedDriverStatsProvider}; #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub enum Type { @@ -36,7 +36,7 @@ pub struct DriverDescriptionLegacy { } #[async_trait::async_trait] -pub trait Driver: Send + Sync + DriverStats { +pub trait Driver: Send + Sync + AccumulatedDriverStatsProvider { async fn run(&self, hub_sender: broadcast::Sender>) -> Result<()>; fn info(&self) -> Box; } @@ -224,7 +224,7 @@ mod tests { use tokio::sync::RwLock; use tracing::*; - use crate::stats::driver::DriverStatsInfo; + use crate::stats::driver::AccumulatedDriverStats; use super::*; @@ -244,7 +244,7 @@ mod tests { pub struct ExampleDriver { on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } impl ExampleDriver { @@ -252,7 +252,7 @@ mod tests { ExampleDriverBuilder(Self { on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } } @@ -306,13 +306,13 @@ mod tests { } #[async_trait::async_trait] - impl DriverStats for ExampleDriver { - async fn stats(&self) -> DriverStatsInfo { + impl AccumulatedDriverStatsProvider for ExampleDriver { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/serial/mod.rs b/src/drivers/serial/mod.rs index 70bfdc34..4c1a6546 100644 --- a/src/drivers/serial/mod.rs +++ b/src/drivers/serial/mod.rs @@ -12,7 +12,7 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct Serial { @@ -20,7 +20,7 @@ pub struct Serial { pub baud_rate: u32, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct SerialBuilder(Serial); @@ -55,7 +55,7 @@ impl Serial { baud_rate, on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -179,13 +179,13 @@ impl Driver for Serial { } #[async_trait::async_trait] -impl DriverStats for Serial { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for Serial { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/tcp/client.rs b/src/drivers/tcp/client.rs index f570402c..782e82f1 100644 --- a/src/drivers/tcp/client.rs +++ b/src/drivers/tcp/client.rs @@ -14,14 +14,14 @@ use crate::{ Driver, DriverInfo, }, protocol::Protocol, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, }; pub struct TcpClient { pub remote_addr: String, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TcpClientBuilder(TcpClient); @@ -55,7 +55,7 @@ impl TcpClient { remote_addr: remote_addr.to_string(), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } } @@ -105,13 +105,13 @@ impl Driver for TcpClient { } #[async_trait::async_trait] -impl DriverStats for TcpClient { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for TcpClient { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/tcp/mod.rs b/src/drivers/tcp/mod.rs index 987e93f6..0c546cba 100644 --- a/src/drivers/tcp/mod.rs +++ b/src/drivers/tcp/mod.rs @@ -11,7 +11,7 @@ use tracing::*; use crate::{ protocol::{read_all_messages, Protocol}, - stats::driver::DriverStatsInfo, + stats::driver::AccumulatedDriverStats, }; pub mod client; @@ -24,7 +24,7 @@ async fn tcp_receive_task( remote_addr: &str, hub_sender: Arc>>, on_message_input: &Callbacks>, - stats: &Arc>, + stats: &Arc>, ) -> Result<()> { let mut buf = Vec::with_capacity(1024); @@ -67,7 +67,7 @@ async fn tcp_send_task( remote_addr: &str, mut hub_receiver: broadcast::Receiver>, on_message_output: &Callbacks>, - stats: &Arc>, + stats: &Arc>, ) -> Result<()> { loop { let message = match hub_receiver.recv().await { diff --git a/src/drivers/tcp/server.rs b/src/drivers/tcp/server.rs index c824f726..711d39ee 100644 --- a/src/drivers/tcp/server.rs +++ b/src/drivers/tcp/server.rs @@ -14,7 +14,7 @@ use crate::{ Driver, DriverInfo, }, protocol::Protocol, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, }; #[derive(Clone)] @@ -22,7 +22,7 @@ pub struct TcpServer { pub local_addr: String, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TcpServerBuilder(TcpServer); @@ -56,7 +56,7 @@ impl TcpServer { local_addr: local_addr.to_string(), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -71,7 +71,7 @@ impl TcpServer { hub_sender: Arc>>, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, ) -> Result<()> { let hub_receiver = hub_sender.subscribe(); @@ -132,13 +132,13 @@ impl Driver for TcpServer { } #[async_trait::async_trait] -impl DriverStats for TcpServer { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for TcpServer { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/tlog/reader.rs b/src/drivers/tlog/reader.rs index 3150849b..10777f41 100644 --- a/src/drivers/tlog/reader.rs +++ b/src/drivers/tlog/reader.rs @@ -10,13 +10,13 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, }; pub struct TlogReader { pub path: PathBuf, on_message_input: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TlogReaderBuilder(TlogReader); @@ -41,7 +41,7 @@ impl TlogReader { TlogReaderBuilder(Self { path, on_message_input: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -141,13 +141,13 @@ impl Driver for TlogReader { } #[async_trait::async_trait] -impl DriverStats for TlogReader { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for TlogReader { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/tlog/writer.rs b/src/drivers/tlog/writer.rs index 3bb4f38c..8da74a26 100644 --- a/src/drivers/tlog/writer.rs +++ b/src/drivers/tlog/writer.rs @@ -11,13 +11,13 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, }; pub struct TlogWriter { pub path: PathBuf, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct TlogWriterBuilder(TlogWriter); @@ -42,7 +42,7 @@ impl TlogWriter { TlogWriterBuilder(Self { path, on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -109,13 +109,13 @@ impl Driver for TlogWriter { } #[async_trait::async_trait] -impl DriverStats for TlogWriter { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for TlogWriter { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/udp/client.rs b/src/drivers/udp/client.rs index c50d4fb7..cd8e9a26 100644 --- a/src/drivers/udp/client.rs +++ b/src/drivers/udp/client.rs @@ -11,14 +11,14 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct UdpClient { pub remote_addr: String, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct UdpClientBuilder(UdpClient); @@ -52,7 +52,7 @@ impl UdpClient { remote_addr: remote_addr.to_string(), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -209,13 +209,13 @@ impl Driver for UdpClient { } #[async_trait::async_trait] -impl DriverStats for UdpClient { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for UdpClient { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } diff --git a/src/drivers/udp/server.rs b/src/drivers/udp/server.rs index bfacd948..b8572804 100644 --- a/src/drivers/udp/server.rs +++ b/src/drivers/udp/server.rs @@ -11,7 +11,7 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::{read_all_messages, Protocol}, - stats::driver::{DriverStats, DriverStatsInfo}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct UdpServer { @@ -19,7 +19,7 @@ pub struct UdpServer { clients: Arc>>, on_message_input: Callbacks>, on_message_output: Callbacks>, - stats: Arc>, + stats: Arc>, } pub struct UdpServerBuilder(UdpServer); @@ -54,7 +54,7 @@ impl UdpServer { clients: Arc::new(RwLock::new(HashMap::new())), on_message_input: Callbacks::new(), on_message_output: Callbacks::new(), - stats: Arc::new(RwLock::new(DriverStatsInfo::default())), + stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())), }) } @@ -217,13 +217,13 @@ impl Driver for UdpServer { } #[async_trait::async_trait] -impl DriverStats for UdpServer { - async fn stats(&self) -> DriverStatsInfo { +impl AccumulatedDriverStatsProvider for UdpServer { + async fn stats(&self) -> AccumulatedDriverStats { self.stats.read().await.clone() } async fn reset_stats(&self) { - *self.stats.write().await = DriverStatsInfo { + *self.stats.write().await = AccumulatedDriverStats { input: None, output: None, } From 663e4b6cbf4590aa6d51c6b103485eeb37bbab66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Mon, 16 Sep 2024 14:41:13 -0300 Subject: [PATCH 4/8] src: stats: Implement Default for DriverStatsInner --- src/stats/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 0b8b264f..f29f4867 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -18,7 +18,7 @@ pub struct DriverStats { output: Option, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct DriverStatsInner { last_message_time: u64, From 7c8c7ba6dbbf52dd713a4001e0e373f2ab55de15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Mon, 16 Sep 2024 14:42:16 -0300 Subject: [PATCH 5/8] src: stats: Rename DriverStatsInner to StatsInner, add DriversStats alias --- src/stats/mod.rs | 10 ++++++---- src/stats/protocol.rs | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/stats/mod.rs b/src/stats/mod.rs index f29f4867..aef91755 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -12,14 +12,16 @@ use protocol::StatsCommand; use crate::hub::Hub; +pub type DriversStats = Vec<(String, DriverStats)>; + #[derive(Debug, Clone)] pub struct DriverStats { - input: Option, - output: Option, + input: Option, + output: Option, } #[derive(Debug, Clone, Default)] -pub struct DriverStatsInner { +pub struct StatsInner { last_message_time: u64, total_bytes: u64, @@ -48,7 +50,7 @@ impl Stats { Self { sender, task } } - pub async fn driver_stats(&mut self) -> Result> { + pub async fn driver_stats(&mut self) -> Result { let (response_tx, response_rx) = oneshot::channel(); self.sender .send(StatsCommand::GetDriversStats { diff --git a/src/stats/protocol.rs b/src/stats/protocol.rs index 297a155f..a95930e2 100644 --- a/src/stats/protocol.rs +++ b/src/stats/protocol.rs @@ -1,7 +1,7 @@ use anyhow::Result; use tokio::sync::oneshot; -use crate::stats::DriverStats; +use crate::stats::DriversStats; pub enum StatsCommand { SetPeriod { @@ -12,6 +12,6 @@ pub enum StatsCommand { response: oneshot::Sender>, }, GetDriversStats { - response: oneshot::Sender>>, + response: oneshot::Sender>, }, } From 5cfaf500337ebe8101bf24dec17c41d018aa337b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Mon, 16 Sep 2024 14:44:14 -0300 Subject: [PATCH 6/8] src: stats: driver: Rename structs --- src/stats/driver.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/stats/driver.rs b/src/stats/driver.rs index 18236a37..6f6aeebb 100644 --- a/src/stats/driver.rs +++ b/src/stats/driver.rs @@ -3,23 +3,23 @@ use std::sync::Arc; use crate::protocol::Protocol; #[async_trait::async_trait] -pub trait DriverStats { - async fn stats(&self) -> DriverStatsInfo; +pub trait AccumulatedDriverStatsProvider { + async fn stats(&self) -> AccumulatedDriverStats; async fn reset_stats(&self); } #[derive(Default, Debug, Clone)] -pub struct DriverStatsInfo { - pub input: Option, - pub output: Option, +pub struct AccumulatedDriverStats { + pub input: Option, + pub output: Option, } -impl DriverStatsInfo { +impl AccumulatedDriverStats { pub async fn update_input(&mut self, message: Arc) { if let Some(stats) = self.input.as_mut() { stats.update(message).await; } else { - self.input.replace(DriverStatsInfoInner::default()); + self.input.replace(AccumulatedStatsInner::default()); } } @@ -27,20 +27,20 @@ impl DriverStatsInfo { if let Some(stats) = self.output.as_mut() { stats.update(message).await; } else { - self.output.replace(DriverStatsInfoInner::default()); + self.output.replace(AccumulatedStatsInner::default()); } } } #[derive(Clone, Debug)] -pub struct DriverStatsInfoInner { +pub struct AccumulatedStatsInner { pub last_update: u64, pub messages: usize, pub bytes: usize, pub delay: u64, } -impl Default for DriverStatsInfoInner { +impl Default for AccumulatedStatsInner { fn default() -> Self { Self { last_update: chrono::Utc::now().timestamp_micros() as u64, @@ -51,7 +51,7 @@ impl Default for DriverStatsInfoInner { } } -impl DriverStatsInfoInner { +impl AccumulatedStatsInner { pub async fn update(&mut self, message: Arc) { self.last_update = chrono::Utc::now().timestamp_micros() as u64; self.bytes += message.raw_bytes().len(); From 32d14b85fbde4ed4d043015c535ac09b479ac025 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Mon, 16 Sep 2024 14:45:33 -0300 Subject: [PATCH 7/8] src: stats: Propagate new structs names, rename some variables --- src/stats/actor.rs | 108 ++++++++++++++++++++++++++------------------- 1 file changed, 62 insertions(+), 46 deletions(-) diff --git a/src/stats/actor.rs b/src/stats/actor.rs index 12caf6c8..3c00eec1 100644 --- a/src/stats/actor.rs +++ b/src/stats/actor.rs @@ -10,8 +10,8 @@ use tracing::*; use crate::{ hub::Hub, stats::{ - driver::{DriverStatsInfo, DriverStatsInfoInner}, - DriverStats, DriverStatsInner, StatsCommand, + driver::{AccumulatedDriverStats, AccumulatedStatsInner}, + DriverStats, DriversStats, StatsCommand, StatsInner, }, }; @@ -19,22 +19,28 @@ pub struct StatsActor { hub: Hub, start_time: Arc>, update_period: Arc>, - last_raw: Arc>>, - driver_stats: Arc>>, + last_accumulated_drivers_stats: Arc>>, + drivers_stats: Arc>, } impl StatsActor { pub async fn start(mut self, mut receiver: mpsc::Receiver) { - let task = tokio::spawn({ + let drivers_stats_task = tokio::spawn({ let hub = self.hub.clone(); let update_period = Arc::clone(&self.update_period); - let last_raw = Arc::clone(&self.last_raw); - let driver_stats = Arc::clone(&self.driver_stats); + let last_accumulated_drivers_stats = Arc::clone(&self.last_accumulated_drivers_stats); + let drivers_stats = Arc::clone(&self.drivers_stats); let start_time = Arc::clone(&self.start_time); async move { loop { - update_driver_stats(&hub, &last_raw, &driver_stats, &start_time).await; + update_driver_stats( + &hub, + &last_accumulated_drivers_stats, + &drivers_stats, + &start_time, + ) + .await; tokio::time::sleep(*update_period.read().await).await; } @@ -58,28 +64,28 @@ impl StatsActor { } } - task.abort(); + drivers_stats_task.abort(); } #[instrument(level = "debug", skip(hub))] pub async fn new(hub: Hub, update_period: tokio::time::Duration) -> Self { let update_period = Arc::new(RwLock::new(update_period)); - let last_raw = Arc::new(RwLock::new(Vec::new())); - let driver_stats = Arc::new(RwLock::new(Vec::new())); + let last_accumulated_drivers_stats = Arc::new(RwLock::new(Vec::new())); + let drivers_stats = Arc::new(RwLock::new(Vec::new())); let start_time = Arc::new(RwLock::new(chrono::Utc::now().timestamp_micros() as u64)); Self { hub, update_period, - last_raw, - driver_stats, + last_accumulated_drivers_stats, + drivers_stats, start_time, } } #[instrument(level = "debug", skip(self))] - pub async fn drivers_stats(&mut self) -> Result> { - let drivers_stats = self.driver_stats.read().await.clone(); + pub async fn drivers_stats(&mut self) -> Result { + let drivers_stats = self.drivers_stats.read().await.clone(); Ok(drivers_stats) } @@ -93,13 +99,14 @@ impl StatsActor { #[instrument(level = "debug", skip(self))] pub async fn reset(&mut self) -> Result<()> { - // note: hold the driver_stats locked until the hub clear each driver stats to minimize weird states - let mut driver_stats = self.driver_stats.write().await; + // note: hold the guards until the hub clear each driver stats to minimize weird states + let mut driver_stats = self.drivers_stats.write().await; + if let Err(error) = self.hub.reset_all_stats().await { - error!("Failed resetting driver stats: {error:?}"); + error!("Failed resetting stats: {error:?}"); } *self.start_time.write().await = chrono::Utc::now().timestamp_micros() as u64; - self.last_raw.write().await.clear(); + self.last_accumulated_drivers_stats.write().await.clear(); driver_stats.clear(); Ok(()) @@ -109,30 +116,35 @@ impl StatsActor { #[instrument(level = "debug", skip_all)] async fn update_driver_stats( hub: &Hub, - last_raw: &Arc>>, - driver_stats: &Arc>>, + last_accumulated_drivers_stats: &Arc>>, + driver_stats: &Arc>, start_time: &Arc>, ) { - let last_raw_stats: Vec<(String, DriverStatsInfo)> = last_raw.read().await.clone(); - let current_raw_stats: Vec<(String, DriverStatsInfo)> = hub.stats().await.unwrap(); + let last_stats = last_accumulated_drivers_stats.read().await.clone(); + let current_stats = hub.drivers_stats().await.unwrap(); - let last_map: HashMap<_, _> = last_raw_stats.into_iter().collect(); - let current_map: HashMap<_, _> = current_raw_stats + let last_map: HashMap<_, _> = last_stats.into_iter().collect(); + let current_map: HashMap<_, _> = current_stats .iter() .map(|(name, raw)| (name.clone(), raw.clone())) .collect(); let merged_keys: HashSet = last_map.keys().chain(current_map.keys()).cloned().collect(); - let merged_stats: Vec<(String, (Option, Option))> = - merged_keys - .into_iter() - .map(|name| { - let last = last_map.get(&name).cloned(); - let current = current_map.get(&name).cloned(); - (name, (last, current)) - }) - .collect(); + let merged_stats: Vec<( + String, + ( + Option, + Option, + ), + )> = merged_keys + .into_iter() + .map(|name| { + let last = last_map.get(&name).cloned(); + let current = current_map.get(&name).cloned(); + (name, (last, current)) + }) + .collect(); let mut new_driver_stats = Vec::new(); @@ -164,18 +176,18 @@ async fn update_driver_stats( trace!("{new_driver_stats:#?}"); *driver_stats.write().await = new_driver_stats; - *last_raw.write().await = current_raw_stats; + *last_accumulated_drivers_stats.write().await = current_stats; } /// Function to calculate the driver stats for either input or output, with proper averages #[instrument(level = "debug")] fn calculate_driver_stats( - last_stats: Option, - current_stats: Option, + last_stats: Option, + current_stats: Option, start_time: u64, -) -> Option { +) -> Option { if let Some(current_stats) = current_stats { - let time_diff = time_diff(last_stats.as_ref(), ¤t_stats); + let time_diff = accumulated_driver_stats_time_diff(last_stats.as_ref(), ¤t_stats); let total_time = total_time_since_start(start_time, ¤t_stats); let diff_messages = current_stats.messages as u64 @@ -199,7 +211,7 @@ fn calculate_driver_stats( ); let jitter = (delay - last_delay).abs(); - Some(DriverStatsInner { + Some(StatsInner { last_message_time: current_stats.last_update, total_bytes, bytes_per_second, @@ -217,24 +229,28 @@ fn calculate_driver_stats( /// Function to calculate the total time since the start (in seconds) #[instrument(level = "debug")] -fn total_time_since_start(start_time: u64, current_stats: &DriverStatsInfoInner) -> f64 { - (current_stats.last_update as f64 - start_time as f64) / 1_000_000.0 +fn total_time_since_start(start_time: u64, current_stats: &AccumulatedStatsInner) -> f64 { + calculate_time_diff(start_time, current_stats.last_update) } /// Function to calculate the time difference (in seconds) #[instrument(level = "debug")] -fn time_diff( - last_stats: Option<&DriverStatsInfoInner>, - current_stats: &DriverStatsInfoInner, +fn accumulated_driver_stats_time_diff( + last_stats: Option<&AccumulatedStatsInner>, + current_stats: &AccumulatedStatsInner, ) -> f64 { if let Some(last_stats) = last_stats { // Microseconds to seconds - (current_stats.last_update as f64 - last_stats.last_update as f64) / 1_000_000.0 + calculate_time_diff(last_stats.last_update, current_stats.last_update) } else { f64::INFINITY } } +fn calculate_time_diff(last_time: u64, current_time: u64) -> f64 { + (current_time as f64 - last_time as f64) / 1_000_000.0 +} + #[instrument(level = "debug")] fn divide_safe(numerator: f64, denominator: f64) -> f64 { if denominator > 0.0 { From c9691cb24af90f85ffb5423a0525ae6deace3511 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Mon, 16 Sep 2024 14:47:00 -0300 Subject: [PATCH 8/8] src: Propagate new structs names --- src/drivers/tcp/client.rs | 2 +- src/drivers/tcp/server.rs | 2 +- src/drivers/tlog/reader.rs | 2 +- src/drivers/tlog/writer.rs | 2 +- src/hub/actor.rs | 15 ++++++--------- src/hub/mod.rs | 12 ++++++------ src/hub/protocol.rs | 6 +++--- 7 files changed, 19 insertions(+), 22 deletions(-) diff --git a/src/drivers/tcp/client.rs b/src/drivers/tcp/client.rs index 782e82f1..152dcbd8 100644 --- a/src/drivers/tcp/client.rs +++ b/src/drivers/tcp/client.rs @@ -14,7 +14,7 @@ use crate::{ Driver, DriverInfo, }, protocol::Protocol, - stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct TcpClient { diff --git a/src/drivers/tcp/server.rs b/src/drivers/tcp/server.rs index 711d39ee..808c9ce0 100644 --- a/src/drivers/tcp/server.rs +++ b/src/drivers/tcp/server.rs @@ -14,7 +14,7 @@ use crate::{ Driver, DriverInfo, }, protocol::Protocol, - stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; #[derive(Clone)] diff --git a/src/drivers/tlog/reader.rs b/src/drivers/tlog/reader.rs index 10777f41..21a41678 100644 --- a/src/drivers/tlog/reader.rs +++ b/src/drivers/tlog/reader.rs @@ -10,7 +10,7 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct TlogReader { diff --git a/src/drivers/tlog/writer.rs b/src/drivers/tlog/writer.rs index 8da74a26..c7627bbe 100644 --- a/src/drivers/tlog/writer.rs +++ b/src/drivers/tlog/writer.rs @@ -11,7 +11,7 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats}, + stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider}, }; pub struct TlogWriter { diff --git a/src/hub/actor.rs b/src/hub/actor.rs index 3daf3cb7..7b5061c2 100644 --- a/src/hub/actor.rs +++ b/src/hub/actor.rs @@ -7,12 +7,11 @@ use tracing::*; use crate::{ drivers::{Driver, DriverInfo}, + hub::HubCommand, protocol::Protocol, - stats::driver::DriverStatsInfo, + stats::driver::AccumulatedDriverStats, }; -use super::protocol::HubCommand; - pub struct HubActor { drivers: HashMap>, bcst_sender: broadcast::Sender>, @@ -41,9 +40,9 @@ impl HubActor { HubCommand::GetSender { response } => { let _ = response.send(self.bcst_sender.clone()); } - HubCommand::GetStats { response } => { - let stats = self.get_stats().await; - let _ = response.send(stats); + HubCommand::GetDriversStats { response } => { + let drivers_stats = self.get_drivers_stats().await; + let _ = response.send(drivers_stats); } HubCommand::ResetAllStats { response } => { let _ = response.send(self.reset_all_stats().await); @@ -70,8 +69,6 @@ impl HubActor { Self::heartbeat_task(bcst_sender, component_id, system_id, frequency) }); - }); - Self { drivers: HashMap::new(), bcst_sender, @@ -164,7 +161,7 @@ impl HubActor { } #[instrument(level = "debug", skip(self))] - pub async fn get_stats(&self) -> Vec<(String, DriverStatsInfo)> { + pub async fn get_drivers_stats(&self) -> Vec<(String, AccumulatedDriverStats)> { let mut drivers_stats = Vec::with_capacity(self.drivers.len()); for (_id, driver) in self.drivers.iter() { let stats = driver.stats().await; diff --git a/src/hub/mod.rs b/src/hub/mod.rs index e4bfa210..48908a99 100644 --- a/src/hub/mod.rs +++ b/src/hub/mod.rs @@ -12,7 +12,7 @@ use tokio::sync::{broadcast, mpsc, oneshot, RwLock}; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::driver::DriverStatsInfo, + stats::driver::AccumulatedDriverStats, }; use actor::HubActor; @@ -21,7 +21,7 @@ use protocol::HubCommand; #[derive(Clone)] pub struct Hub { sender: mpsc::Sender, - task: Arc>>, + _task: Arc>>, } impl Hub { @@ -33,8 +33,8 @@ impl Hub { ) -> Self { let (sender, receiver) = mpsc::channel(32); let hub = HubActor::new(buffer_size, component_id, system_id, frequency).await; - let task = Arc::new(Mutex::new(tokio::spawn(hub.start(receiver)))); - Self { sender, task } + let _task = Arc::new(Mutex::new(tokio::spawn(hub.start(receiver)))); + Self { sender, _task } } pub async fn add_driver(&self, driver: Arc) -> Result { @@ -81,10 +81,10 @@ impl Hub { Ok(res) } - pub async fn stats(&self) -> Result> { + pub async fn drivers_stats(&self) -> Result> { let (response_tx, response_rx) = oneshot::channel(); self.sender - .send(HubCommand::GetStats { + .send(HubCommand::GetDriversStats { response: response_tx, }) .await?; diff --git a/src/hub/protocol.rs b/src/hub/protocol.rs index b77ab8da..16118416 100644 --- a/src/hub/protocol.rs +++ b/src/hub/protocol.rs @@ -6,7 +6,7 @@ use tokio::sync::{broadcast, oneshot}; use crate::{ drivers::{Driver, DriverInfo}, protocol::Protocol, - stats::driver::DriverStatsInfo, + stats::driver::AccumulatedDriverStats, }; pub enum HubCommand { @@ -24,8 +24,8 @@ pub enum HubCommand { GetSender { response: oneshot::Sender>>, }, - GetStats { - response: oneshot::Sender>, + GetDriversStats { + response: oneshot::Sender>, }, ResetAllStats { response: oneshot::Sender>,