Report dead-letter-queue depth from the cyclotron janitor.

- cyclotron-core: add `Janitor::count_dlq_depth`, which counts the rows of
  `cyclotron_jobs` whose `queue_name` equals `DEAD_LETTER_QUEUE`, together
  with the `.sqlx` query description file for that statement.
- cyclotron-janitor: publish that count as the
  `cyclotron_dead_letter_queue_depth` gauge and time the query under
  `cyclotron_dead_letter_queue_depth_ms`.
- Rename `QUEUE_DEPTH` to `AVAILABLE_DEPTH` and time the waiting-jobs query
  under its own `cyclotron_available_jobs_ms` name instead of reusing the
  gauge's metric name for the timing guard.

diff --git a/rust/.sqlx/query-78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8.json b/rust/.sqlx/query-78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8.json
new file mode 100644
index 0000000000000..d70d4c9d33a43
--- /dev/null
+++ b/rust/.sqlx/query-78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8.json
@@ -0,0 +1,18 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "SELECT COUNT(*) FROM cyclotron_jobs WHERE queue_name = $1",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "count",
+        "type_info": "Int8"
+      }
+    ],
+    "parameters": {
+      "Left": ["Text"]
+    },
+    "nullable": [null]
+  },
+  "hash": "78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8"
+}
diff --git a/rust/cyclotron-core/src/janitor.rs b/rust/cyclotron-core/src/janitor.rs
index 42e0b0b3acd2d..8fd98307fba67 100644
--- a/rust/cyclotron-core/src/janitor.rs
+++ b/rust/cyclotron-core/src/janitor.rs
@@ -1,3 +1,4 @@
+use crate::DEAD_LETTER_QUEUE;
 use chrono::Duration;
 use sqlx::PgPool;
 
@@ -64,4 +65,16 @@ impl Janitor {
     pub async fn waiting_jobs(&self) -> Result<u64, QueueError> {
         count_total_waiting_jobs(&self.pool).await
     }
+
+    pub async fn count_dlq_depth(&self) -> Result<u64, QueueError> {
+        let result = sqlx::query_scalar!(
+            "SELECT COUNT(*) FROM cyclotron_jobs WHERE queue_name = $1",
+            DEAD_LETTER_QUEUE
+        )
+        .fetch_one(&self.pool)
+        .await
+        .map_err(QueueError::from)?;
+
+        Ok(result.unwrap_or(0) as u64)
+    }
 }
diff --git a/rust/cyclotron-janitor/src/janitor.rs b/rust/cyclotron-janitor/src/janitor.rs
index 5e1e8aba1926b..be36c07ec009d 100644
--- a/rust/cyclotron-janitor/src/janitor.rs
+++ b/rust/cyclotron-janitor/src/janitor.rs
@@ -84,10 +84,16 @@ impl Janitor {
         }
 
         let available = {
-            let _time = common_metrics::timing_guard(QUEUE_DEPTH, &self.metrics_labels);
+            let _time = common_metrics::timing_guard(AVAILABLE_DEPTH_TIME, &self.metrics_labels);
             self.inner.waiting_jobs().await?
         };
-        common_metrics::gauge(QUEUE_DEPTH, &self.metrics_labels, available as f64);
+        common_metrics::gauge(AVAILABLE_DEPTH, &self.metrics_labels, available as f64);
+
+        let dlq_depth = {
+            let _time = common_metrics::timing_guard(DLQ_DEPTH_TIME, &self.metrics_labels);
+            self.inner.count_dlq_depth().await?
+        };
+        common_metrics::gauge(DLQ_DEPTH, &self.metrics_labels, dlq_depth as f64);
 
         common_metrics::inc(RUN_ENDS, &self.metrics_labels, 1);
         info!("Janitor loop complete");
diff --git a/rust/cyclotron-janitor/src/metrics_constants.rs b/rust/cyclotron-janitor/src/metrics_constants.rs
index 331900301d163..2da1822484ee5 100644
--- a/rust/cyclotron-janitor/src/metrics_constants.rs
+++ b/rust/cyclotron-janitor/src/metrics_constants.rs
@@ -15,4 +15,7 @@ pub const STALLED_COUNT: &str = "cyclotron_janitor_stalled_jobs_reset";
 pub const STALLED_TIME: &str = "cyclotron_janitor_stalled_jobs_reset_ms";
 
 // The janitor should report some basic shard-level metrics
-pub const QUEUE_DEPTH: &str = "cyclotron_available_jobs";
+pub const AVAILABLE_DEPTH: &str = "cyclotron_available_jobs";
+pub const AVAILABLE_DEPTH_TIME: &str = "cyclotron_available_jobs_ms";
+pub const DLQ_DEPTH: &str = "cyclotron_dead_letter_queue_depth";
+pub const DLQ_DEPTH_TIME: &str = "cyclotron_dead_letter_queue_depth_ms";