Skip to content

Commit

Permalink
feat(metrics): add drainer delay metric (#3034)
Browse files Browse the repository at this point in the history
  • Loading branch information
dracarys18 authored Dec 6, 2023
1 parent cfafd5c commit c6e2ee2
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 1 deletion.
3 changes: 3 additions & 0 deletions crates/diesel_models/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ impl TypedSql {
request_id: String,
global_id: String,
) -> crate::StorageResult<Vec<(&str, String)>> {
let pushed_at = common_utils::date_time::now_unix_timestamp();

Ok(vec![
(
"typed_sql",
Expand All @@ -40,6 +42,7 @@ impl TypedSql {
),
("global_id", global_id),
("request_id", request_id),
("pushed_at", pushed_at.to_string()),
])
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/drainer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ async fn drainer(
.get("request_id")
.map_or(String::new(), Clone::clone);
let global_id = entry.1.get("global_id").map_or(String::new(), Clone::clone);
let pushed_at = entry.1.get("pushed_at");

tracing::Span::current().record("request_id", request_id);
tracing::Span::current().record("global_id", global_id);
Expand Down Expand Up @@ -261,6 +262,7 @@ async fn drainer(
value: insert_op.into(),
}],
);
utils::push_drainer_delay(pushed_at, insert_op.to_string());
}
kv::DBOperation::Update { updatable } => {
let (_, execution_time) = common_utils::date_time::time_it(|| async {
Expand Down Expand Up @@ -302,6 +304,7 @@ async fn drainer(
value: update_op.into(),
}],
);
utils::push_drainer_delay(pushed_at, update_op.to_string());
}
kv::DBOperation::Delete => {
// [#224]: Implement this
Expand Down
5 changes: 4 additions & 1 deletion crates/drainer/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub use router_env::opentelemetry::KeyValue;
use router_env::{counter_metric, global_meter, histogram_metric, metrics_context};
use router_env::{
counter_metric, global_meter, histogram_metric, histogram_metric_i64, metrics_context,
};

metrics_context!(CONTEXT);
global_meter!(DRAINER_METER, "DRAINER");
Expand All @@ -18,3 +20,4 @@ histogram_metric!(QUERY_EXECUTION_TIME, DRAINER_METER); // Time in (ms) millisec
histogram_metric!(REDIS_STREAM_READ_TIME, DRAINER_METER); // Time in (ms) milliseconds
histogram_metric!(REDIS_STREAM_TRIM_TIME, DRAINER_METER); // Time in (ms) milliseconds
histogram_metric!(CLEANUP_TIME, DRAINER_METER); // Time in (ms) milliseconds
histogram_metric_i64!(DRAINER_DELAY_SECONDS, DRAINER_METER); // Time in (s) seconds
19 changes: 19 additions & 0 deletions crates/drainer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,25 @@ pub fn parse_stream_entries<'a>(
.into_report()
}

pub fn push_drainer_delay(pushed_at: Option<&String>, operation: String) {
if let Some(pushed_at) = pushed_at {
if let Ok(time) = pushed_at.parse::<i64>() {
let drained_at = common_utils::date_time::now_unix_timestamp();
let delay = drained_at - time;

logger::debug!(operation = operation, delay = delay);
metrics::DRAINER_DELAY_SECONDS.record(
&metrics::CONTEXT,
delay,
&[metrics::KeyValue {
key: "operation".into(),
value: operation.into(),
}],
);
}
}
}

// Here the output is in the format (stream_index, jobs_picked),
// similar to the first argument of the function
pub async fn increment_stream_index(
Expand Down
19 changes: 19 additions & 0 deletions crates/router_env/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,22 @@ macro_rules! histogram_metric_u64 {
> = once_cell::sync::Lazy::new(|| $meter.u64_histogram($description).init());
};
}

/// Create a [`Histogram`][Histogram] i64 metric with the specified name and an optional description,
/// associated with the specified meter. Note that the meter must be to a valid [`Meter`][Meter].
///
/// [Histogram]: opentelemetry::metrics::Histogram
/// [Meter]: opentelemetry::metrics::Meter
#[macro_export]
macro_rules! histogram_metric_i64 {
($name:ident, $meter:ident) => {
pub(crate) static $name: once_cell::sync::Lazy<
$crate::opentelemetry::metrics::Histogram<i64>,
> = once_cell::sync::Lazy::new(|| $meter.i64_histogram(stringify!($name)).init());
};
($name:ident, $meter:ident, $description:literal) => {
pub(crate) static $name: once_cell::sync::Lazy<
$crate::opentelemetry::metrics::Histogram<i64>,
> = once_cell::sync::Lazy::new(|| $meter.i64_histogram($description).init());
};
}

0 comments on commit c6e2ee2

Please sign in to comment.