From 615c4351391e2480c502666a3c63730aa403565d Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Tue, 27 Aug 2024 10:52:16 +0300 Subject: [PATCH] feat(cyclotron): Change dead-letter strategy, adopt in fetch and janitor (#24569) Co-authored-by: Brett Hoerner --- ...9c4bbd0c3a17b7ea65767cf5e7512e5a6ea89.json | 18 ++ ...3d0e39f01b19b243d724b09f3ce6617d03dc7.json | 12 + ...873bf6d1d43884cd628df5b36978dd761b025.json | 18 ++ ...4711bdfb8cf78e362ccda8bc14e92324d51f8.json | 18 ++ ...a76bd8a7d96d73983a0c408f32f17da5f483b.json | 12 + ...3f990986c8b8433f01e449fbd1eee70ce6aeb.json | 12 - ...0240804122549_initial_job_queue_schema.sql | 11 + rust/cyclotron-core/src/janitor.rs | 80 ++++++ rust/cyclotron-core/src/lib.rs | 20 +- rust/cyclotron-core/src/ops/janitor.rs | 16 +- rust/cyclotron-core/src/ops/meta.rs | 46 +++- rust/cyclotron-core/src/worker.rs | 17 +- rust/cyclotron-fetch/src/fetch.rs | 259 +++++++++--------- rust/cyclotron-janitor/Cargo.toml | 4 +- rust/cyclotron-janitor/src/janitor.rs | 61 ++--- .../src/metrics_constants.rs | 5 +- rust/cyclotron-janitor/tests/janitor.rs | 6 +- 17 files changed, 406 insertions(+), 209 deletions(-) create mode 100644 rust/.sqlx/query-2bd3251126625d8dd5143f58f4f9c4bbd0c3a17b7ea65767cf5e7512e5a6ea89.json create mode 100644 rust/.sqlx/query-2ca9ea5e8706bba21b14d9a349f3d0e39f01b19b243d724b09f3ce6617d03dc7.json create mode 100644 rust/.sqlx/query-385e94f4adab0f85174968f6eee873bf6d1d43884cd628df5b36978dd761b025.json create mode 100644 rust/.sqlx/query-78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8.json create mode 100644 rust/.sqlx/query-b8c1b723826d595dca0389d729fa76bd8a7d96d73983a0c408f32f17da5f483b.json delete mode 100644 rust/.sqlx/query-fdda5a80f5495f2d4b15ce1a0963f990986c8b8433f01e449fbd1eee70ce6aeb.json create mode 100644 rust/cyclotron-core/src/janitor.rs diff --git a/rust/.sqlx/query-2bd3251126625d8dd5143f58f4f9c4bbd0c3a17b7ea65767cf5e7512e5a6ea89.json 
b/rust/.sqlx/query-2bd3251126625d8dd5143f58f4f9c4bbd0c3a17b7ea65767cf5e7512e5a6ea89.json new file mode 100644 index 0000000000000..cfcbdd6288f56 --- /dev/null +++ b/rust/.sqlx/query-2bd3251126625d8dd5143f58f4f9c4bbd0c3a17b7ea65767cf5e7512e5a6ea89.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE cyclotron_jobs SET state = 'running', lock_id = $1, last_heartbeat=NOW() WHERE id = $2 returning queue_name", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "queue_name", + "type_info": "Text" + } + ], + "parameters": { + "Left": ["Uuid", "Uuid"] + }, + "nullable": [false] + }, + "hash": "2bd3251126625d8dd5143f58f4f9c4bbd0c3a17b7ea65767cf5e7512e5a6ea89" +} diff --git a/rust/.sqlx/query-2ca9ea5e8706bba21b14d9a349f3d0e39f01b19b243d724b09f3ce6617d03dc7.json b/rust/.sqlx/query-2ca9ea5e8706bba21b14d9a349f3d0e39f01b19b243d724b09f3ce6617d03dc7.json new file mode 100644 index 0000000000000..e69786b54b25e --- /dev/null +++ b/rust/.sqlx/query-2ca9ea5e8706bba21b14d9a349f3d0e39f01b19b243d724b09f3ce6617d03dc7.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE cyclotron_jobs SET state = 'available', lock_id = NULL, queue_name = $1 WHERE id = $2", + "describe": { + "columns": [], + "parameters": { + "Left": ["Text", "Uuid"] + }, + "nullable": [] + }, + "hash": "2ca9ea5e8706bba21b14d9a349f3d0e39f01b19b243d724b09f3ce6617d03dc7" +} diff --git a/rust/.sqlx/query-385e94f4adab0f85174968f6eee873bf6d1d43884cd628df5b36978dd761b025.json b/rust/.sqlx/query-385e94f4adab0f85174968f6eee873bf6d1d43884cd628df5b36978dd761b025.json new file mode 100644 index 0000000000000..5c6b66d3f8739 --- /dev/null +++ b/rust/.sqlx/query-385e94f4adab0f85174968f6eee873bf6d1d43884cd628df5b36978dd761b025.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\nSELECT id FROM cyclotron_jobs WHERE state = 'running' AND COALESCE(last_heartbeat, $1) <= $1 AND janitor_touch_count >= $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + 
"type_info": "Uuid" + } + ], + "parameters": { + "Left": ["Timestamptz", "Int2"] + }, + "nullable": [false] + }, + "hash": "385e94f4adab0f85174968f6eee873bf6d1d43884cd628df5b36978dd761b025" +} diff --git a/rust/.sqlx/query-78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8.json b/rust/.sqlx/query-78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8.json new file mode 100644 index 0000000000000..d70d4c9d33a43 --- /dev/null +++ b/rust/.sqlx/query-78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) FROM cyclotron_jobs WHERE queue_name = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": ["Text"] + }, + "nullable": [null] + }, + "hash": "78f54fcebc11e2411008448281e4711bdfb8cf78e362ccda8bc14e92324d51f8" +} diff --git a/rust/.sqlx/query-b8c1b723826d595dca0389d729fa76bd8a7d96d73983a0c408f32f17da5f483b.json b/rust/.sqlx/query-b8c1b723826d595dca0389d729fa76bd8a7d96d73983a0c408f32f17da5f483b.json new file mode 100644 index 0000000000000..8f201d80503ce --- /dev/null +++ b/rust/.sqlx/query-b8c1b723826d595dca0389d729fa76bd8a7d96d73983a0c408f32f17da5f483b.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO cyclotron_dead_letter_metadata (job_id, original_queue_name, reason, dlq_time) VALUES ($1, $2, $3, NOW())", + "describe": { + "columns": [], + "parameters": { + "Left": ["Uuid", "Text", "Text"] + }, + "nullable": [] + }, + "hash": "b8c1b723826d595dca0389d729fa76bd8a7d96d73983a0c408f32f17da5f483b" +} diff --git a/rust/.sqlx/query-fdda5a80f5495f2d4b15ce1a0963f990986c8b8433f01e449fbd1eee70ce6aeb.json b/rust/.sqlx/query-fdda5a80f5495f2d4b15ce1a0963f990986c8b8433f01e449fbd1eee70ce6aeb.json deleted file mode 100644 index 09fc24b340d3f..0000000000000 --- a/rust/.sqlx/query-fdda5a80f5495f2d4b15ce1a0963f990986c8b8433f01e449fbd1eee70ce6aeb.json +++ /dev/null @@ 
-1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\nDELETE FROM cyclotron_jobs WHERE state = 'running' AND COALESCE(last_heartbeat, $1) <= $1 AND janitor_touch_count >= $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": ["Timestamptz", "Int2"] - }, - "nullable": [] - }, - "hash": "fdda5a80f5495f2d4b15ce1a0963f990986c8b8433f01e449fbd1eee70ce6aeb" -} diff --git a/rust/cyclotron-core/migrations/20240804122549_initial_job_queue_schema.sql b/rust/cyclotron-core/migrations/20240804122549_initial_job_queue_schema.sql index 26cbf1c94b367..a69c44ef90f77 100644 --- a/rust/cyclotron-core/migrations/20240804122549_initial_job_queue_schema.sql +++ b/rust/cyclotron-core/migrations/20240804122549_initial_job_queue_schema.sql @@ -78,6 +78,17 @@ CREATE INDEX idx_queue_queue_name ON cyclotron_jobs(queue_name); --------------------------------------------------------------------- +-- The dead letter metadata table - when a job is DLQ'd, whovever does it leaves a note here for us. +CREATE TABLE IF NOT EXISTS cyclotron_dead_letter_metadata ( + job_id UUID PRIMARY KEY, + -- The queue the job was on before it was DLQ'd (important if e.g. we want to re-schedule it after fixing a bug) + original_queue_name TEXT NOT NULL, + -- This is the reason the job was DLQ'd. 
This should be for humans, but can include structured data if needed (keep in mind the original job will still exist) + reason TEXT NOT NULL, + -- This is the time the job was DLQ'd + dlq_time TIMESTAMPTZ NOT NULL +); + -- These are just a starting point, supporting overriding the state for a given team, function or queue -- For now these are entirely unused CREATE TABLE IF NOT EXISTS cyclotron_team_control ( diff --git a/rust/cyclotron-core/src/janitor.rs b/rust/cyclotron-core/src/janitor.rs new file mode 100644 index 0000000000000..8fd98307fba67 --- /dev/null +++ b/rust/cyclotron-core/src/janitor.rs @@ -0,0 +1,80 @@ +use crate::DEAD_LETTER_QUEUE; +use chrono::Duration; +use sqlx::PgPool; + +use crate::{ + ops::{ + janitor::{ + delete_completed_jobs, delete_failed_jobs, detect_poison_pills, reset_stalled_jobs, + }, + meta::{count_total_waiting_jobs, dead_letter, run_migrations}, + }, + PoolConfig, QueueError, +}; + +// Thin layer on top of the raw janitor operations - mostly just avoids users having to take a dep on sqlx +pub struct Janitor { + pub pool: PgPool, +} + +impl Janitor { + pub async fn new(config: PoolConfig) -> Result { + let pool = config.connect().await?; + Ok(Self { pool }) + } + + pub fn from_pool(pool: PgPool) -> Self { + Self { pool } + } + + pub async fn run_migrations(&self) { + run_migrations(&self.pool).await; + } + + pub async fn delete_completed_jobs(&self) -> Result { + delete_completed_jobs(&self.pool).await + } + + pub async fn delete_failed_jobs(&self) -> Result { + delete_failed_jobs(&self.pool).await + } + + pub async fn reset_stalled_jobs(&self, timeout: Duration) -> Result { + reset_stalled_jobs(&self.pool, timeout).await + } + + pub async fn delete_poison_pills( + &self, + timeout: Duration, + max_janitor_touched: i16, + ) -> Result { + let poison = detect_poison_pills(&self.pool, timeout, max_janitor_touched).await?; + + for job in &poison { + dead_letter( + &self.pool, + *job, + &format!("poison pill detected based on a timeout 
of {}", timeout), + ) + .await?; + } + + Ok(poison.len() as u64) + } + + pub async fn waiting_jobs(&self) -> Result { + count_total_waiting_jobs(&self.pool).await + } + + pub async fn count_dlq_depth(&self) -> Result { + let result = sqlx::query_scalar!( + "SELECT COUNT(*) FROM cyclotron_jobs WHERE queue_name = $1", + DEAD_LETTER_QUEUE + ) + .fetch_one(&self.pool) + .await + .map_err(QueueError::from)?; + + Ok(result.unwrap_or(0) as u64) + } +} diff --git a/rust/cyclotron-core/src/lib.rs b/rust/cyclotron-core/src/lib.rs index 3e8b523aedcd8..6121fea182df3 100644 --- a/rust/cyclotron-core/src/lib.rs +++ b/rust/cyclotron-core/src/lib.rs @@ -22,32 +22,24 @@ pub use manager::QueueManager; mod worker; pub use worker::Worker; -// Janitor operations are exposed directly for now (and only the janitor impl uses them) -pub use ops::janitor::delete_completed_jobs; -pub use ops::janitor::delete_failed_jobs; -pub use ops::janitor::delete_poison_pills; -pub use ops::janitor::reset_stalled_jobs; - -// We also expose some handly meta operations -pub use ops::meta::count_total_waiting_jobs; +// Janitor +mod janitor; +pub use janitor::Janitor; // Config mod config; pub use config::ManagerConfig; pub use config::PoolConfig; -// Meta -pub use ops::meta::run_migrations; - -// Some data is shared between workers and janitors on a given shard, using -// the metadata table. These keys are reserved for that purpose - // The shard id is a fixed value that is set by the janitor when it starts up. // Workers may use this value when reporting metrics. The `Worker` struct provides // a method for fetching this value, that caches it appropriately such that it's safe // to call frequently, while still being up-to-date (even though it should "never" change) pub const SHARD_ID_KEY: &str = "shard_id"; +// This isn't pub because, ideally, nothing using the core will ever need to know it. 
+const DEAD_LETTER_QUEUE: &str = "_cyclotron_dead_letter"; + #[doc(hidden)] pub mod test_support { pub use crate::manager::Shard; diff --git a/rust/cyclotron-core/src/ops/janitor.rs b/rust/cyclotron-core/src/ops/janitor.rs index df77732a17110..16bdb9180f0f9 100644 --- a/rust/cyclotron-core/src/ops/janitor.rs +++ b/rust/cyclotron-core/src/ops/janitor.rs @@ -1,4 +1,5 @@ use chrono::{Duration, Utc}; +use uuid::Uuid; use crate::error::QueueError; @@ -57,29 +58,26 @@ WHERE cyclotron_jobs.id = stalled.id } // Poison pills are stalled jobs that have been reset by the janitor more than `max_janitor_touched` times. -// -// TODO - investigating poision pills is important. We should consider moving them into a separate table or queue, rather than -// deleting them, so we can investigate why they're stalling/killing workers. -pub async fn delete_poison_pills<'c, E>( +pub async fn detect_poison_pills<'c, E>( executor: E, timeout: Duration, max_janitor_touched: i16, -) -> Result +) -> Result, QueueError> where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { let oldest_valid_heartbeat = Utc::now() - timeout; // KLUDGE - the lock_id being set isn't checked here. A job in a running state without a lock id is violating an invariant, // and would be useful to report. 
- let result = sqlx::query!( + let result = sqlx::query_scalar!( r#" -DELETE FROM cyclotron_jobs WHERE state = 'running' AND COALESCE(last_heartbeat, $1) <= $1 AND janitor_touch_count >= $2 +SELECT id FROM cyclotron_jobs WHERE state = 'running' AND COALESCE(last_heartbeat, $1) <= $1 AND janitor_touch_count >= $2 "#, oldest_valid_heartbeat, max_janitor_touched - ).execute(executor) + ).fetch_all(executor) .await .map_err(QueueError::from)?; - Ok(result.rows_affected()) + Ok(result) } diff --git a/rust/cyclotron-core/src/ops/meta.rs b/rust/cyclotron-core/src/ops/meta.rs index 883c901276351..d48acd88bc188 100644 --- a/rust/cyclotron-core/src/ops/meta.rs +++ b/rust/cyclotron-core/src/ops/meta.rs @@ -1,7 +1,7 @@ use sqlx::{postgres::PgQueryResult, PgPool}; use uuid::Uuid; -use crate::error::QueueError; +use crate::{error::QueueError, DEAD_LETTER_QUEUE}; pub async fn count_total_waiting_jobs<'c, E>(executor: E) -> Result where @@ -32,3 +32,47 @@ pub async fn run_migrations(pool: &PgPool) { .await .expect("Failed to run migrations"); } + +/// Move a job into the dead letter queue, also updating the metadata table. Note that this operation does not +/// require a lock on the job. This is because the janitor needs to DLQ jobs that are stalled. The worker wrapper +/// around this operation should check that the job is "known" (owned by it) before calling this function. +pub async fn dead_letter<'c, E>(executor: E, job: Uuid, reason: &str) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres> + Clone, +{ + // The first thing we do here is forcefully take the lock on this job, ensuring any subsequent worker + // operations will fail - we do this because the janitor can move jobs out from under workers. We mark + // the job as "running" and heartbeat so nothing else messes with it. 
+ let lock = Uuid::now_v7(); + let original_queue_name = sqlx::query_scalar!( + "UPDATE cyclotron_jobs SET state = 'running', lock_id = $1, last_heartbeat=NOW() WHERE id = $2 returning queue_name", + lock, + job + ) + .fetch_optional(executor.clone()) + .await?; + + let Some(original_queue_name) = original_queue_name else { + return Err(QueueError::UnknownJobId(job)); + }; + + // Now we add an entry to the dead metadata queue + sqlx::query!( + "INSERT INTO cyclotron_dead_letter_metadata (job_id, original_queue_name, reason, dlq_time) VALUES ($1, $2, $3, NOW())", + job, + original_queue_name, + reason + ).execute(executor.clone()).await?; + + // And finally, we move the job to the dead letter queue. Jobs in the DLQ are "available", because if they ever + // get moved back to a queue, they should be re-run. + sqlx::query!( + "UPDATE cyclotron_jobs SET state = 'available', lock_id = NULL, queue_name = $1 WHERE id = $2", + DEAD_LETTER_QUEUE, + job + ) + .execute(executor) + .await?; + + Ok(()) +} diff --git a/rust/cyclotron-core/src/worker.rs b/rust/cyclotron-core/src/worker.rs index 03a4c19bdab83..f9f1ca65894d9 100644 --- a/rust/cyclotron-core/src/worker.rs +++ b/rust/cyclotron-core/src/worker.rs @@ -7,7 +7,7 @@ use uuid::Uuid; use crate::{ ops::{ - meta::run_migrations, + meta::{dead_letter, run_migrations}, worker::{dequeue_jobs, dequeue_with_vm_state, flush_job, get_vm_state, set_heartbeat}, }, Job, JobState, JobUpdate, PoolConfig, QueueError, @@ -245,4 +245,19 @@ impl Worker { .parameters = Some(parameters); Ok(()) } + + pub async fn dead_letter(&self, job_id: Uuid, reason: &str) -> Result<(), QueueError> { + // KLUDGE: Non-lexical lifetimes are good but they're just not perfect yet - + // changing this to not be a scope bump, and instead explicitly drop'ing the + // lock after the if check, makes the compiler think the lock is held across + // the await point. 
+ { + let pending = self.pending.lock().unwrap(); + if !pending.contains_key(&job_id) { + return Err(QueueError::UnknownJobId(job_id)); + } + } + + dead_letter(&self.pool, job_id, reason).await + } } diff --git a/rust/cyclotron-fetch/src/fetch.rs b/rust/cyclotron-fetch/src/fetch.rs index 214ff80e0e8e2..6893861c984de 100644 --- a/rust/cyclotron-fetch/src/fetch.rs +++ b/rust/cyclotron-fetch/src/fetch.rs @@ -1,4 +1,4 @@ -use std::{cmp::min, collections::HashMap, sync::Arc}; +use std::{cmp::min, collections::HashMap, fmt::Display, sync::Arc}; use chrono::{DateTime, Duration, Utc}; use cyclotron_core::{Job, JobState, QueueError, Worker}; @@ -9,11 +9,11 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::sync::OwnedSemaphorePermit; use tracing::{error, instrument, warn}; +use uuid::Uuid; use crate::{context::AppContext, metrics_constants::*}; // TODO - a lot of these should maybe be configurable -pub const DEAD_LETTER_QUEUE: &str = "fetch-dead-letter"; pub const DEFAULT_RETRIES: u32 = 3; pub const DEFAULT_ON_FINISH: OnFinish = OnFinish::Return; pub const HEARTBEAT_INTERVAL_MS: i64 = 5000; @@ -264,54 +264,45 @@ pub async fn tick(context: Arc) -> Result { Ok(num_jobs) } -// Mostly a thin wrapper to make ser/de a bit easier -struct FetchJob<'a> { - _job: &'a Job, - metadata: FetchMetadata, - parameters: FetchParameters, +impl From<&Job> for FetchMetadata { + fn from(job: &Job) -> Self { + let Some(m) = &job.metadata else { + return FetchMetadata { + tries: 0, + trace: vec![], + }; + }; + + let Ok(m) = serde_json::from_str(m) else { + return FetchMetadata { + tries: 0, + trace: vec![], + }; + }; + + m + } } -impl<'a> TryFrom<&'a Job> for FetchJob<'a> { +impl TryFrom<&Job> for FetchParameters { type Error = FetchFailure; - fn try_from(job: &'a Job) -> Result { + fn try_from(job: &Job) -> Result { let Some(parameters) = &job.parameters else { return Err(FetchFailure::new( FetchFailureKind::MissingParameters, "Job is missing parameters", )); }; - let 
parameters: FetchParameters = match serde_json::from_str(parameters) { - Ok(p) => p, - Err(e) => { - return Err(FetchFailure::new( - FetchFailureKind::InvalidParameters, - format!("Failed to parse parameters: {}", e), - )) - } - }; - let metadata = match &job.metadata { - Some(m) => match serde_json::from_str(m) { - Ok(m) => m, - Err(_) => { - // If we can't decode the metadata, assume this is the first time we've seen the job - // TODO - this is maybe too lenient, I'm not sure. - FetchMetadata { - tries: 0, - trace: vec![], - } - } - }, - None => FetchMetadata { - tries: 0, - trace: vec![], - }, + + let Ok(p) = serde_json::from_str(parameters) else { + return Err(FetchFailure::new( + FetchFailureKind::InvalidParameters, + "Failed to parse parameters", + )); }; - Ok(Self { - _job: job, - metadata, - parameters, - }) + + Ok(p) } } @@ -323,38 +314,47 @@ pub async fn run_job( ) -> Result<(), FetchError> { let labels = context.metric_labels(); let job_total = common_metrics::timing_guard(JOB_TOTAL_TIME, &labels); - let parsed: FetchJob = match (&job).try_into() { + + let metadata = FetchMetadata::from(&job); + let params = match FetchParameters::try_from(&job) { Ok(p) => p, - Err(e) => { - warn!("Failed to parse job: {:?}", e); - let res = dead_letter_job(&context.worker, job, vec![e]).await; + Err(_) => { + // Failure to parse parameters is a programming error in whatever is handing us jobs, and we + // should dead letter the job and then return. 
common_metrics::inc(FETCH_DEAD_LETTER, &labels, 1); + let res = context + .worker + .dead_letter(job.id, "Could not parse job parameters") + .await; job_total - .label(OUTCOME_LABEL, "immediate_dead_letter") + .label(OUTCOME_LABEL, "missing_parameters_dead_letter") .fin(); - return res; + return Ok(res?); } }; - let method = (&parsed.parameters.method).into(); + let method = (&params.method).into(); // Parsing errors are always dead letters - it /will/ fail every time, so dump it - // TODO - We should probably decide whether to dead letter or return Failed on the basis of OnFinish, - // in case the caller wants to do any cleanup on broken jobs - let url: reqwest::Url = match (parsed.parameters.url).parse() { + let url: reqwest::Url = match (params.url).parse() { Ok(u) => u, Err(e) => { warn!("Failed to parse URL: {}", e); - common_metrics::inc(FETCH_DEAD_LETTER, &labels, 1); - let res = dead_letter_job( + + let failure = FetchFailure::new( + FetchFailureKind::InvalidParameters, + format!("Invalid url: {} - {}", &params.url, e), + ); + + let res = quick_fail_job( &context.worker, job, - vec![FetchFailure::new( - FetchFailureKind::InvalidParameters, - format!("Invalid url: {}", e), - )], + params.return_queue, + params.on_finish.unwrap_or(DEFAULT_ON_FINISH), + failure, ) .await; + job_total .label(OUTCOME_LABEL, "url_parse_dead_letter") .fin(); @@ -362,20 +362,24 @@ pub async fn run_job( } }; - let headers = match (&parsed.parameters.headers.unwrap_or_default()).try_into() { + let headers = match (&params.headers.unwrap_or_default()).try_into() { Ok(h) => h, Err(e) => { - let res = dead_letter_job( + warn!("Failed to parse headers: {}", e); + let failure = FetchFailure::new( + FetchFailureKind::InvalidParameters, + format!("Invalid headers: {}", e), + ); + + let res = quick_fail_job( &context.worker, job, - vec![FetchFailure::new( - FetchFailureKind::InvalidParameters, - format!("Invalid headers: {}", e), - )], + params.return_queue, + 
params.on_finish.unwrap_or(DEFAULT_ON_FINISH), + failure, ) .await; - warn!("Failed to parse headers: {}", e); - common_metrics::inc(FETCH_DEAD_LETTER, &labels, 1); + job_total .label(OUTCOME_LABEL, "headers_parse_dead_letter") .fin(); @@ -383,7 +387,7 @@ pub async fn run_job( } }; - let body = reqwest::Body::from(parsed.parameters.body.unwrap_or_default()); + let body = reqwest::Body::from(params.body.unwrap_or_default()); let mut send_fut = context .client @@ -419,10 +423,10 @@ pub async fn run_job( let res = handle_fetch_failure( &context, &job, - &parsed.metadata, - parsed.parameters.max_tries.unwrap_or(DEFAULT_RETRIES), - parsed.parameters.return_queue, - parsed.parameters.on_finish.unwrap_or(DEFAULT_ON_FINISH), + &metadata, + params.max_tries.unwrap_or(DEFAULT_RETRIES), + params.return_queue, + params.on_finish.unwrap_or(DEFAULT_ON_FINISH), e, ) .await; @@ -472,10 +476,10 @@ pub async fn run_job( let res = handle_fetch_failure( &context, &job, - &parsed.metadata, - parsed.parameters.max_tries.unwrap_or(DEFAULT_RETRIES), - parsed.parameters.return_queue, - parsed.parameters.on_finish.unwrap_or(DEFAULT_ON_FINISH), + &metadata, + params.max_tries.unwrap_or(DEFAULT_RETRIES), + params.return_queue, + params.on_finish.unwrap_or(DEFAULT_ON_FINISH), e, ) .await; @@ -496,10 +500,10 @@ pub async fn run_job( let res = handle_fetch_failure( &context, &job, - &parsed.metadata, - parsed.parameters.max_tries.unwrap_or(DEFAULT_RETRIES), - parsed.parameters.return_queue, - parsed.parameters.on_finish.unwrap_or(DEFAULT_ON_FINISH), + &metadata, + params.max_tries.unwrap_or(DEFAULT_RETRIES), + params.return_queue, + params.on_finish.unwrap_or(DEFAULT_ON_FINISH), failure, ) .await; @@ -518,8 +522,8 @@ pub async fn run_job( let res = complete_job( &context.worker, &job, - parsed.parameters.return_queue, - parsed.parameters.on_finish.unwrap_or(DEFAULT_ON_FINISH), + params.return_queue, + params.on_finish.unwrap_or(DEFAULT_ON_FINISH), result, ) .await; @@ -527,6 +531,21 @@ pub async 
fn run_job( res } +// This immediately returns a job to the return_queue, with a single failure. It's used in cases like, e.g, +// parsing errors, where we know the job will never succeed. +pub async fn quick_fail_job( + worker: &Worker, + job: Job, + return_queue: String, + on_finish: OnFinish, + failure: FetchFailure, +) -> Result<(), FetchError> { + let result = FetchResult::Failure { + trace: vec![failure], + }; + complete_job(worker, &job, return_queue, on_finish, result).await +} + // Checks if the retry limit has been reached, and does one of: // - Schedule the job for retry, doing metadata bookkeeping // - Complete the job, with the failure trace @@ -583,8 +602,7 @@ where Ok(()) } -// Complete the job, either because we got a good response, or because the jobs retries -// have been exceeded. +// Complete the job with some result. pub async fn complete_job( worker: &Worker, job: &Job, @@ -592,31 +610,24 @@ pub async fn complete_job( on_finish: OnFinish, result: FetchResult, ) -> Result<(), FetchError> { - // If we fail any serde, we just want to flush to the DLQ and bail worker.set_state(job.id, JobState::Available)?; - worker.set_queue(job.id, DEAD_LETTER_QUEUE)?; + worker.set_queue(job.id, &return_queue)?; let is_success = result.is_success(); - let result = match serde_json::to_string(&result) { - Ok(r) => r, - Err(e) => { - // Leave behind a hint for debugging - worker.set_metadata(job.id, Some(format!("Failed to serialise result: {}", e)))?; - worker.flush_job(job.id).await?; - return Err(FetchError::SerdeError(e)); - } - }; - - worker.set_queue(job.id, &return_queue)?; + let result = do_or_dead_letter(worker, job.id, || serde_json::to_string(&result)).await??; - match (is_success, on_finish) { - (true, _) | (false, OnFinish::Return) => { - worker.set_state(job.id, JobState::Available)?; + match (on_finish, is_success) { + (OnFinish::Complete, true) => { + worker.set_state(job.id, JobState::Completed)?; } - (false, OnFinish::Complete) => { + 
(OnFinish::Complete, false) => { worker.set_state(job.id, JobState::Failed)?; } + (OnFinish::Return, _) => { + // If we're returning the job, we don't care whether it succeeded or not, the caller wants it back + worker.set_state(job.id, JobState::Available)?; + } } worker.set_parameters(job.id, Some(result))?; @@ -626,41 +637,6 @@ pub async fn complete_job( Ok(()) } -// This moves the job to a dead letter queue, and sets the state to Available (to prevent it -// from being deleted by the janitor). This is for debugging purposes, and only really jobs -// that have some parsing failure on dequeue end up here (as they indicate a programming error -// in the caller, or the worker) -pub async fn dead_letter_job( - worker: &Worker, - job: Job, - errors: Vec<FetchFailure>, -) -> Result<(), FetchError> { - worker.set_state(job.id, JobState::Available)?; - worker.set_queue(job.id, DEAD_LETTER_QUEUE)?; - - let result = FetchResult::Failure { trace: errors }; - let result = match serde_json::to_string(&result) { - Ok(r) => r, - Err(e) => { - worker.set_metadata( - job.id, - Some(format!( - "Failed to serialise result during DLQ write: {}", - e - )), - )?; - worker.flush_job(job.id).await?; - return Err(FetchError::SerdeError(e)); - } - }; - - worker.set_parameters(job.id, Some(result))?; - - worker.flush_job(job.id).await?; - - Ok(()) -} - // Pulls the body, while maintaining the job heartbeat. 
pub async fn first_n_bytes_of_response( worker: &Worker, @@ -706,3 +682,22 @@ pub async fn first_n_bytes_of_response( Ok(Ok(body)) } + +pub async fn do_or_dead_letter( + worker: &Worker, + job_id: Uuid, + f: impl FnOnce() -> Result, +) -> Result, FetchError> +where + E: Display, +{ + let res = f(); + match &res { + Ok(_) => {} + Err(e) => { + let reason = e.to_string(); + worker.dead_letter(job_id, &reason).await?; + } + } + Ok(res) +} diff --git a/rust/cyclotron-janitor/Cargo.toml b/rust/cyclotron-janitor/Cargo.toml index 2991a274848c2..3363a16aac4a6 100644 --- a/rust/cyclotron-janitor/Cargo.toml +++ b/rust/cyclotron-janitor/Cargo.toml @@ -8,7 +8,6 @@ workspace = true [dependencies] tracing-subscriber = { workspace = true } -sqlx = { workspace = true } chrono = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } @@ -19,3 +18,6 @@ eyre = { workspace = true } cyclotron-core = { path = "../cyclotron-core" } common-metrics = { path = "../common/metrics" } health = { path = "../common/health" } + +[dev-dependencies] +sqlx = { workspace = true } \ No newline at end of file diff --git a/rust/cyclotron-janitor/src/janitor.rs b/rust/cyclotron-janitor/src/janitor.rs index 60796e9f7e11b..be36c07ec009d 100644 --- a/rust/cyclotron-janitor/src/janitor.rs +++ b/rust/cyclotron-janitor/src/janitor.rs @@ -1,8 +1,4 @@ -use cyclotron_core::{ - delete_completed_jobs, delete_failed_jobs, delete_poison_pills, reset_stalled_jobs, QueueError, - SHARD_ID_KEY, -}; -use sqlx::PgPool; +use cyclotron_core::{QueueError, SHARD_ID_KEY}; use tracing::{info, warn}; use crate::{ @@ -20,15 +16,15 @@ pub struct CleanupResult { } pub struct Janitor { - pool: PgPool, - settings: JanitorSettings, - metrics_labels: Vec<(String, String)>, + pub inner: cyclotron_core::Janitor, + pub settings: JanitorSettings, + pub metrics_labels: Vec<(String, String)>, } impl Janitor { pub async fn new(config: JanitorConfig) -> Result { let settings = config.settings; - let pool = 
config.pool.connect().await?; + let inner = cyclotron_core::Janitor::new(config.pool).await?; let metrics_labels = vec![ ("janitor_id".to_string(), settings.id.clone()), @@ -36,28 +32,14 @@ impl Janitor { ]; Ok(Self { - pool, + inner, settings, metrics_labels, }) } - #[doc(hidden)] - pub async fn from_pool(pool: PgPool, settings: JanitorSettings) -> Self { - let metrics_labels = vec![ - ("janitor_id".to_string(), settings.id.clone()), - (SHARD_ID_KEY.to_string(), settings.shard_id.clone()), - ]; - - Self { - pool, - settings, - metrics_labels, - } - } - pub async fn run_migrations(&self) { - cyclotron_core::run_migrations(&self.pool).await; + self.inner.run_migrations().await; } pub async fn run_once(&self) -> Result { @@ -67,24 +49,21 @@ impl Janitor { let completed = { let _time = common_metrics::timing_guard(COMPLETED_TIME, &self.metrics_labels); - delete_completed_jobs(&self.pool).await? + self.inner.delete_completed_jobs().await? }; common_metrics::inc(COMPLETED_COUNT, &self.metrics_labels, completed); let failed = { let _time = common_metrics::timing_guard(FAILED_TIME, &self.metrics_labels); - delete_failed_jobs(&self.pool).await? + self.inner.delete_failed_jobs().await? }; common_metrics::inc(FAILED_COUNT, &self.metrics_labels, failed); let poisoned = { let _time = common_metrics::timing_guard(POISONED_TIME, &self.metrics_labels); - delete_poison_pills( - &self.pool, - self.settings.stall_timeout, - self.settings.max_touches, - ) - .await? + self.inner + .delete_poison_pills(self.settings.stall_timeout, self.settings.max_touches) + .await? }; common_metrics::inc(POISONED_COUNT, &self.metrics_labels, poisoned); @@ -94,7 +73,9 @@ impl Janitor { let stalled = { let _time = common_metrics::timing_guard(STALLED_TIME, &self.metrics_labels); - reset_stalled_jobs(&self.pool, self.settings.stall_timeout).await? + self.inner + .reset_stalled_jobs(self.settings.stall_timeout) + .await? 
}; common_metrics::inc(STALLED_COUNT, &self.metrics_labels, stalled); @@ -103,10 +84,16 @@ impl Janitor { } let available = { - let _time = common_metrics::timing_guard(QUEUE_DEPTH, &self.metrics_labels); - cyclotron_core::count_total_waiting_jobs(&self.pool).await? + let _time = common_metrics::timing_guard(AVAILABLE_DEPTH_TIME, &self.metrics_labels); + self.inner.waiting_jobs().await? + }; + common_metrics::gauge(AVAILABLE_DEPTH, &self.metrics_labels, available as f64); + + let dlq_depth = { + let _time = common_metrics::timing_guard(DLQ_DEPTH_TIME, &self.metrics_labels); + self.inner.count_dlq_depth().await? }; - common_metrics::gauge(QUEUE_DEPTH, &self.metrics_labels, available as f64); + common_metrics::gauge(DLQ_DEPTH, &self.metrics_labels, dlq_depth as f64); common_metrics::inc(RUN_ENDS, &self.metrics_labels, 1); info!("Janitor loop complete"); diff --git a/rust/cyclotron-janitor/src/metrics_constants.rs b/rust/cyclotron-janitor/src/metrics_constants.rs index 331900301d163..2da1822484ee5 100644 --- a/rust/cyclotron-janitor/src/metrics_constants.rs +++ b/rust/cyclotron-janitor/src/metrics_constants.rs @@ -15,4 +15,7 @@ pub const STALLED_COUNT: &str = "cyclotron_janitor_stalled_jobs_reset"; pub const STALLED_TIME: &str = "cyclotron_janitor_stalled_jobs_reset_ms"; // The janitor should report some basic shard-level metrics -pub const QUEUE_DEPTH: &str = "cyclotron_available_jobs"; +pub const AVAILABLE_DEPTH: &str = "cyclotron_available_jobs"; +pub const AVAILABLE_DEPTH_TIME: &str = "cyclotron_available_jobs_ms"; +pub const DLQ_DEPTH: &str = "cyclotron_dead_letter_queue_depth"; +pub const DLQ_DEPTH_TIME: &str = "cyclotron_dead_letter_queue_depth_ms"; diff --git a/rust/cyclotron-janitor/tests/janitor.rs b/rust/cyclotron-janitor/tests/janitor.rs index eef0efe636037..494b9431c228a 100644 --- a/rust/cyclotron-janitor/tests/janitor.rs +++ b/rust/cyclotron-janitor/tests/janitor.rs @@ -21,7 +21,11 @@ async fn janitor_test(db: PgPool) { id: "test_janitor".to_string(), 
shard_id: "test_shard".to_string(), }; - let janitor = Janitor::from_pool(db.clone(), settings).await; + let janitor = Janitor { + inner: cyclotron_core::Janitor::from_pool(db.clone()), + settings, + metrics_labels: vec![], + }; let now = Utc::now() - Duration::seconds(10); let queue_name = "default".to_string();