From 222fb943e3ba39b28d56b810c4e2dc3d453ffe71 Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Fri, 1 Dec 2023 18:11:19 +0530 Subject: [PATCH 1/3] feat(metrics): add drainer delay metric --- crates/diesel_models/src/kv.rs | 13 +++++++++++++ crates/drainer/src/lib.rs | 3 +++ crates/drainer/src/metrics.rs | 5 ++++- crates/drainer/src/utils.rs | 17 +++++++++++++++++ crates/router_env/src/metrics.rs | 19 +++++++++++++++++++ 5 files changed, 56 insertions(+), 1 deletion(-) diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index f56ef8304186..a5146c3e62a3 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -19,6 +19,16 @@ pub enum DBOperation { Delete, } +impl std::fmt::Display for DBOperation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DBOperation::Insert { .. } => f.write_str("Insert"), + DBOperation::Update { .. } => f.write_str("Update"), + DBOperation::Delete => f.write_str("Delete"), + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct TypedSql { #[serde(flatten)] @@ -31,6 +41,8 @@ impl TypedSql { request_id: String, global_id: String, ) -> crate::StorageResult> { + let pushed_at = common_utils::date_time::now_unix_timestamp(); + Ok(vec![ ( "typed_sql", @@ -40,6 +52,7 @@ impl TypedSql { ), ("global_id", global_id), ("request_id", request_id), + ("pushed_at", pushed_at.to_string()), ]) } } diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index 94a29e3b0a04..7b77873a648e 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -199,6 +199,7 @@ async fn drainer( .get("request_id") .map_or(String::new(), Clone::clone); let global_id = entry.1.get("global_id").map_or(String::new(), Clone::clone); + let pushed_at = entry.1.get("pushed_at"); tracing::Span::current().record("request_id", request_id); tracing::Span::current().record("global_id", global_id); @@ -261,6 +262,7 @@ async fn drainer( value: insert_op.into(), }], ); + utils::push_drainer_delay(pushed_at, insert_op.to_string()); } kv::DBOperation::Update { updatable } => { let (_, execution_time) = common_utils::date_time::time_it(|| async { @@ -302,6 +304,7 @@ async fn drainer( value: update_op.into(), }], ); + utils::push_drainer_delay(pushed_at, update_op.to_string()); } kv::DBOperation::Delete => { // [#224]: Implement this diff --git a/crates/drainer/src/metrics.rs b/crates/drainer/src/metrics.rs index 77f3d5e7db1d..52fe1d216734 100644 --- a/crates/drainer/src/metrics.rs +++ b/crates/drainer/src/metrics.rs @@ -1,5 +1,7 @@ pub use router_env::opentelemetry::KeyValue; -use router_env::{counter_metric, global_meter, histogram_metric, metrics_context}; +use router_env::{ + counter_metric, global_meter, histogram_metric, histogram_metric_i64, metrics_context, +}; metrics_context!(CONTEXT); global_meter!(DRAINER_METER, "DRAINER"); @@ -18,3 +20,4 @@ histogram_metric!(QUERY_EXECUTION_TIME, DRAINER_METER); // Time in (ms) millisec histogram_metric!(REDIS_STREAM_READ_TIME, DRAINER_METER); // Time in (ms) milliseconds histogram_metric!(REDIS_STREAM_TRIM_TIME, DRAINER_METER); // Time in (ms) milliseconds histogram_metric!(CLEANUP_TIME, DRAINER_METER); // Time in (ms) milliseconds +histogram_metric_i64!(DRAINER_DELAY_MS, DRAINER_METER); // Time in (ms) milliseconds diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs index 2bd9f092f12c..04c7c3dc349c 100644 --- a/crates/drainer/src/utils.rs +++ b/crates/drainer/src/utils.rs @@ -128,6 +128,23 @@ pub fn parse_stream_entries<'a>( .into_report() } +pub fn push_drainer_delay(pushed_at: Option<&String>, operation: String) { + if let Some(pushed_at) = pushed_at { + if let Ok(time) = pushed_at.parse::() { + let drained_at = common_utils::date_time::now_unix_timestamp(); + let delay_ms = (drained_at - time) * 1000; + metrics::DRAINER_DELAY_MS.record( + &metrics::CONTEXT, + delay_ms, + &[metrics::KeyValue { + key: "operation".into(), + value: operation.into(), + }], + ); + } + } +} + // Here the output is in the format (stream_index, jobs_picked), // similar to the first argument of the function pub async fn increment_stream_index( diff --git a/crates/router_env/src/metrics.rs b/crates/router_env/src/metrics.rs index 14402a7a6e91..961e0d362205 100644 --- a/crates/router_env/src/metrics.rs +++ b/crates/router_env/src/metrics.rs @@ -82,3 +82,22 @@ macro_rules! histogram_metric_u64 { > = once_cell::sync::Lazy::new(|| $meter.u64_histogram($description).init()); }; } + +/// Create a [`Histogram`][Histogram] i64 metric with the specified name and an optional description, +/// associated with the specified meter. Note that the meter must be to a valid [`Meter`][Meter]. +/// +/// [Histogram]: opentelemetry::metrics::Histogram +/// [Meter]: opentelemetry::metrics::Meter +#[macro_export] +macro_rules! histogram_metric_i64 { + ($name:ident, $meter:ident) => { + pub(crate) static $name: once_cell::sync::Lazy< + $crate::opentelemetry::metrics::Histogram, + > = once_cell::sync::Lazy::new(|| $meter.i64_histogram(stringify!($name)).init()); + }; + ($name:ident, $meter:ident, $description:literal) => { + pub(crate) static $name: once_cell::sync::Lazy< + $crate::opentelemetry::metrics::Histogram, + > = once_cell::sync::Lazy::new(|| $meter.i64_histogram($description).init()); + }; +} From c8e16d7f7db3941ac74587a68bf89532e1186b79 Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Fri, 1 Dec 2023 18:12:59 +0530 Subject: [PATCH 2/3] fix: remove unnecessary display impl --- crates/diesel_models/src/kv.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index a5146c3e62a3..dd12a916c90f 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -19,16 +19,6 @@ pub enum DBOperation { Delete, } -impl std::fmt::Display for DBOperation { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - DBOperation::Insert { .. } => f.write_str("Insert"), - DBOperation::Update { .. } => f.write_str("Update"), - DBOperation::Delete => f.write_str("Delete"), - } - } -} - #[derive(Debug, Serialize, Deserialize)] pub struct TypedSql { #[serde(flatten)] From 11218453016121a38cfd0c0dfd265baeb5ae9df6 Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Fri, 1 Dec 2023 19:01:10 +0530 Subject: [PATCH 3/3] chore: change unit to seconds --- crates/drainer/src/metrics.rs | 2 +- crates/drainer/src/utils.rs | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/drainer/src/metrics.rs b/crates/drainer/src/metrics.rs index 52fe1d216734..06e9119787d5 100644 --- a/crates/drainer/src/metrics.rs +++ b/crates/drainer/src/metrics.rs @@ -20,4 +20,4 @@ histogram_metric!(QUERY_EXECUTION_TIME, DRAINER_METER); // Time in (ms) millisec histogram_metric!(REDIS_STREAM_READ_TIME, DRAINER_METER); // Time in (ms) milliseconds histogram_metric!(REDIS_STREAM_TRIM_TIME, DRAINER_METER); // Time in (ms) milliseconds histogram_metric!(CLEANUP_TIME, DRAINER_METER); // Time in (ms) milliseconds -histogram_metric_i64!(DRAINER_DELAY_MS, DRAINER_METER); // Time in (ms) milliseconds +histogram_metric_i64!(DRAINER_DELAY_SECONDS, DRAINER_METER); // Time in (s) seconds diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs index 04c7c3dc349c..5d3bd241d4df 100644 --- a/crates/drainer/src/utils.rs +++ b/crates/drainer/src/utils.rs @@ -132,10 +132,12 @@ pub fn push_drainer_delay(pushed_at: Option<&String>, operation: String) { if let Some(pushed_at) = pushed_at { if let Ok(time) = pushed_at.parse::() { let drained_at = common_utils::date_time::now_unix_timestamp(); - let delay_ms = (drained_at - time) * 1000; - metrics::DRAINER_DELAY_MS.record( + let delay = drained_at - time; + + logger::debug!(operation = operation, delay = delay); + metrics::DRAINER_DELAY_SECONDS.record( &metrics::CONTEXT, - delay_ms, + delay, &[metrics::KeyValue { key: "operation".into(), value: operation.into(),