diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index f56ef8304186..a5146c3e62a3 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -19,6 +19,16 @@ pub enum DBOperation { Delete, } +impl std::fmt::Display for DBOperation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DBOperation::Insert { .. } => f.write_str("Insert"), + DBOperation::Update { .. } => f.write_str("Update"), + DBOperation::Delete => f.write_str("Delete"), + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct TypedSql { #[serde(flatten)] @@ -31,6 +41,8 @@ impl TypedSql { request_id: String, global_id: String, ) -> crate::StorageResult> { + let pushed_at = common_utils::date_time::now_unix_timestamp(); + Ok(vec![ ( "typed_sql", @@ -40,6 +52,7 @@ impl TypedSql { ), ("global_id", global_id), ("request_id", request_id), + ("pushed_at", pushed_at.to_string()), ]) } } diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index 94a29e3b0a04..7b77873a648e 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -199,6 +199,7 @@ async fn drainer( .get("request_id") .map_or(String::new(), Clone::clone); let global_id = entry.1.get("global_id").map_or(String::new(), Clone::clone); + let pushed_at = entry.1.get("pushed_at"); tracing::Span::current().record("request_id", request_id); tracing::Span::current().record("global_id", global_id); @@ -261,6 +262,7 @@ async fn drainer( value: insert_op.into(), }], ); + utils::push_drainer_delay(pushed_at, insert_op.to_string()); } kv::DBOperation::Update { updatable } => { let (_, execution_time) = common_utils::date_time::time_it(|| async { @@ -302,6 +304,7 @@ async fn drainer( value: update_op.into(), }], ); + utils::push_drainer_delay(pushed_at, update_op.to_string()); } kv::DBOperation::Delete => { // [#224]: Implement this diff --git a/crates/drainer/src/metrics.rs b/crates/drainer/src/metrics.rs index 77f3d5e7db1d..52fe1d216734 100644 --- a/crates/drainer/src/metrics.rs +++ b/crates/drainer/src/metrics.rs @@ -1,5 +1,7 @@ pub use router_env::opentelemetry::KeyValue; -use router_env::{counter_metric, global_meter, histogram_metric, metrics_context}; +use router_env::{ + counter_metric, global_meter, histogram_metric, histogram_metric_i64, metrics_context, +}; metrics_context!(CONTEXT); global_meter!(DRAINER_METER, "DRAINER"); @@ -18,3 +20,4 @@ histogram_metric!(QUERY_EXECUTION_TIME, DRAINER_METER); // Time in (ms) millisec histogram_metric!(REDIS_STREAM_READ_TIME, DRAINER_METER); // Time in (ms) milliseconds histogram_metric!(REDIS_STREAM_TRIM_TIME, DRAINER_METER); // Time in (ms) milliseconds histogram_metric!(CLEANUP_TIME, DRAINER_METER); // Time in (ms) milliseconds +histogram_metric_i64!(DRAINER_DELAY_MS, DRAINER_METER); // Time in (ms) milliseconds diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs index 2bd9f092f12c..04c7c3dc349c 100644 --- a/crates/drainer/src/utils.rs +++ b/crates/drainer/src/utils.rs @@ -128,6 +128,23 @@ pub fn parse_stream_entries<'a>( .into_report() } +pub fn push_drainer_delay(pushed_at: Option<&String>, operation: String) { + if let Some(pushed_at) = pushed_at { + if let Ok(time) = pushed_at.parse::() { + let drained_at = common_utils::date_time::now_unix_timestamp(); + let delay_ms = (drained_at - time) * 1000; + metrics::DRAINER_DELAY_MS.record( + &metrics::CONTEXT, + delay_ms, + &[metrics::KeyValue { + key: "operation".into(), + value: operation.into(), + }], + ); + } + } +} + // Here the output is in the format (stream_index, jobs_picked), // similar to the first argument of the function pub async fn increment_stream_index( diff --git a/crates/router_env/src/metrics.rs b/crates/router_env/src/metrics.rs index 14402a7a6e91..961e0d362205 100644 --- a/crates/router_env/src/metrics.rs +++ b/crates/router_env/src/metrics.rs @@ -82,3 +82,22 @@ macro_rules! histogram_metric_u64 { > = once_cell::sync::Lazy::new(|| $meter.u64_histogram($description).init()); }; } + +/// Create a [`Histogram`][Histogram] i64 metric with the specified name and an optional description, +/// associated with the specified meter. Note that the meter must be to a valid [`Meter`][Meter]. +/// +/// [Histogram]: opentelemetry::metrics::Histogram +/// [Meter]: opentelemetry::metrics::Meter +#[macro_export] +macro_rules! histogram_metric_i64 { + ($name:ident, $meter:ident) => { + pub(crate) static $name: once_cell::sync::Lazy< + $crate::opentelemetry::metrics::Histogram, + > = once_cell::sync::Lazy::new(|| $meter.i64_histogram(stringify!($name)).init()); + }; + ($name:ident, $meter:ident, $description:literal) => { + pub(crate) static $name: once_cell::sync::Lazy< + $crate::opentelemetry::metrics::Histogram, + > = once_cell::sync::Lazy::new(|| $meter.i64_histogram($description).init()); + }; +}