From 5e94bd6eddae0047b45a139c8ec897eb08bc08b5 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 25 Nov 2024 10:41:40 +0100 Subject: [PATCH 1/2] fix: fix z_sub_thr termination and improve perf Using `Thread::unpark` `std::process::exit` allows session to be closed, and callback final print to be executed. Callback lock accounted for 2.5% of z_sub_thr perf. Using atomic and fine-grained lock remove the performance impact (even if it doesn't really change the global outcome). --- examples/examples/z_sub_thr.rs | 73 ++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 34 deletions(-) diff --git a/examples/examples/z_sub_thr.rs b/examples/examples/z_sub_thr.rs index 7a5cd54e0..17df4810c 100644 --- a/examples/examples/z_sub_thr.rs +++ b/examples/examples/z_sub_thr.rs @@ -1,3 +1,7 @@ +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Mutex, +}; // // Copyright (c) 2023 ZettaScale Technology // @@ -17,51 +21,52 @@ use clap::Parser; use zenoh::{Config, Wait}; use zenoh_examples::CommonArgs; +struct Start { + round: Instant, + global: Option, +} + struct Stats { - round_count: usize, - round_size: usize, - finished_rounds: usize, - round_start: Instant, - global_start: Option, + received: AtomicU64, + round_size: u64, + start: Mutex, } impl Stats { fn new(round_size: usize) -> Self { Stats { - round_count: 0, - round_size, - finished_rounds: 0, - round_start: Instant::now(), - global_start: None, + received: AtomicU64::new(0), + round_size: round_size as u64, + start: Mutex::new(Start { + round: Instant::now(), + global: None, + }), } } - fn increment(&mut self) { - if self.round_count == 0 { - self.round_start = Instant::now(); - if self.global_start.is_none() { - self.global_start = Some(self.round_start) + fn increment(&self) -> usize { + let prev_received = self.received.fetch_add(1, Ordering::Relaxed); + if prev_received % self.round_size == 0 { + let mut start = self.start.lock().unwrap(); + let now = Instant::now(); + if prev_received == 0 { + start.global = Some(now); + } else { + let elapsed = now.duration_since(start.round).as_secs_f64(); + let throughput = (self.round_size as f64) / elapsed; + println!("{throughput} msg/s"); } - self.round_count += 1; - } else if self.round_count < self.round_size { - self.round_count += 1; - } else { - self.print_round(); - self.finished_rounds += 1; - self.round_count = 0; + start.round = now; } - } - fn print_round(&self) { - let elapsed = self.round_start.elapsed().as_secs_f64(); - let throughput = (self.round_size as f64) / elapsed; - println!("{throughput} msg/s"); + (prev_received / self.round_size) as usize } } impl Drop for Stats { fn drop(&mut self) { - let Some(global_start) = self.global_start else { + let start = self.start.get_mut().unwrap(); + let Some(global_start) = start.global else { return; }; let elapsed = global_start.elapsed().as_secs_f64(); - let total = self.round_size * self.finished_rounds + self.round_count; + let total = *self.received.get_mut(); let throughput = total as f64 / elapsed; println!("Received {total} messages over {elapsed:.2}s: {throughput}msg/s"); } @@ -77,13 +82,13 @@ fn main() { let key_expr = "test/thr"; - let mut stats = Stats::new(n); + let stats = Stats::new(n); + let thread = std::thread::current(); session .declare_subscriber(key_expr) - .callback_mut(move |_sample| { - stats.increment(); - if stats.finished_rounds >= m { - std::process::exit(0) + .callback(move |_sample| { + if stats.increment() >= m { + thread.unpark(); } }) .background() From d9144a140eb5012dfe56bc638a485905e8ccc602 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Tue, 26 Nov 2024 10:47:09 +0100 Subject: [PATCH 2/2] Retrigger CI