diff --git a/windsock/src/report.rs b/windsock/src/report.rs index 4dbd6cb..105dee4 100644 --- a/windsock/src/report.rs +++ b/windsock/src/report.rs @@ -36,8 +36,12 @@ pub enum Report { completed_in: Duration, message: String, }, - /// Indicates a pubsub consume response comes back from the service. - ConsumeCompleted, + + /// Indicates a pubsub consume response came back from the service. + /// The Duration should be the time between the initial produce request being created on a client and the response being consumed on a client. + /// It is suggested that a timestamp be encoded in the payload of the produce to achieve this. + /// For payload sizes where a timestamp can not fit set the Duration to None. + ConsumeCompletedIn(Option), /// Indicates pubsub consume error response came back from the service. ConsumeErrored { message: String }, @@ -150,7 +154,7 @@ pub struct OperationsReport { pub requested_operations_per_second: Option, pub total_operations_per_second: u32, pub total_errors_per_second: u32, - pub mean_time: Duration, + pub mean_time: Option, pub time_percentiles: Percentiles, pub total_each_second: Vec, } @@ -166,9 +170,11 @@ pub struct PubSubReport { pub requested_produce_per_second: Option, pub produce_per_second: u32, pub produce_errors_per_second: u32, + pub consume_mean_time: Option, + pub consume_time_percentiles: Percentiles, pub consume_per_second: u32, pub consume_errors_per_second: u32, - pub produce_mean_time: Duration, + pub produce_mean_time: Option, pub produce_time_percentiles: Percentiles, pub produce_each_second: Vec, pub consume_each_second: Vec, @@ -369,8 +375,10 @@ pub(crate) async fn report_builder( let mut operations_report = None; let mut operation_times = vec![]; let mut produce_times = vec![]; + let mut consume_times = vec![]; let mut total_operation_time = Duration::from_secs(0); let mut total_produce_time = Duration::from_secs(0); + let mut total_consume_time = Duration::from_secs(0); let mut error_messages = vec![]; let mut info_messages = vec![]; @@ -427,11 +435,15 @@ pub(crate) async fn report_builder( total_produce_time += completed_in; } } - Report::ConsumeCompleted => { + Report::ConsumeCompletedIn(duration) => { let report = pubsub_report.get_or_insert_with(PubSubReport::default); if started.is_some() { report.total_backlog -= 1; report.total_consume += 1; + if let Some(duration) = duration { + total_consume_time += duration; + consume_times.push(duration); + } match report.consume_each_second.last_mut() { Some(last) => *last += 1, None => report.consume_each_second.push(0), @@ -503,6 +515,7 @@ pub(crate) async fn report_builder( if let Some(report) = pubsub_report.as_mut() { report.requested_produce_per_second = requested_ops; report.produce_mean_time = mean_time(&produce_times, total_produce_time); + report.consume_mean_time = mean_time(&consume_times, total_consume_time); report.produce_per_second = calculate_ops(report.total_produce, finished_in); report.produce_errors_per_second = calculate_ops(report.total_produce_error, finished_in); @@ -510,6 +523,7 @@ pub(crate) async fn report_builder( report.consume_errors_per_second = calculate_ops(report.total_consume_error, finished_in); report.produce_time_percentiles = calculate_percentiles(produce_times); + report.consume_time_percentiles = calculate_percentiles(consume_times); // This is not a complete result so discard it. report.produce_each_second.pop(); @@ -531,11 +545,11 @@ pub(crate) async fn report_builder( archive } -fn mean_time(times: &[Duration], total_time: Duration) -> Duration { +fn mean_time(times: &[Duration], total_time: Duration) -> Option { if !times.is_empty() { - total_time / times.len() as u32 + Some(total_time / times.len() as u32) } else { - Duration::from_secs(0) + None } } diff --git a/windsock/src/tables.rs b/windsock/src/tables.rs index dc7ab42..b6b858a 100644 --- a/windsock/src/tables.rs +++ b/windsock/src/tables.rs @@ -249,8 +249,11 @@ fn base(reports: &[ReportColumn], table_type: &str) { rows.push(Row::measurements(reports, "Opn Time Mean", |report| { report.operations_report.as_ref().map(|report| { ( - report.mean_time.as_secs_f64(), - duration_ms(report.mean_time), + report + .mean_time + .map(|x| x.as_secs_f64()) + .unwrap_or_default(), + duration_ms_opt(report.mean_time), Goal::SmallerIsBetter, ) }) @@ -416,8 +419,11 @@ fn base(reports: &[ReportColumn], table_type: &str) { rows.push(Row::measurements(reports, "Produce Time Mean", |report| { report.pubsub_report.as_ref().map(|report| { ( - report.produce_mean_time.as_secs_f64(), - duration_ms(report.produce_mean_time), + report + .produce_mean_time + .map(|x| x.as_secs_f64()) + .unwrap_or_default(), + duration_ms_opt(report.produce_mean_time), Goal::SmallerIsBetter, ) }) @@ -459,6 +465,32 @@ fn base(reports: &[ReportColumn], table_type: &str) { })); } + rows.push(Row::measurements(reports, "Consume Time Mean", |report| { + report.pubsub_report.as_ref().map(|report| { + ( + report + .consume_mean_time + .map(|x| x.as_secs_f64()) + .unwrap_or_default(), + duration_ms_opt(report.consume_mean_time), + Goal::SmallerIsBetter, + ) + }) + })); + + rows.push(Row::Heading("Consume Time Percentiles".to_owned())); + for (i, p) in Percentile::iter().enumerate() { + rows.push(Row::measurements(reports, p.name(), |report| { + report.pubsub_report.as_ref().map(|report| { + ( + report.consume_time_percentiles[i].as_secs_f64(), + duration_ms(report.consume_time_percentiles[i]), + Goal::SmallerIsBetter, + ) + }) + })); + } + rows.push(Row::Heading("Consume Each Second".to_owned())); for i in 0..reports .iter() @@ -826,6 +858,13 @@ fn duration_ms(duration: Duration) -> String { format!("{:.3}ms", duration.as_micros() as f32 / 1000.0) } +fn duration_ms_opt(duration: Option) -> String { + match duration { + Some(duration) => format!("{:.3}ms", duration.as_micros() as f32 / 1000.0), + None => "N/A".to_owned(), + } +} + enum Row { Heading(String), ColumnNames {