Skip to content

Commit

Permalink
ConsumeCompleted -> ConsumeCompletedIn
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 18, 2024
1 parent cb5527c commit 6e089c4
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 12 deletions.
30 changes: 22 additions & 8 deletions windsock/src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ pub enum Report {
completed_in: Duration,
message: String,
},
/// Indicates a pubsub consume response comes back from the service.
ConsumeCompleted,

/// Indicates a pubsub consume response came back from the service.
/// The Duration should be the time between the initial produce request being created on a client and the response being consumed on a client.
/// It is suggested that a timestamp be encoded in the payload of the produce to achieve this.
/// For payload sizes where a timestamp can not fit set the Duration to None.
ConsumeCompletedIn(Option<Duration>),

/// Indicates pubsub consume error response came back from the service.
ConsumeErrored { message: String },
Expand Down Expand Up @@ -150,7 +154,7 @@ pub struct OperationsReport {
pub requested_operations_per_second: Option<u64>,
pub total_operations_per_second: u32,
pub total_errors_per_second: u32,
pub mean_time: Duration,
pub mean_time: Option<Duration>,
pub time_percentiles: Percentiles,
pub total_each_second: Vec<u64>,
}
Expand All @@ -166,9 +170,11 @@ pub struct PubSubReport {
pub requested_produce_per_second: Option<u64>,
pub produce_per_second: u32,
pub produce_errors_per_second: u32,
pub consume_mean_time: Option<Duration>,
pub consume_time_percentiles: Percentiles,
pub consume_per_second: u32,
pub consume_errors_per_second: u32,
pub produce_mean_time: Duration,
pub produce_mean_time: Option<Duration>,
pub produce_time_percentiles: Percentiles,
pub produce_each_second: Vec<u64>,
pub consume_each_second: Vec<u64>,
Expand Down Expand Up @@ -369,8 +375,10 @@ pub(crate) async fn report_builder(
let mut operations_report = None;
let mut operation_times = vec![];
let mut produce_times = vec![];
let mut consume_times = vec![];
let mut total_operation_time = Duration::from_secs(0);
let mut total_produce_time = Duration::from_secs(0);
let mut total_consume_time = Duration::from_secs(0);
let mut error_messages = vec![];
let mut info_messages = vec![];

Expand Down Expand Up @@ -427,11 +435,15 @@ pub(crate) async fn report_builder(
total_produce_time += completed_in;
}
}
Report::ConsumeCompleted => {
Report::ConsumeCompletedIn(duration) => {
let report = pubsub_report.get_or_insert_with(PubSubReport::default);
if started.is_some() {
report.total_backlog -= 1;
report.total_consume += 1;
if let Some(duration) = duration {
total_consume_time += duration;
consume_times.push(duration);
}
match report.consume_each_second.last_mut() {
Some(last) => *last += 1,
None => report.consume_each_second.push(0),
Expand Down Expand Up @@ -503,13 +515,15 @@ pub(crate) async fn report_builder(
if let Some(report) = pubsub_report.as_mut() {
report.requested_produce_per_second = requested_ops;
report.produce_mean_time = mean_time(&produce_times, total_produce_time);
report.consume_mean_time = mean_time(&consume_times, total_consume_time);
report.produce_per_second = calculate_ops(report.total_produce, finished_in);
report.produce_errors_per_second =
calculate_ops(report.total_produce_error, finished_in);
report.consume_per_second = calculate_ops(report.total_consume, finished_in);
report.consume_errors_per_second =
calculate_ops(report.total_consume_error, finished_in);
report.produce_time_percentiles = calculate_percentiles(produce_times);
report.consume_time_percentiles = calculate_percentiles(consume_times);

// This is not a complete result so discard it.
report.produce_each_second.pop();
Expand All @@ -531,11 +545,11 @@ pub(crate) async fn report_builder(
archive
}

fn mean_time(times: &[Duration], total_time: Duration) -> Duration {
fn mean_time(times: &[Duration], total_time: Duration) -> Option<Duration> {
if !times.is_empty() {
total_time / times.len() as u32
Some(total_time / times.len() as u32)
} else {
Duration::from_secs(0)
None
}
}

Expand Down
47 changes: 43 additions & 4 deletions windsock/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,11 @@ fn base(reports: &[ReportColumn], table_type: &str) {
rows.push(Row::measurements(reports, "Opn Time Mean", |report| {
report.operations_report.as_ref().map(|report| {
(
report.mean_time.as_secs_f64(),
duration_ms(report.mean_time),
report
.mean_time
.map(|x| x.as_secs_f64())
.unwrap_or_default(),
duration_ms_opt(report.mean_time),
Goal::SmallerIsBetter,
)
})
Expand Down Expand Up @@ -416,8 +419,11 @@ fn base(reports: &[ReportColumn], table_type: &str) {
rows.push(Row::measurements(reports, "Produce Time Mean", |report| {
report.pubsub_report.as_ref().map(|report| {
(
report.produce_mean_time.as_secs_f64(),
duration_ms(report.produce_mean_time),
report
.produce_mean_time
.map(|x| x.as_secs_f64())
.unwrap_or_default(),
duration_ms_opt(report.produce_mean_time),
Goal::SmallerIsBetter,
)
})
Expand Down Expand Up @@ -459,6 +465,32 @@ fn base(reports: &[ReportColumn], table_type: &str) {
}));
}

rows.push(Row::measurements(reports, "Consume Time Mean", |report| {
report.pubsub_report.as_ref().map(|report| {
(
report
.consume_mean_time
.map(|x| x.as_secs_f64())
.unwrap_or_default(),
duration_ms_opt(report.consume_mean_time),
Goal::SmallerIsBetter,
)
})
}));

rows.push(Row::Heading("Consume Time Percentiles".to_owned()));
for (i, p) in Percentile::iter().enumerate() {
rows.push(Row::measurements(reports, p.name(), |report| {
report.pubsub_report.as_ref().map(|report| {
(
report.consume_time_percentiles[i].as_secs_f64(),
duration_ms(report.consume_time_percentiles[i]),
Goal::SmallerIsBetter,
)
})
}));
}

rows.push(Row::Heading("Consume Each Second".to_owned()));
for i in 0..reports
.iter()
Expand Down Expand Up @@ -826,6 +858,13 @@ fn duration_ms(duration: Duration) -> String {
format!("{:.3}ms", duration.as_micros() as f32 / 1000.0)
}

fn duration_ms_opt(duration: Option<Duration>) -> String {
match duration {
Some(duration) => format!("{:.3}ms", duration.as_micros() as f32 / 1000.0),
None => "N/A".to_owned(),
}
}

enum Row {
Heading(String),
ColumnNames {
Expand Down

0 comments on commit 6e089c4

Please sign in to comment.