diff --git a/src/drivers/fake.rs b/src/drivers/fake.rs index 7e744eeb..cf879d31 100644 --- a/src/drivers/fake.rs +++ b/src/drivers/fake.rs @@ -54,19 +54,9 @@ impl Driver for FakeSink { let mut hub_receiver = hub_sender.subscribe(); while let Ok(message) = hub_receiver.recv().await { - self.stats - .write() - .await - .update_input(Arc::clone(&message)) - .await; - - self.stats - .write() - .await - .update_input(Arc::clone(&message)) - .await; - - for future in self.on_message_input.call_all(Arc::clone(&message)) { + self.stats.write().await.update_input(message.clone()).await; + + for future in self.on_message_input.call_all(message.clone()) { if let Err(error) = future.await { debug!("Dropping message: on_message_input callback returned error: {error:?}"); continue; @@ -219,10 +209,10 @@ impl Driver for FakeSource { self.stats .write() .await - .update_output(Arc::clone(&message)) + .update_output(message.clone()) .await; - for future in self.on_message_output.call_all(Arc::clone(&message)) { + for future in self.on_message_output.call_all(message.clone()) { if let Err(error) = future.await { debug!( "Dropping message: on_message_input callback returned error: {error:?}" diff --git a/src/drivers/mod.rs b/src/drivers/mod.rs index ee0fb645..a0968d7c 100644 --- a/src/drivers/mod.rs +++ b/src/drivers/mod.rs @@ -279,13 +279,9 @@ mod tests { let mut hub_receiver = hub_sender.subscribe(); while let Ok(message) = hub_receiver.recv().await { - self.stats - .write() - .await - .update_input(Arc::clone(&message)) - .await; + self.stats.write().await.update_input(message.clone()).await; - for future in self.on_message_input.call_all(Arc::clone(&message)) { + for future in self.on_message_input.call_all(message.clone()) { if let Err(error) = future.await { debug!( "Dropping message: on_message_input callback returned error: {error:?}" diff --git a/src/drivers/serial/mod.rs b/src/drivers/serial/mod.rs index 4c1a6546..a0e9a66f 100644 --- a/src/drivers/serial/mod.rs +++ b/src/drivers/serial/mod.rs @@ -76,7 +76,7 @@ impl Serial { read_all_messages("serial", &mut buf, |message| async { let message = Arc::new(message); - for future in on_message_input.call_all(Arc::clone(&message)) { + for future in on_message_input.call_all(message.clone()) { if let Err(error) = future.await { debug!("Dropping message: on_message_input callback returned error: {error:?}"); continue; @@ -114,7 +114,7 @@ impl Serial { loop { match hub_receiver.recv().await { Ok(message) => { - for future in on_message_output.call_all(Arc::clone(&message)) { + for future in on_message_output.call_all(message.clone()) { if let Err(error) = future.await { debug!("Dropping message: on_message_output callback returned error: {error:?}"); continue; diff --git a/src/drivers/tcp/client.rs b/src/drivers/tcp/client.rs index 152dcbd8..2b306f21 100644 --- a/src/drivers/tcp/client.rs +++ b/src/drivers/tcp/client.rs @@ -82,7 +82,7 @@ impl Driver for TcpClient { let hub_receiver = hub_sender.subscribe(); tokio::select! { - result = tcp_receive_task(read, server_addr, Arc::clone(&hub_sender), &self.on_message_input, &self.stats) => { + result = tcp_receive_task(read, server_addr, hub_sender.clone(), &self.on_message_input, &self.stats) => { if let Err(e) = result { error!("Error in TCP receive task: {e:?}"); } diff --git a/src/drivers/tcp/mod.rs b/src/drivers/tcp/mod.rs index 0c546cba..18eccd49 100644 --- a/src/drivers/tcp/mod.rs +++ b/src/drivers/tcp/mod.rs @@ -40,9 +40,9 @@ async fn tcp_receive_task( read_all_messages(remote_addr, &mut buf, |message| async { let message = Arc::new(message); - stats.write().await.update_input(Arc::clone(&message)).await; + stats.write().await.update_input(message.clone()).await; - for future in on_message_input.call_all(Arc::clone(&message)) { + for future in on_message_input.call_all(message.clone()) { if let Err(error) = future.await { debug!("Dropping message: on_message_input callback returned error: {error:?}"); continue; @@ -86,13 +86,9 @@ async fn tcp_send_task( continue; // Don't do loopback } - stats - .write() - .await - .update_output(Arc::clone(&message)) - .await; + stats.write().await.update_output(message.clone()).await; - for future in on_message_output.call_all(Arc::clone(&message)) { + for future in on_message_output.call_all(message.clone()) { if let Err(error) = future.await { debug!("Dropping message: on_message_output callback returned error: {error:?}"); continue; diff --git a/src/drivers/tcp/server.rs b/src/drivers/tcp/server.rs index 808c9ce0..6ba1d5ba 100644 --- a/src/drivers/tcp/server.rs +++ b/src/drivers/tcp/server.rs @@ -106,7 +106,7 @@ impl Driver for TcpServer { match listener.accept().await { Ok((socket, remote_addr)) => { let remote_addr = remote_addr.to_string(); - let hub_sender = Arc::clone(&hub_sender); + let hub_sender = hub_sender.clone(); tokio::spawn(TcpServer::handle_client( socket, diff --git a/src/drivers/tlog/reader.rs b/src/drivers/tlog/reader.rs index 21a41678..7db8e73d 100644 --- a/src/drivers/tlog/reader.rs +++ b/src/drivers/tlog/reader.rs @@ -104,13 +104,9 @@ impl TlogReader { let message = Arc::new(message); - self.stats - .write() - .await - .update_input(Arc::clone(&message)) - .await; + self.stats.write().await.update_input(message.clone()).await; - for future in self.on_message_input.call_all(Arc::clone(&message)) { + for future in self.on_message_input.call_all(message.clone()) { if let Err(error) = future.await { debug!("Dropping message: on_message_input callback returned error: {error:?}"); continue; diff --git a/src/drivers/tlog/writer.rs b/src/drivers/tlog/writer.rs index c7627bbe..466a7569 100644 --- a/src/drivers/tlog/writer.rs +++ b/src/drivers/tlog/writer.rs @@ -62,10 +62,10 @@ impl TlogWriter { self.stats .write() .await - .update_output(Arc::clone(&message)) + .update_output(message.clone()) .await; - for future in self.on_message_output.call_all(Arc::clone(&message)) { + for future in self.on_message_output.call_all(message.clone()) { if let Err(error) = future.await { debug!( "Dropping message: on_message_input callback returned error: {error:?}" diff --git a/src/drivers/udp/client.rs b/src/drivers/udp/client.rs index cd8e9a26..9db7d614 100644 --- a/src/drivers/udp/client.rs +++ b/src/drivers/udp/client.rs @@ -75,10 +75,10 @@ impl UdpClient { self.stats .write() .await - .update_input(Arc::clone(&message)) + .update_input(message.clone()) .await; - for future in self.on_message_input.call_all(Arc::clone(&message)) { + for future in self.on_message_input.call_all(message.clone()) { if let Err(error) = future.await { debug!("Dropping message: on_message_input callback returned error: {error:?}"); continue; @@ -122,10 +122,10 @@ impl UdpClient { self.stats .write() .await - .update_output(Arc::clone(&message)) + .update_output(message.clone()) .await; - for future in self.on_message_output.call_all(Arc::clone(&message)) { + for future in self.on_message_output.call_all(message.clone()) { if let Err(error) = future.await { debug!( "Dropping message: on_message_output callback returned error: {error:?}" diff --git a/src/drivers/udp/server.rs b/src/drivers/udp/server.rs index b8572804..76bbda05 100644 --- a/src/drivers/udp/server.rs +++ b/src/drivers/udp/server.rs @@ -78,10 +78,10 @@ impl UdpServer { self.stats .write() .await - .update_input(Arc::clone(&message)) + .update_input(message.clone()) .await; - for future in self.on_message_input.call_all(Arc::clone(&message)) { + for future in self.on_message_input.call_all(message.clone()) { if let Err(error) = future.await { debug!("Dropping message: on_message_input callback returned error: {error:?}"); continue; @@ -141,10 +141,10 @@ impl UdpServer { self.stats .write() .await - .update_output(Arc::clone(&message)) + .update_output(message.clone()) .await; - for future in self.on_message_output.call_all(Arc::clone(&message)) { + for future in self.on_message_output.call_all(message.clone()) { if let Err(error) = future.await { debug!("Dropping message: on_message_output callback returned error: {error:?}"); continue; diff --git a/src/stats/actor.rs b/src/stats/actor.rs index ece557ab..697c6dcf 100644 --- a/src/stats/actor.rs +++ b/src/stats/actor.rs @@ -29,10 +29,10 @@ impl StatsActor { pub async fn start(mut self, mut receiver: mpsc::Receiver) { let drivers_stats_task = tokio::spawn({ let hub = self.hub.clone(); - let update_period = Arc::clone(&self.update_period); - let last_accumulated_drivers_stats = Arc::clone(&self.last_accumulated_drivers_stats); - let drivers_stats = Arc::clone(&self.drivers_stats); - let start_time = Arc::clone(&self.start_time); + let update_period = self.update_period.clone(); + let last_accumulated_drivers_stats = self.last_accumulated_drivers_stats.clone(); + let drivers_stats = self.drivers_stats.clone(); + let start_time = self.start_time.clone(); async move { loop { @@ -51,10 +51,10 @@ impl StatsActor { let hub_stats_task = tokio::spawn({ let hub = self.hub.clone(); - let update_period = Arc::clone(&self.update_period); - let last_accumulated_hub_stats = Arc::clone(&self.last_accumulated_hub_stats); - let hub_stats = Arc::clone(&self.hub_stats); - let start_time = Arc::clone(&self.start_time); + let update_period = self.update_period.clone(); + let last_accumulated_hub_stats = self.last_accumulated_hub_stats.clone(); + let hub_stats = self.hub_stats.clone(); + let start_time = self.start_time.clone(); async move { loop {