
Commit

feat(cyclotron): Change dead-letter strategy, adopt in fetch and janitor (#24569)

Co-authored-by: Brett Hoerner <[email protected]>
oliverb123 and bretthoerner authored Aug 27, 2024
1 parent e9e3441 commit 615c435
Showing 17 changed files with 406 additions and 209 deletions.


@@ -78,6 +78,17 @@ CREATE INDEX idx_queue_queue_name ON cyclotron_jobs(queue_name);
---------------------------------------------------------------------


-- The dead letter metadata table - when a job is DLQ'd, whoever does it leaves a note here for us.
CREATE TABLE IF NOT EXISTS cyclotron_dead_letter_metadata (
job_id UUID PRIMARY KEY,
-- The queue the job was on before it was DLQ'd (important if e.g. we want to re-schedule it after fixing a bug)
original_queue_name TEXT NOT NULL,
-- This is the reason the job was DLQ'd. This should be for humans, but can include structured data if needed (keep in mind the original job will still exist)
reason TEXT NOT NULL,
-- This is the time the job was DLQ'd
dlq_time TIMESTAMPTZ NOT NULL
);

-- These are just a starting point, supporting overriding the state for a given team, function or queue
-- For now these are entirely unused
CREATE TABLE IF NOT EXISTS cyclotron_team_control (
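For reference, a dead-lettered job now ends up in two places: the job row itself is moved onto the reserved dead letter queue, and the note about why goes into cyclotron_dead_letter_metadata. A minimal sketch of how the two tables might be joined to inspect DLQ'd jobs; the DlqEntry struct and the non-macro sqlx calls are illustrative, not part of this commit:

// Illustrative sketch, not part of this diff. Assumes an sqlx PgPool with the
// uuid feature enabled; DlqEntry is a hypothetical helper struct.
use sqlx::PgPool;
use uuid::Uuid;

#[derive(sqlx::FromRow)]
struct DlqEntry {
    job_id: Uuid,
    original_queue_name: String,
    reason: String,
}

async fn list_dead_lettered(pool: &PgPool) -> Result<Vec<DlqEntry>, sqlx::Error> {
    // Join the job rows parked on the reserved DLQ queue with the metadata
    // written when they were dead-lettered, newest first.
    sqlx::query_as::<_, DlqEntry>(
        "SELECT m.job_id, m.original_queue_name, m.reason
         FROM cyclotron_dead_letter_metadata m
         JOIN cyclotron_jobs j ON j.id = m.job_id
         WHERE j.queue_name = '_cyclotron_dead_letter'
         ORDER BY m.dlq_time DESC",
    )
    .fetch_all(pool)
    .await
}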
80 changes: 80 additions & 0 deletions rust/cyclotron-core/src/janitor.rs
@@ -0,0 +1,80 @@
use crate::DEAD_LETTER_QUEUE;
use chrono::Duration;
use sqlx::PgPool;

use crate::{
ops::{
janitor::{
delete_completed_jobs, delete_failed_jobs, detect_poison_pills, reset_stalled_jobs,
},
meta::{count_total_waiting_jobs, dead_letter, run_migrations},
},
PoolConfig, QueueError,
};

// Thin layer on top of the raw janitor operations - mostly just avoids users having to take a dep on sqlx
pub struct Janitor {
pub pool: PgPool,
}

impl Janitor {
pub async fn new(config: PoolConfig) -> Result<Self, QueueError> {
let pool = config.connect().await?;
Ok(Self { pool })
}

pub fn from_pool(pool: PgPool) -> Self {
Self { pool }
}

pub async fn run_migrations(&self) {
run_migrations(&self.pool).await;
}

pub async fn delete_completed_jobs(&self) -> Result<u64, QueueError> {
delete_completed_jobs(&self.pool).await
}

pub async fn delete_failed_jobs(&self) -> Result<u64, QueueError> {
delete_failed_jobs(&self.pool).await
}

pub async fn reset_stalled_jobs(&self, timeout: Duration) -> Result<u64, QueueError> {
reset_stalled_jobs(&self.pool, timeout).await
}

pub async fn delete_poison_pills(
&self,
timeout: Duration,
max_janitor_touched: i16,
) -> Result<u64, QueueError> {
let poison = detect_poison_pills(&self.pool, timeout, max_janitor_touched).await?;

for job in &poison {
dead_letter(
&self.pool,
*job,
&format!("poison pill detected based on a timeout of {}", timeout),
)
.await?;
}

Ok(poison.len() as u64)
}

pub async fn waiting_jobs(&self) -> Result<u64, QueueError> {
count_total_waiting_jobs(&self.pool).await
}

pub async fn count_dlq_depth(&self) -> Result<u64, QueueError> {
let result = sqlx::query_scalar!(
"SELECT COUNT(*) FROM cyclotron_jobs WHERE queue_name = $1",
DEAD_LETTER_QUEUE
)
.fetch_one(&self.pool)
.await
.map_err(QueueError::from)?;

Ok(result.unwrap_or(0) as u64)
}
}
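To make the wrapper concrete, here is a rough sketch of a periodic cleanup pass built on the struct above; the 5-minute timeout, the touch count of 3, and the import paths are assumed for illustration and are not part of this commit:

// Illustrative sketch, not part of this diff. Import paths and values are assumed.
use chrono::Duration;
use cyclotron_core::{Janitor, QueueError};
use sqlx::PgPool;

async fn janitor_pass(pool: PgPool) -> Result<(), QueueError> {
    let janitor = Janitor::from_pool(pool);

    // Clear out jobs that have reached a terminal state.
    let completed = janitor.delete_completed_jobs().await?;
    let failed = janitor.delete_failed_jobs().await?;

    // Jobs with no heartbeat for 5 minutes go back to 'available'; jobs the
    // janitor has already had to reset 3 times are dead-lettered instead.
    let stalled = janitor.reset_stalled_jobs(Duration::minutes(5)).await?;
    let dlqd = janitor.delete_poison_pills(Duration::minutes(5), 3).await?;

    // Depth gauges, e.g. for metrics.
    let waiting = janitor.waiting_jobs().await?;
    let dlq_depth = janitor.count_dlq_depth().await?;
    println!(
        "deleted {completed} completed + {failed} failed, reset {stalled} stalled, \
         dead-lettered {dlqd}; {waiting} waiting, DLQ depth {dlq_depth}"
    );
    Ok(())
}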
20 changes: 6 additions & 14 deletions rust/cyclotron-core/src/lib.rs
@@ -22,32 +22,24 @@ pub use manager::QueueManager;
mod worker;
pub use worker::Worker;

// Janitor operations are exposed directly for now (and only the janitor impl uses them)
pub use ops::janitor::delete_completed_jobs;
pub use ops::janitor::delete_failed_jobs;
pub use ops::janitor::delete_poison_pills;
pub use ops::janitor::reset_stalled_jobs;

// We also expose some handy meta operations
pub use ops::meta::count_total_waiting_jobs;
// Janitor
mod janitor;
pub use janitor::Janitor;

// Config
mod config;
pub use config::ManagerConfig;
pub use config::PoolConfig;

// Meta
pub use ops::meta::run_migrations;

// Some data is shared between workers and janitors on a given shard, using
// the metadata table. These keys are reserved for that purpose

// The shard id is a fixed value that is set by the janitor when it starts up.
// Workers may use this value when reporting metrics. The `Worker` struct provides
// a method for fetching this value that caches it appropriately, so it's safe
// to call frequently while still being up-to-date (even though it should "never" change)
pub const SHARD_ID_KEY: &str = "shard_id";

// This isn't pub because, ideally, nothing using the core will ever need to know it.
const DEAD_LETTER_QUEUE: &str = "_cyclotron_dead_letter";

#[doc(hidden)]
pub mod test_support {
pub use crate::manager::Shard;
16 changes: 7 additions & 9 deletions rust/cyclotron-core/src/ops/janitor.rs
@@ -1,4 +1,5 @@
use chrono::{Duration, Utc};
use uuid::Uuid;

use crate::error::QueueError;

@@ -57,29 +58,26 @@ WHERE cyclotron_jobs.id = stalled.id
}

// Poison pills are stalled jobs that have been reset by the janitor more than `max_janitor_touched` times.
//
// TODO - investigating poison pills is important. We should consider moving them into a separate table or queue, rather than
// deleting them, so we can investigate why they're stalling/killing workers.
pub async fn delete_poison_pills<'c, E>(
pub async fn detect_poison_pills<'c, E>(
executor: E,
timeout: Duration,
max_janitor_touched: i16,
) -> Result<u64, QueueError>
) -> Result<Vec<Uuid>, QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let oldest_valid_heartbeat = Utc::now() - timeout;
// KLUDGE - the lock_id being set isn't checked here. A job in a running state without a lock id is violating an invariant,
// and would be useful to report.
let result = sqlx::query!(
let result = sqlx::query_scalar!(
r#"
DELETE FROM cyclotron_jobs WHERE state = 'running' AND COALESCE(last_heartbeat, $1) <= $1 AND janitor_touch_count >= $2
SELECT id FROM cyclotron_jobs WHERE state = 'running' AND COALESCE(last_heartbeat, $1) <= $1 AND janitor_touch_count >= $2
"#,
oldest_valid_heartbeat,
max_janitor_touched
).execute(executor)
).fetch_all(executor)
.await
.map_err(QueueError::from)?;

Ok(result.rows_affected())
Ok(result)
}
46 changes: 45 additions & 1 deletion rust/cyclotron-core/src/ops/meta.rs
@@ -1,7 +1,7 @@
use sqlx::{postgres::PgQueryResult, PgPool};
use uuid::Uuid;

use crate::error::QueueError;
use crate::{error::QueueError, DEAD_LETTER_QUEUE};

pub async fn count_total_waiting_jobs<'c, E>(executor: E) -> Result<u64, QueueError>
where
@@ -32,3 +32,47 @@ pub async fn run_migrations(pool: &PgPool) {
.await
.expect("Failed to run migrations");
}

/// Move a job into the dead letter queue, also updating the metadata table. Note that this operation does not
/// require a lock on the job. This is because the janitor needs to DLQ jobs that are stalled. The worker wrapper
/// around this operation should check that the job is "known" (owned by it) before calling this function.
pub async fn dead_letter<'c, E>(executor: E, job: Uuid, reason: &str) -> Result<(), QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres> + Clone,
{
// The first thing we do here is forcefully take the lock on this job, ensuring any subsequent worker
// operations will fail - we do this because the janitor can move jobs out from under workers. We mark
// the job as "running" and heartbeat so nothing else messes with it.
let lock = Uuid::now_v7();
let original_queue_name = sqlx::query_scalar!(
"UPDATE cyclotron_jobs SET state = 'running', lock_id = $1, last_heartbeat=NOW() WHERE id = $2 returning queue_name",
lock,
job
)
.fetch_optional(executor.clone())
.await?;

let Some(original_queue_name) = original_queue_name else {
return Err(QueueError::UnknownJobId(job));
};

// Now we add an entry to the dead letter metadata table
sqlx::query!(
"INSERT INTO cyclotron_dead_letter_metadata (job_id, original_queue_name, reason, dlq_time) VALUES ($1, $2, $3, NOW())",
job,
original_queue_name,
reason
).execute(executor.clone()).await?;

// And finally, we move the job to the dead letter queue. Jobs in the DLQ are "available", because if they ever
// get moved back to a queue, they should be re-run.
sqlx::query!(
"UPDATE cyclotron_jobs SET state = 'available', lock_id = NULL, queue_name = $1 WHERE id = $2",
DEAD_LETTER_QUEUE,
job
)
.execute(executor)
.await?;

Ok(())
}
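The migration comment above mentions re-scheduling a DLQ'd job after fixing a bug. This commit does not add that operation, but a sketch of what it could look like, using the metadata written by dead_letter and the imports already at the top of this file; the function name and the non-macro sqlx calls are assumptions:

// Illustrative sketch, not part of this diff: move a DLQ'd job back onto the
// queue it came from. Whether to also clear the metadata row is left open.
pub async fn requeue_from_dlq(pool: &PgPool, job: Uuid) -> Result<(), QueueError> {
    let original_queue: Option<String> = sqlx::query_scalar(
        "SELECT original_queue_name FROM cyclotron_dead_letter_metadata WHERE job_id = $1",
    )
    .bind(job)
    .fetch_optional(pool)
    .await?;

    let Some(original_queue) = original_queue else {
        return Err(QueueError::UnknownJobId(job));
    };

    // Jobs in the DLQ were left 'available' with no lock, so restoring the
    // original queue name is enough for a worker to pick the job up again.
    sqlx::query("UPDATE cyclotron_jobs SET queue_name = $1 WHERE id = $2")
        .bind(&original_queue)
        .bind(job)
        .execute(pool)
        .await?;

    Ok(())
}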
17 changes: 16 additions & 1 deletion rust/cyclotron-core/src/worker.rs
@@ -7,7 +7,7 @@ use uuid::Uuid;

use crate::{
ops::{
meta::run_migrations,
meta::{dead_letter, run_migrations},
worker::{dequeue_jobs, dequeue_with_vm_state, flush_job, get_vm_state, set_heartbeat},
},
Job, JobState, JobUpdate, PoolConfig, QueueError,
@@ -245,4 +245,19 @@ impl Worker {
.parameters = Some(parameters);
Ok(())
}

pub async fn dead_letter(&self, job_id: Uuid, reason: &str) -> Result<(), QueueError> {
// KLUDGE: Non-lexical lifetimes are good but they're just not perfect yet -
// changing this to not be a scope bump, and instead explicitly drop'ing the
// lock after the if check, makes the compiler think the lock is held across
// the await point.
{
let pending = self.pending.lock().unwrap();
if !pending.contains_key(&job_id) {
return Err(QueueError::UnknownJobId(job_id));
}
}

dead_letter(&self.pool, job_id, reason).await
}
}
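With this in place, a worker that hits a job it can never process (for example, parameters that fail to parse) can park it for a human instead of letting the janitor eventually reap it as a poison pill. A minimal sketch, with the handler shape and reason string assumed:

// Illustrative sketch, not part of this diff. `job_id` must be a job this
// worker currently holds, since dead_letter checks the pending set first.
async fn give_up_on(worker: &Worker, job_id: Uuid, err: &str) -> Result<(), QueueError> {
    // Keeps the job row (parked on the reserved DLQ queue) and records the
    // reason in cyclotron_dead_letter_metadata for later inspection.
    worker
        .dead_letter(job_id, &format!("unprocessable job: {}", err))
        .await
}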