From 8f4a6e69106ea42a5e77713ccf57e5c8f679f3d2 Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Thu, 22 Aug 2024 18:06:33 +0300 Subject: [PATCH] chore: refactor cyclotron-core (#24526) --- rust/cyclotron-core/src/base_ops.rs | 697 ------------------ .../src/bin/create_test_data.rs | 6 +- rust/cyclotron-core/src/bin/load_test.rs | 7 +- rust/cyclotron-core/src/config.rs | 42 ++ rust/cyclotron-core/src/error.rs | 2 +- rust/cyclotron-core/src/lib.rs | 76 +- rust/cyclotron-core/src/manager.rs | 53 +- .../src/{janitor_ops.rs => ops/janitor.rs} | 23 +- rust/cyclotron-core/src/ops/manager.rs | 160 ++++ rust/cyclotron-core/src/ops/meta.rs | 26 + rust/cyclotron-core/src/ops/mod.rs | 4 + rust/cyclotron-core/src/ops/worker.rs | 398 ++++++++++ rust/cyclotron-core/src/types.rs | 139 ++++ rust/cyclotron-core/src/worker.rs | 21 +- rust/cyclotron-core/tests/base_ops.rs | 10 +- rust/cyclotron-core/tests/common.rs | 2 +- rust/cyclotron-core/tests/shard.rs | 2 +- rust/cyclotron-fetch/src/context.rs | 2 +- rust/cyclotron-fetch/src/fetch.rs | 6 +- rust/cyclotron-fetch/tests/fetch.rs | 2 +- rust/cyclotron-fetch/tests/utils.rs | 7 +- rust/cyclotron-janitor/src/janitor.rs | 7 +- rust/cyclotron-janitor/tests/janitor.rs | 7 +- rust/cyclotron-node/src/lib.rs | 9 +- 24 files changed, 860 insertions(+), 848 deletions(-) delete mode 100644 rust/cyclotron-core/src/base_ops.rs create mode 100644 rust/cyclotron-core/src/config.rs rename rust/cyclotron-core/src/{janitor_ops.rs => ops/janitor.rs} (62%) create mode 100644 rust/cyclotron-core/src/ops/manager.rs create mode 100644 rust/cyclotron-core/src/ops/meta.rs create mode 100644 rust/cyclotron-core/src/ops/mod.rs create mode 100644 rust/cyclotron-core/src/ops/worker.rs create mode 100644 rust/cyclotron-core/src/types.rs diff --git a/rust/cyclotron-core/src/base_ops.rs b/rust/cyclotron-core/src/base_ops.rs deleted file mode 100644 index 5d1f194d88cbb..0000000000000 --- a/rust/cyclotron-core/src/base_ops.rs +++ /dev/null @@ -1,697 +0,0 @@ -//! # PgQueue -//! -//! A job queue implementation backed by a PostgreSQL table. - -use std::str::FromStr; - -use chrono::{self, DateTime, Utc}; -use serde::{self, Deserialize, Serialize}; -use sqlx::{ - postgres::{PgArguments, PgHasArrayType, PgQueryResult, PgTypeInfo}, - query::Query, -}; -use uuid::Uuid; - -use crate::error::QueueError; - -#[derive(Debug, Deserialize, Serialize, sqlx::Type)] -#[serde(rename_all = "lowercase")] -#[sqlx(type_name = "JobState", rename_all = "lowercase")] -pub enum JobState { - Available, - Running, - Completed, - Failed, - Paused, -} - -impl FromStr for JobState { - type Err = (); - - fn from_str(s: &str) -> Result { - match s { - "available" => Ok(JobState::Available), - "running" => Ok(JobState::Running), - "completed" => Ok(JobState::Completed), - "failed" => Ok(JobState::Failed), - _ => Err(()), - } - } -} - -impl PgHasArrayType for JobState { - fn array_type_info() -> sqlx::postgres::PgTypeInfo { - // Postgres default naming convention for array types is "_typename" - PgTypeInfo::with_name("_JobState") - } -} - -// The chunk of data needed to enqueue a job -#[derive(Debug, Deserialize, Serialize, Clone, Eq, PartialEq)] -pub struct JobInit { - pub team_id: i32, - pub queue_name: String, - pub priority: i16, - pub scheduled: DateTime, - pub function_id: Option, - pub vm_state: Option, - pub parameters: Option, - pub metadata: Option, -} - -// TODO - there are certain things we might want to be on a per-team basis here... the ability to say -// "do not process any jobs for this team" independent of doing an operation on the job table seems powerful, -// but that requires a distinct team table. For now, I'm just making a note that it's something we might -// want (the command to modify the treatment of all jobs associated with a team should only need to be issued and -// processed /once/, not once per job, and should apply to all jobs both currently queued and any future ones). This -// can be added in a progressive way (by adding joins and clauses to the dequeue query), so we don't need to worry about -// it too much up front. -#[derive(Debug, Deserialize, Serialize)] -pub struct Job { - // Job metadata - pub id: Uuid, - pub team_id: i32, - pub function_id: Option, // Some jobs might not come from hog, and it doesn't /kill/ use to support that - pub created: DateTime, - - // Queue bookkeeping - // This will be set for any worker that ever has a job in the "running" state (so any worker that dequeues a job) - // but I don't want to do the work to encode that in the type system right now - later it should be - pub lock_id: Option, - pub last_heartbeat: Option>, - pub janitor_touch_count: i16, - pub transition_count: i16, - pub last_transition: DateTime, - - // Virtual queue components - pub queue_name: String, // We can have multiple "virtual queues" workers pull from - - // Job availability - pub state: JobState, - pub priority: i16, // For sorting "available" jobs. Lower is higher priority - pub scheduled: DateTime, - - // Job data - pub vm_state: Option, // The state of the VM this job is running on (if it exists) - pub metadata: Option, // Additional fields a worker can tack onto a job, for e.g. tracking some state across retries (or number of retries in general by a given class of worker) - pub parameters: Option, // The actual parameters of the job (function args for a hog function, http request for a fetch function) -} - -pub async fn create_job<'c, E>(executor: E, data: JobInit) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let id = Uuid::now_v7(); - sqlx::query!( - r#" -INSERT INTO cyclotron_jobs - ( - id, - team_id, - function_id, - created, - lock_id, - last_heartbeat, - janitor_touch_count, - transition_count, - last_transition, - queue_name, - state, - scheduled, - priority, - vm_state, - metadata, - parameters - ) -VALUES - ($1, $2, $3, NOW(), NULL, NULL, 0, 0, NOW(), $4, $5, $6, $7, $8, $9, $10) - "#, - id, - data.team_id, - data.function_id, - data.queue_name, - JobState::Available as _, - data.scheduled, - data.priority, - data.vm_state, - data.metadata, - data.parameters - ) - .execute(executor) - .await?; - - Ok(()) -} - -pub async fn bulk_create_jobs<'c, E>(executor: E, jobs: &[JobInit]) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let now = Utc::now(); - // Flatten these jobs into a series of vecs of arguments PG can unnest - let mut ids = Vec::with_capacity(jobs.len()); - let mut team_ids = Vec::with_capacity(jobs.len()); - let mut function_ids = Vec::with_capacity(jobs.len()); - let mut created_at = Vec::with_capacity(jobs.len()); - let mut lock_ids = Vec::with_capacity(jobs.len()); - let mut last_heartbeats = Vec::with_capacity(jobs.len()); - let mut janitor_touch_counts = Vec::with_capacity(jobs.len()); - let mut transition_counts = Vec::with_capacity(jobs.len()); - let mut last_transitions = Vec::with_capacity(jobs.len()); - let mut queue_names = Vec::with_capacity(jobs.len()); - let mut states = Vec::with_capacity(jobs.len()); - let mut scheduleds = Vec::with_capacity(jobs.len()); - let mut priorities = Vec::with_capacity(jobs.len()); - let mut vm_states = Vec::with_capacity(jobs.len()); - let mut metadatas = Vec::with_capacity(jobs.len()); - let mut parameters = Vec::with_capacity(jobs.len()); - - for d in jobs { - ids.push(Uuid::now_v7()); - team_ids.push(d.team_id); - function_ids.push(d.function_id); - created_at.push(now); - lock_ids.push(None::); - last_heartbeats.push(None::>); - janitor_touch_counts.push(0); - transition_counts.push(0); - last_transitions.push(now); - queue_names.push(d.queue_name.clone()); - states.push(JobState::Available); - scheduleds.push(d.scheduled); - priorities.push(d.priority); - vm_states.push(d.vm_state.clone()); - metadatas.push(d.metadata.clone()); - parameters.push(d.parameters.clone()); - } - - // Using the "unnest" function to turn an array of rows into a set of rows - sqlx::query( - r#" -INSERT INTO cyclotron_jobs - ( - id, - team_id, - function_id, - created, - lock_id, - last_heartbeat, - janitor_touch_count, - transition_count, - last_transition, - queue_name, - state, - scheduled, - priority, - vm_state, - metadata, - parameters - ) -SELECT * -FROM UNNEST( - $1, - $2, - $3, - $4, - $5, - $6, - $7, - $8, - $9, - $10, - $11, - $12, - $13, - $14, - $15, - $16 - ) -"#, - ) - .bind(ids) - .bind(team_ids) - .bind(function_ids) - .bind(created_at) - .bind(lock_ids) - .bind(last_heartbeats) - .bind(janitor_touch_counts) - .bind(transition_counts) - .bind(last_transitions) - .bind(queue_names) - .bind(states) - .bind(scheduleds) - .bind(priorities) - .bind(vm_states) - .bind(metadatas) - .bind(parameters) - .execute(executor) - .await?; - - Ok(()) -} - -// Dequeue the next job batch from the queue, skipping VM state since it can be large -pub async fn dequeue_jobs<'c, E>( - executor: E, - queue: &str, - max: usize, -) -> Result, QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - // TODO - right now, locks are completely transient. We could instead have the lock_id act like a - // "worker_id", and be provided by the caller, which would let workers do less bookkeeping, and make - // some kinds of debugging easier, but I prefer locks being opaque to workers for now, to avoid any - // confusion or potential for accidental deadlocking (e.g. if someone persisted the worker_id across - // process restarts). - let lock_id = Uuid::now_v7(); - Ok(sqlx::query_as!( - Job, - r#" -WITH available AS ( - SELECT - id, - state - FROM cyclotron_jobs - WHERE - state = 'available'::JobState - AND queue_name = $1 - AND scheduled <= NOW() - ORDER BY - priority ASC, - scheduled ASC - LIMIT $2 - FOR UPDATE SKIP LOCKED -) -UPDATE cyclotron_jobs -SET - state = 'running'::JobState, - lock_id = $3, - last_heartbeat = NOW(), - last_transition = NOW(), - transition_count = transition_count + 1 -FROM available -WHERE - cyclotron_jobs.id = available.id -RETURNING - cyclotron_jobs.id, - team_id, - available.state as "state: JobState", - queue_name, - priority, - function_id, - created, - last_transition, - scheduled, - transition_count, - NULL as vm_state, - metadata, - parameters, - lock_id, - last_heartbeat, - janitor_touch_count - "#, - queue, - max as i64, - lock_id - ) - .fetch_all(executor) - .await?) -} - -// Dequeue a batch of jobs, also returning their VM state. This is an optimisation - you could -// dequeue a batch of jobs and then fetch their VM state in a separate query, but this is hopefully less -// heavy on the DB, if a given worker knows it needs VM state for all dequeue jobs -pub async fn dequeue_with_vm_state<'c, E>( - executor: E, - queue: &str, - max: usize, -) -> Result, QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let lock_id = Uuid::now_v7(); - Ok(sqlx::query_as!( - Job, - r#" -WITH available AS ( - SELECT - id, - state - FROM cyclotron_jobs - WHERE - state = 'available'::JobState - AND queue_name = $1 - AND scheduled <= NOW() - ORDER BY - priority ASC, - scheduled ASC - LIMIT $2 - FOR UPDATE SKIP LOCKED -) -UPDATE cyclotron_jobs -SET - state = 'running'::JobState, - lock_id = $3, - last_heartbeat = NOW(), - last_transition = NOW(), - transition_count = transition_count + 1 -FROM available -WHERE - cyclotron_jobs.id = available.id -RETURNING - cyclotron_jobs.id, - team_id, - available.state as "state: JobState", - queue_name, - priority, - function_id, - created, - last_transition, - scheduled, - transition_count, - vm_state, - metadata, - parameters, - lock_id, - last_heartbeat, - janitor_touch_count - "#, - queue, - max as i64, - lock_id - ) - .fetch_all(executor) - .await?) -} - -// Grab a jobs VM state - for workers that might sometimes need a jobs vm state, but not always, -// this lets them use dequeue_jobs, and then fetch the states they need. VM state can only be retrieved -// by workers holding a job lock -pub async fn get_vm_state<'c, E>( - executor: E, - job_id: Uuid, - lock_id: Uuid, -) -> Result, QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - struct VMState { - vm_state: Option, - } - - // We use fetch_on here because giving us an unknown ID is an error - let res = sqlx::query_as!( - VMState, - "SELECT vm_state FROM cyclotron_jobs WHERE id = $1 AND lock_id = $2", - job_id, - lock_id - ) - .fetch_one(executor) - .await?; - - Ok(res.vm_state) -} - -// A struct representing a set of updates for a job. Outer none values mean "don't update this field", -// with nested none values meaning "set this field to null" for nullable fields -#[derive(Debug, Deserialize, Serialize)] -pub struct JobUpdate { - pub lock_id: Uuid, // The ID of the lock acquired when this worker dequeued the job, required for any update to be valid - pub state: Option, - pub queue_name: Option, - pub priority: Option, - pub scheduled: Option>, - pub vm_state: Option>, - pub metadata: Option>, - pub parameters: Option>, -} - -impl JobUpdate { - pub fn new(lock_id: Uuid) -> Self { - Self { - lock_id, - state: None, - queue_name: None, - priority: None, - scheduled: None, - vm_state: None, - metadata: None, - parameters: None, - } - } -} - -// TODO - I should think about a bulk-flush interface at /some/ point, although we expect jobs to be -// high variance with respect to work time, so maybe that wouldn't be that useful in the end. -// TODO - this isn't the cheapest way to update a row in a table... I could probably do better by instead -// using a query builder, but I wanted sqlx's nice macro handling, at least while iterating on the schema. -// If/when we start hitting perf issues, this is a good place to start. -// NOTE - this function permits multiple flushes to the same job, without losing the lock on it, but -// high level implementations are recommended to avoid this - ideally, for every de/requeue, there should be -// exactly 2 database operations. -pub async fn flush_job<'c, C>( - connection: &mut C, - job_id: Uuid, - updates: JobUpdate, -) -> Result<(), QueueError> -where - C: sqlx::Connection, -{ - let mut txn = connection.begin().await?; - - // Flushing any job state except "running" is a signal that the worker no longer holds this job - let job_returned = !matches!(updates.state, Some(JobState::Running)); - let lock_id = updates.lock_id; - - if let Some(state) = updates.state { - set_state(&mut *txn, job_id, updates.lock_id, state).await?; - } - - if let Some(queue_name) = updates.queue_name { - set_queue(&mut *txn, job_id, &queue_name, lock_id).await?; - } - - if let Some(priority) = updates.priority { - set_priority(&mut *txn, job_id, lock_id, priority).await?; - } - - if let Some(scheduled) = updates.scheduled { - set_scheduled(&mut *txn, job_id, scheduled, lock_id).await?; - } - - if let Some(vm_state) = updates.vm_state { - set_vm_state(&mut *txn, job_id, vm_state, lock_id).await?; - } - - if let Some(metadata) = updates.metadata { - set_metadata(&mut *txn, job_id, metadata, lock_id).await?; - } - - if let Some(parameters) = updates.parameters { - set_parameters(&mut *txn, job_id, parameters, lock_id).await?; - } - - // Calling flush indicates forward progress, so we should touch the heartbeat - set_heartbeat(&mut *txn, job_id, lock_id).await?; - - // We do this here, instead of in the set_state call, because otherwise the lock_id passed to other - // updates would be invalid - if job_returned { - let query = sqlx::query!( - "UPDATE cyclotron_jobs SET lock_id = NULL, last_heartbeat = NULL WHERE id = $1 AND lock_id = $2", - job_id, - lock_id - ); - assert_does_update(&mut *txn, job_id, lock_id, query).await?; - } - - txn.commit().await?; - - Ok(()) -} - -// Simple wrapper, that just executes a query and throws an error if no rows were affected -async fn assert_does_update<'c, E>( - executor: E, - job_id: Uuid, - lock_id: Uuid, - query: Query<'_, sqlx::Postgres, PgArguments>, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let res = query.execute(executor).await?; - throw_if_no_rows(res, job_id, lock_id) -} - -// Most of the rest of these functions are designed to be used as part of larger transactions, e.g. -// "completing" a job means updating various rows and then marking it complete, and we can build that -// by composing a set of individual queries together using a transaction. -// Update the state of a job, also tracking the transition count and last transition time -pub async fn set_state<'c, E>( - executor: E, - job_id: Uuid, - lock_id: Uuid, - state: JobState, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - r#"UPDATE cyclotron_jobs - SET state = $1, last_transition = NOW(), transition_count = transition_count + 1 - WHERE id = $2 AND lock_id = $3"#, - state as _, - job_id, - lock_id - ); - - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_queue<'c, E>( - executor: E, - job_id: Uuid, - queue: &str, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET queue_name = $1 WHERE id = $2 AND lock_id = $3", - queue, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_priority<'c, E>( - executor: E, - job_id: Uuid, - lock_id: Uuid, - priority: i16, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET priority = $1 WHERE id = $2 AND lock_id = $3", - priority, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_scheduled<'c, E>( - executor: E, - job_id: Uuid, - scheduled: DateTime, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET scheduled = $1 WHERE id = $2 AND lock_id = $3", - scheduled, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_vm_state<'c, E>( - executor: E, - job_id: Uuid, - vm_state: Option, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET vm_state = $1 WHERE id = $2 AND lock_id = $3", - vm_state, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_metadata<'c, E>( - executor: E, - job_id: Uuid, - metadata: Option, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET metadata = $1 WHERE id = $2 AND lock_id = $3", - metadata, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_parameters<'c, E>( - executor: E, - job_id: Uuid, - parameters: Option, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET parameters = $1 WHERE id = $2 AND lock_id = $3", - parameters, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_heartbeat<'c, E>( - executor: E, - job_id: Uuid, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET last_heartbeat = NOW() WHERE id = $1 AND lock_id = $2", - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn count_total_waiting_jobs<'c, E>(executor: E) -> Result -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let res = sqlx::query!( - "SELECT COUNT(*) FROM cyclotron_jobs WHERE state = 'available' AND scheduled <= NOW()", - ) - .fetch_one(executor) - .await?; - - let res = res.count.unwrap_or(0); - Ok(res as u64) -} - -fn throw_if_no_rows(res: PgQueryResult, job: Uuid, lock: Uuid) -> Result<(), QueueError> { - if res.rows_affected() == 0 { - Err(QueueError::InvalidLock(lock, job)) - } else { - Ok(()) - } -} diff --git a/rust/cyclotron-core/src/bin/create_test_data.rs b/rust/cyclotron-core/src/bin/create_test_data.rs index e3010fe245a6b..ce875c676c98b 100644 --- a/rust/cyclotron-core/src/bin/create_test_data.rs +++ b/rust/cyclotron-core/src/bin/create_test_data.rs @@ -1,9 +1,5 @@ use chrono::{Duration, Utc}; -use cyclotron_core::{ - base_ops::JobInit, - manager::{ManagerConfig, QueueManager}, - PoolConfig, -}; +use cyclotron_core::{JobInit, ManagerConfig, PoolConfig, QueueManager}; use uuid::Uuid; // Just inserts jobs as fast as it can, choosing randomly between hog and fetch workers, and between different priorities. diff --git a/rust/cyclotron-core/src/bin/load_test.rs b/rust/cyclotron-core/src/bin/load_test.rs index 21a47774e486e..16dd825c1c305 100644 --- a/rust/cyclotron-core/src/bin/load_test.rs +++ b/rust/cyclotron-core/src/bin/load_test.rs @@ -4,12 +4,7 @@ use std::{ }; use chrono::{Duration, Utc}; -use cyclotron_core::{ - base_ops::{JobInit, JobState}, - manager::{ManagerConfig, QueueManager}, - worker::Worker, - PoolConfig, -}; +use cyclotron_core::{JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker}; use futures::future::join_all; use uuid::Uuid; diff --git a/rust/cyclotron-core/src/config.rs b/rust/cyclotron-core/src/config.rs new file mode 100644 index 0000000000000..8304816671435 --- /dev/null +++ b/rust/cyclotron-core/src/config.rs @@ -0,0 +1,42 @@ +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use sqlx::{pool::PoolOptions, PgPool}; + +// A pool config object, designed to be passable across API boundaries +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct PoolConfig { + pub db_url: String, + pub max_connections: Option, // Default to 10 + pub min_connections: Option, // Default to 1 + pub acquire_timeout_seconds: Option, // Default to 30 + pub max_lifetime_seconds: Option, // Default to 300 + pub idle_timeout_seconds: Option, // Default to 60 +} + +impl PoolConfig { + pub async fn connect(&self) -> Result { + let builder = PoolOptions::new() + .max_connections(self.max_connections.unwrap_or(10)) + .min_connections(self.min_connections.unwrap_or(1)) + .max_lifetime(Duration::from_secs( + self.max_lifetime_seconds.unwrap_or(300), + )) + .idle_timeout(Duration::from_secs(self.idle_timeout_seconds.unwrap_or(60))) + .acquire_timeout(Duration::from_secs( + self.acquire_timeout_seconds.unwrap_or(30), + )); + + builder.connect(&self.db_url).await + } +} + +pub const DEFAULT_QUEUE_DEPTH_LIMIT: u64 = 10_000; +pub const DEFAULT_SHARD_HEALTH_CHECK_INTERVAL: u64 = 10; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ManagerConfig { + pub shards: Vec, + pub shard_depth_limit: Option, // Defaults to 10_000 available jobs per shard + pub shard_depth_check_interval_seconds: Option, // Defaults to 10 seconds - checking shard capacity +} diff --git a/rust/cyclotron-core/src/error.rs b/rust/cyclotron-core/src/error.rs index 1a95305f4fd18..4e870e75a2d59 100644 --- a/rust/cyclotron-core/src/error.rs +++ b/rust/cyclotron-core/src/error.rs @@ -5,7 +5,7 @@ pub enum QueueError { #[error("sqlx error: {0}")] SqlxError(#[from] sqlx::Error), #[error("Unknown job id: {0}")] - UnknownJobId(Uuid), // Happens when someone tries to update a job through a QueueManager that wasn't dequeue or was already flushed + UnknownJobId(Uuid), #[error("Job {0} flushed without a new state, which would leave it in a running state forever (or until reaped)")] FlushWithoutNextState(Uuid), #[error("Invalid lock {0} used to update job {1}. This usually means a job has been reaped from under a worker - did you forget to set the heartbeat?")] diff --git a/rust/cyclotron-core/src/lib.rs b/rust/cyclotron-core/src/lib.rs index a62d69b304b6c..ebc72ad0b9ea7 100644 --- a/rust/cyclotron-core/src/lib.rs +++ b/rust/cyclotron-core/src/lib.rs @@ -1,38 +1,42 @@ -use std::time::Duration; - -use serde::{Deserialize, Serialize}; -use sqlx::{pool::PoolOptions, PgPool}; - -pub mod base_ops; -pub mod error; -pub mod janitor_ops; -pub mod manager; -pub mod worker; - -// A pool config object, designed to be passable across API boundaries -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct PoolConfig { - pub db_url: String, - pub max_connections: Option, // Default to 10 - pub min_connections: Option, // Default to 1 - pub acquire_timeout_seconds: Option, // Default to 30 - pub max_lifetime_seconds: Option, // Default to 300 - pub idle_timeout_seconds: Option, // Default to 60 -} +mod ops; + +// We do this pattern (privately use a module, then re-export parts of it) so we can refactor/rename or generally futz around with the internals without breaking the public API + +// Types +mod types; +pub use types::BulkInsertResult; +pub use types::Job; +pub use types::JobInit; +pub use types::JobState; +pub use types::JobUpdate; + +// Errors +mod error; +pub use error::QueueError; + +// Manager +mod manager; +pub use manager::QueueManager; + +// Worker +mod worker; +pub use worker::Worker; + +// Janitor operations are exposed directly for now (and only the janitor impl uses them) +pub use ops::janitor::delete_completed_jobs; +pub use ops::janitor::delete_failed_jobs; +pub use ops::janitor::delete_poison_pills; +pub use ops::janitor::reset_stalled_jobs; + +// We also expose some handly meta operations +pub use ops::meta::count_total_waiting_jobs; + +// Config +mod config; +pub use config::ManagerConfig; +pub use config::PoolConfig; -impl PoolConfig { - pub async fn connect(&self) -> Result { - let builder = PoolOptions::new() - .max_connections(self.max_connections.unwrap_or(10)) - .min_connections(self.min_connections.unwrap_or(1)) - .max_lifetime(Duration::from_secs( - self.max_lifetime_seconds.unwrap_or(300), - )) - .idle_timeout(Duration::from_secs(self.idle_timeout_seconds.unwrap_or(60))) - .acquire_timeout(Duration::from_secs( - self.acquire_timeout_seconds.unwrap_or(30), - )); - - builder.connect(&self.db_url).await - } +#[doc(hidden)] +pub mod test_support { + pub use crate::manager::Shard; } diff --git a/rust/cyclotron-core/src/manager.rs b/rust/cyclotron-core/src/manager.rs index 2cafdee9c91b5..6339c4e9cf4ed 100644 --- a/rust/cyclotron-core/src/manager.rs +++ b/rust/cyclotron-core/src/manager.rs @@ -1,30 +1,18 @@ use std::sync::atomic::AtomicUsize; use chrono::{DateTime, Duration, Utc}; -use serde::{Deserialize, Serialize}; use sqlx::PgPool; use tokio::sync::RwLock; use crate::{ - base_ops::{bulk_create_jobs, count_total_waiting_jobs, create_job, JobInit}, - error::QueueError, - PoolConfig, + config::{DEFAULT_QUEUE_DEPTH_LIMIT, DEFAULT_SHARD_HEALTH_CHECK_INTERVAL}, + ops::{ + manager::{bulk_create_jobs, create_job}, + meta::count_total_waiting_jobs, + }, + BulkInsertResult, JobInit, ManagerConfig, QueueError, }; -pub const DEFAULT_QUEUE_DEPTH_LIMIT: u64 = 10_000; -pub const DEFAULT_SHARD_HEALTH_CHECK_INTERVAL: u64 = 10; - -// TODO - right now, a lot of this sharding stuff will be hollow, but later we'll add logic like -// e.g. routing work to alive shards if one is down, or reporting shard failure, etc. -// TODO - here's also where queue management commands will go, like "downgrade the priority of this function" -// or "pause jobs for this team", but we're going to add those ad-hoc as they're needed, not up front -#[derive(Debug, Serialize, Deserialize)] -pub struct ManagerConfig { - pub shards: Vec, - pub shard_depth_limit: Option, // Defaults to 10_000 available jobs per shard - pub shard_depth_check_interval_seconds: Option, // Defaults to 10 seconds - checking shard capacity -} - pub struct Shard { pub pool: PgPool, pub last_healthy: RwLock>, @@ -37,12 +25,6 @@ pub struct QueueManager { next_shard: AtomicUsize, } -// Bulk inserts across multiple shards can partially succeed, so we need to track failures -// and hand back failed job inits to the caller. -pub struct BulkInsertResult { - pub failures: Vec<(QueueError, Vec)>, -} - impl QueueManager { pub async fn new(config: ManagerConfig) -> Result { let mut shards = vec![]; @@ -65,7 +47,7 @@ impl QueueManager { }) } - // Designed mostly to be used for testing, but safe enough to expose publicly + #[doc(hidden)] // Mostly for testing, but safe to expose pub fn from_pool(pool: PgPool) -> Self { Self { shards: RwLock::new(vec![Shard::new( @@ -188,6 +170,7 @@ impl Shard { create_job(&self.pool, init).await } + // As above, with the same caveats about what "capacity" means pub async fn bulk_create_jobs_blocking( &self, inits: &[JobInit], @@ -240,23 +223,3 @@ impl Shard { Ok(is_full) } } - -impl BulkInsertResult { - pub fn new() -> Self { - Self { failures: vec![] } - } - - pub fn add_failure(&mut self, err: QueueError, jobs: Vec) { - self.failures.push((err, jobs)); - } - - pub fn all_succeeded(&self) -> bool { - self.failures.is_empty() - } -} - -impl Default for BulkInsertResult { - fn default() -> Self { - Self::new() - } -} diff --git a/rust/cyclotron-core/src/janitor_ops.rs b/rust/cyclotron-core/src/ops/janitor.rs similarity index 62% rename from rust/cyclotron-core/src/janitor_ops.rs rename to rust/cyclotron-core/src/ops/janitor.rs index d2d1c947dd560..df77732a17110 100644 --- a/rust/cyclotron-core/src/janitor_ops.rs +++ b/rust/cyclotron-core/src/ops/janitor.rs @@ -30,9 +30,7 @@ where } // Jobs are considered stalled if their lock is held and their last_heartbeat is older than `timeout`. -// NOTE - because this runs on running jobs, it can stall workers trying to flush updates as it -// executes. I need to use some of the load generators alongside explain/analyze to optimise this (and -// the set of DB indexes) +// // TODO - this /could/ return the lock_id's held, which might help with debugging (if workers reported // the lock_id's they dequeue'd), but lets not do that right now. pub async fn reset_stalled_jobs<'c, E>(executor: E, timeout: Duration) -> Result @@ -58,14 +56,10 @@ WHERE cyclotron_jobs.id = stalled.id Ok(result.rows_affected()) } -// Poison pills are jobs whose lock is held and whose heartbeat is older than `timeout`, that have -// been returned to the queue by the janitor more than `max_janitor_touched` times. -// NOTE - this has the same performance caveat as reset_stalled_jobs -// TODO - This shoud, instead, move the job row to a dead letter table, for later investigation. Of course, -// rather than doing that, it could just put the job in a "dead letter" state, and no worker or janitor process -// will touch it... maybe the table moving isn't needed? but either way, being able to debug jobs that cause workers -// to stall would be good (and, thinking about it, moving it to a new table means we don't have to clear the lock, -// so have a potential way to trace back to the last worker that died holding the job) +// Poison pills are stalled jobs that have been reset by the janitor more than `max_janitor_touched` times. +// +// TODO - investigating poision pills is important. We should consider moving them into a separate table or queue, rather than +// deleting them, so we can investigate why they're stalling/killing workers. pub async fn delete_poison_pills<'c, E>( executor: E, timeout: Duration, @@ -75,11 +69,8 @@ where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { let oldest_valid_heartbeat = Utc::now() - timeout; - // NOTE - we don't check the lock_id here, because it probably doesn't matter (the lock_id should be set if the - // job state is "running"), but perhaps we should only delete jobs with a set lock_id, and report an error - // if we find a job with a state of "running" and no lock_id. Also, we delete jobs whose last_heartbeat is - // null, which again should never happen (dequeuing a job should always set the last_heartbeat), but for - // robustness sake we may as well handle it + // KLUDGE - the lock_id being set isn't checked here. A job in a running state without a lock id is violating an invariant, + // and would be useful to report. let result = sqlx::query!( r#" DELETE FROM cyclotron_jobs WHERE state = 'running' AND COALESCE(last_heartbeat, $1) <= $1 AND janitor_touch_count >= $2 diff --git a/rust/cyclotron-core/src/ops/manager.rs b/rust/cyclotron-core/src/ops/manager.rs new file mode 100644 index 0000000000000..8c2ec30372adf --- /dev/null +++ b/rust/cyclotron-core/src/ops/manager.rs @@ -0,0 +1,160 @@ +use chrono::{DateTime, Utc}; +use uuid::Uuid; + +use crate::{ + error::QueueError, + types::{JobInit, JobState}, +}; + +pub async fn create_job<'c, E>(executor: E, data: JobInit) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let id = Uuid::now_v7(); + sqlx::query!( + r#" +INSERT INTO cyclotron_jobs + ( + id, + team_id, + function_id, + created, + lock_id, + last_heartbeat, + janitor_touch_count, + transition_count, + last_transition, + queue_name, + state, + scheduled, + priority, + vm_state, + metadata, + parameters + ) +VALUES + ($1, $2, $3, NOW(), NULL, NULL, 0, 0, NOW(), $4, $5, $6, $7, $8, $9, $10) + "#, + id, + data.team_id, + data.function_id, + data.queue_name, + JobState::Available as _, + data.scheduled, + data.priority, + data.vm_state, + data.metadata, + data.parameters + ) + .execute(executor) + .await?; + + Ok(()) +} + +pub async fn bulk_create_jobs<'c, E>(executor: E, jobs: &[JobInit]) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let now = Utc::now(); + // Flatten these jobs into a series of vecs of arguments PG can unnest + let mut ids = Vec::with_capacity(jobs.len()); + let mut team_ids = Vec::with_capacity(jobs.len()); + let mut function_ids = Vec::with_capacity(jobs.len()); + let mut created_at = Vec::with_capacity(jobs.len()); + let mut lock_ids = Vec::with_capacity(jobs.len()); + let mut last_heartbeats = Vec::with_capacity(jobs.len()); + let mut janitor_touch_counts = Vec::with_capacity(jobs.len()); + let mut transition_counts = Vec::with_capacity(jobs.len()); + let mut last_transitions = Vec::with_capacity(jobs.len()); + let mut queue_names = Vec::with_capacity(jobs.len()); + let mut states = Vec::with_capacity(jobs.len()); + let mut scheduleds = Vec::with_capacity(jobs.len()); + let mut priorities = Vec::with_capacity(jobs.len()); + let mut vm_states = Vec::with_capacity(jobs.len()); + let mut metadatas = Vec::with_capacity(jobs.len()); + let mut parameters = Vec::with_capacity(jobs.len()); + + for d in jobs { + ids.push(Uuid::now_v7()); + team_ids.push(d.team_id); + function_ids.push(d.function_id); + created_at.push(now); + lock_ids.push(None::); + last_heartbeats.push(None::>); + janitor_touch_counts.push(0); + transition_counts.push(0); + last_transitions.push(now); + queue_names.push(d.queue_name.clone()); + states.push(JobState::Available); + scheduleds.push(d.scheduled); + priorities.push(d.priority); + vm_states.push(d.vm_state.clone()); + metadatas.push(d.metadata.clone()); + parameters.push(d.parameters.clone()); + } + + // Using the "unnest" function to turn an array of rows into a set of rows + sqlx::query( + r#" +INSERT INTO cyclotron_jobs + ( + id, + team_id, + function_id, + created, + lock_id, + last_heartbeat, + janitor_touch_count, + transition_count, + last_transition, + queue_name, + state, + scheduled, + priority, + vm_state, + metadata, + parameters + ) +SELECT * +FROM UNNEST( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13, + $14, + $15, + $16 + ) +"#, + ) + .bind(ids) + .bind(team_ids) + .bind(function_ids) + .bind(created_at) + .bind(lock_ids) + .bind(last_heartbeats) + .bind(janitor_touch_counts) + .bind(transition_counts) + .bind(last_transitions) + .bind(queue_names) + .bind(states) + .bind(scheduleds) + .bind(priorities) + .bind(vm_states) + .bind(metadatas) + .bind(parameters) + .execute(executor) + .await?; + + Ok(()) +} diff --git a/rust/cyclotron-core/src/ops/meta.rs b/rust/cyclotron-core/src/ops/meta.rs new file mode 100644 index 0000000000000..7257b1f6b36dc --- /dev/null +++ b/rust/cyclotron-core/src/ops/meta.rs @@ -0,0 +1,26 @@ +use sqlx::postgres::PgQueryResult; +use uuid::Uuid; + +use crate::error::QueueError; + +pub async fn count_total_waiting_jobs<'c, E>(executor: E) -> Result +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let res = sqlx::query!( + "SELECT COUNT(*) FROM cyclotron_jobs WHERE state = 'available' AND scheduled <= NOW()", + ) + .fetch_one(executor) + .await?; + + let res = res.count.unwrap_or(0); + Ok(res as u64) +} + +pub fn throw_if_no_rows(res: PgQueryResult, job: Uuid, lock: Uuid) -> Result<(), QueueError> { + if res.rows_affected() == 0 { + Err(QueueError::InvalidLock(lock, job)) + } else { + Ok(()) + } +} diff --git a/rust/cyclotron-core/src/ops/mod.rs b/rust/cyclotron-core/src/ops/mod.rs new file mode 100644 index 0000000000000..e0848468bbc89 --- /dev/null +++ b/rust/cyclotron-core/src/ops/mod.rs @@ -0,0 +1,4 @@ +pub mod janitor; +pub mod manager; +pub mod meta; +pub mod worker; diff --git a/rust/cyclotron-core/src/ops/worker.rs b/rust/cyclotron-core/src/ops/worker.rs new file mode 100644 index 0000000000000..efff1faf2bff6 --- /dev/null +++ b/rust/cyclotron-core/src/ops/worker.rs @@ -0,0 +1,398 @@ +use chrono::{DateTime, Utc}; +use sqlx::{postgres::PgArguments, query::Query}; +use uuid::Uuid; + +use crate::{ + error::QueueError, + types::{Job, JobState, JobUpdate}, +}; + +use super::meta::throw_if_no_rows; + +// Dequeue the next job batch from the queue, skipping VM state since it can be large +pub async fn dequeue_jobs<'c, E>( + executor: E, + queue: &str, + max: usize, +) -> Result, QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + // Transient lock id. This could be a worker ID, or something, but for now it's totally random (per-batch) + let lock_id = Uuid::now_v7(); + Ok(sqlx::query_as!( + Job, + r#" +WITH available AS ( + SELECT + id, + state + FROM cyclotron_jobs + WHERE + state = 'available'::JobState + AND queue_name = $1 + AND scheduled <= NOW() + ORDER BY + priority ASC, + scheduled ASC + LIMIT $2 + FOR UPDATE SKIP LOCKED +) +UPDATE cyclotron_jobs +SET + state = 'running'::JobState, + lock_id = $3, + last_heartbeat = NOW(), + last_transition = NOW(), + transition_count = transition_count + 1 +FROM available +WHERE + cyclotron_jobs.id = available.id +RETURNING + cyclotron_jobs.id, + team_id, + available.state as "state: JobState", + queue_name, + priority, + function_id, + created, + last_transition, + scheduled, + transition_count, + NULL as vm_state, + metadata, + parameters, + lock_id, + last_heartbeat, + janitor_touch_count + "#, + queue, + max as i64, + lock_id + ) + .fetch_all(executor) + .await?) +} + +// Dequeue a batch of jobs, with their VM state. +pub async fn dequeue_with_vm_state<'c, E>( + executor: E, + queue: &str, + max: usize, +) -> Result, QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let lock_id = Uuid::now_v7(); + Ok(sqlx::query_as!( + Job, + r#" +WITH available AS ( + SELECT + id, + state + FROM cyclotron_jobs + WHERE + state = 'available'::JobState + AND queue_name = $1 + AND scheduled <= NOW() + ORDER BY + priority ASC, + scheduled ASC + LIMIT $2 + FOR UPDATE SKIP LOCKED +) +UPDATE cyclotron_jobs +SET + state = 'running'::JobState, + lock_id = $3, + last_heartbeat = NOW(), + last_transition = NOW(), + transition_count = transition_count + 1 +FROM available +WHERE + cyclotron_jobs.id = available.id +RETURNING + cyclotron_jobs.id, + team_id, + available.state as "state: JobState", + queue_name, + priority, + function_id, + created, + last_transition, + scheduled, + transition_count, + vm_state, + metadata, + parameters, + lock_id, + last_heartbeat, + janitor_touch_count + "#, + queue, + max as i64, + lock_id + ) + .fetch_all(executor) + .await?) +} + +pub async fn get_vm_state<'c, E>( + executor: E, + job_id: Uuid, + lock_id: Uuid, +) -> Result, QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + struct VMState { + vm_state: Option, + } + + let res = sqlx::query_as!( + VMState, + "SELECT vm_state FROM cyclotron_jobs WHERE id = $1 AND lock_id = $2", + job_id, + lock_id + ) + .fetch_one(executor) + .await?; + + Ok(res.vm_state) +} + +// TODO - this isn't the cheapest way to update a row in a table... we could probably do better by instead +// using a query builder, but that means no longer using query_as! and query! macros, unfortunately. +// If/when we start hitting perf issues, this is a good place to start. +// +// NOTE - this clears the lock_id when the job state is set to anything other than "running", since that indicates +// the worker is finished with the job. This means subsequent flushes with the same lock_id will fail. +pub async fn flush_job<'c, C>( + connection: &mut C, + job_id: Uuid, + updates: JobUpdate, +) -> Result<(), QueueError> +where + C: sqlx::Connection, +{ + let mut txn = connection.begin().await?; + + let job_returned = !matches!(updates.state, Some(JobState::Running)); + let lock_id = updates.lock_id; + + if let Some(state) = updates.state { + set_state(&mut *txn, job_id, lock_id, state).await?; + } + + if let Some(queue_name) = updates.queue_name { + set_queue(&mut *txn, job_id, &queue_name, lock_id).await?; + } + + if let Some(priority) = updates.priority { + set_priority(&mut *txn, job_id, lock_id, priority).await?; + } + + if let Some(scheduled) = updates.scheduled { + set_scheduled(&mut *txn, job_id, scheduled, lock_id).await?; + } + + if let Some(vm_state) = updates.vm_state { + set_vm_state(&mut *txn, job_id, vm_state, lock_id).await?; + } + + if let Some(metadata) = updates.metadata { + set_metadata(&mut *txn, job_id, metadata, lock_id).await?; + } + + if let Some(parameters) = updates.parameters { + set_parameters(&mut *txn, job_id, parameters, lock_id).await?; + } + + // Calling flush indicates forward progress, so we should touch the heartbeat + set_heartbeat(&mut *txn, job_id, lock_id).await?; + + // We do this here, instead of in the set_state call, because otherwise the lock_id passed to other + // updates would be invalid + if job_returned { + let query = sqlx::query!( + "UPDATE cyclotron_jobs SET lock_id = NULL, last_heartbeat = NULL WHERE id = $1 AND lock_id = $2", + job_id, + lock_id + ); + assert_does_update(&mut *txn, job_id, lock_id, query).await?; + } + + txn.commit().await?; + + Ok(()) +} + +// ---------------------- +// Setters +// +// Most of the rest of these functions are designed to be used as part of larger transactions, e.g. +// "completing" a job means updating various rows and then marking it complete, and we can build that +// by composing a set of individual queries together using a transaction. +// +// ---------------------- + +// Update the state of a job, also tracking the transition count and last transition time +pub async fn set_state<'c, E>( + executor: E, + job_id: Uuid, + lock_id: Uuid, + state: JobState, +) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let q = sqlx::query!( + r#"UPDATE cyclotron_jobs + SET state = $1, last_transition = NOW(), transition_count = transition_count + 1 + WHERE id = $2 AND lock_id = $3"#, + state as _, + job_id, + lock_id + ); + + assert_does_update(executor, job_id, lock_id, q).await +} + +pub async fn set_queue<'c, E>( + executor: E, + job_id: Uuid, + queue: &str, + lock_id: Uuid, +) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let q = sqlx::query!( + "UPDATE cyclotron_jobs SET queue_name = $1 WHERE id = $2 AND lock_id = $3", + queue, + job_id, + lock_id + ); + assert_does_update(executor, job_id, lock_id, q).await +} + +pub async fn set_priority<'c, E>( + executor: E, + job_id: Uuid, + lock_id: Uuid, + priority: i16, +) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let q = sqlx::query!( + "UPDATE cyclotron_jobs SET priority = $1 WHERE id = $2 AND lock_id = $3", + priority, + job_id, + lock_id + ); + assert_does_update(executor, job_id, lock_id, q).await +} + +pub async fn set_scheduled<'c, E>( + executor: E, + job_id: Uuid, + scheduled: DateTime, + lock_id: Uuid, +) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let q = sqlx::query!( + "UPDATE cyclotron_jobs SET scheduled = $1 WHERE id = $2 AND lock_id = $3", + scheduled, + job_id, + lock_id + ); + assert_does_update(executor, job_id, lock_id, q).await +} + +pub async fn set_vm_state<'c, E>( + executor: E, + job_id: Uuid, + vm_state: Option, + lock_id: Uuid, +) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let q = sqlx::query!( + "UPDATE cyclotron_jobs SET vm_state = $1 WHERE id = $2 AND lock_id = $3", + vm_state, + job_id, + lock_id + ); + assert_does_update(executor, job_id, lock_id, q).await +} + +pub async fn set_metadata<'c, E>( + executor: E, + job_id: Uuid, + metadata: Option, + lock_id: Uuid, +) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let q = sqlx::query!( + "UPDATE cyclotron_jobs SET metadata = $1 WHERE id = $2 AND lock_id = $3", + metadata, + job_id, + lock_id + ); + assert_does_update(executor, job_id, lock_id, q).await +} + +pub async fn set_parameters<'c, E>( + executor: E, + job_id: Uuid, + parameters: Option, + lock_id: Uuid, +) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let q = sqlx::query!( + "UPDATE cyclotron_jobs SET parameters = $1 WHERE id = $2 AND lock_id = $3", + parameters, + job_id, + lock_id + ); + assert_does_update(executor, job_id, lock_id, q).await +} + +pub async fn set_heartbeat<'c, E>( + executor: E, + job_id: Uuid, + lock_id: Uuid, +) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let q = sqlx::query!( + "UPDATE cyclotron_jobs SET last_heartbeat = NOW() WHERE id = $1 AND lock_id = $2", + job_id, + lock_id + ); + assert_does_update(executor, job_id, lock_id, q).await +} + +// Simple wrapper, that just executes a query and throws an error if no rows were affected +async fn assert_does_update<'c, E>( + executor: E, + job_id: Uuid, + lock_id: Uuid, + query: Query<'_, sqlx::Postgres, PgArguments>, +) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let res = query.execute(executor).await?; + throw_if_no_rows(res, job_id, lock_id) +} diff --git a/rust/cyclotron-core/src/types.rs b/rust/cyclotron-core/src/types.rs new file mode 100644 index 0000000000000..8e0a11a6a822a --- /dev/null +++ b/rust/cyclotron-core/src/types.rs @@ -0,0 +1,139 @@ +use std::str::FromStr; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; +use uuid::Uuid; + +use crate::QueueError; + +#[derive(Debug, Deserialize, Serialize, sqlx::Type)] +#[serde(rename_all = "lowercase")] +#[sqlx(type_name = "JobState", rename_all = "lowercase")] +pub enum JobState { + Available, + Running, + Completed, + Failed, + Paused, +} + +impl FromStr for JobState { + type Err = (); + + fn from_str(s: &str) -> Result { + match s { + "available" => Ok(JobState::Available), + "running" => Ok(JobState::Running), + "completed" => Ok(JobState::Completed), + "failed" => Ok(JobState::Failed), + _ => Err(()), + } + } +} + +impl PgHasArrayType for JobState { + fn array_type_info() -> sqlx::postgres::PgTypeInfo { + // Postgres default naming convention for array types is "_typename" + PgTypeInfo::with_name("_JobState") + } +} + +// The chunk of data needed to enqueue a job +#[derive(Debug, Deserialize, Serialize, Clone, Eq, PartialEq)] +pub struct JobInit { + pub team_id: i32, + pub queue_name: String, + pub priority: i16, + pub scheduled: DateTime, + pub function_id: Option, + pub vm_state: Option, + pub parameters: Option, + pub metadata: Option, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct Job { + // Job metadata + pub id: Uuid, + pub team_id: i32, + pub function_id: Option, // Some jobs might not come from hog, and it doesn't /kill/ use to support that + pub created: DateTime, + + // Queue bookkeeping + // This will be set for any worker that ever has a job in the "running" state (so any worker that dequeues a job) + // but I don't want to do the work to encode that in the type system right now - later it should be + pub lock_id: Option, + pub last_heartbeat: Option>, + pub janitor_touch_count: i16, + pub transition_count: i16, + pub last_transition: DateTime, + + // Virtual queue components + pub queue_name: String, // We can have multiple "virtual queues" workers pull from + + // Job availability + pub state: JobState, + pub priority: i16, // For sorting "available" jobs. Lower is higher priority + pub scheduled: DateTime, + + // Job data + pub vm_state: Option, // The state of the VM this job is running on (if it exists) + pub metadata: Option, // Additional fields a worker can tack onto a job, for e.g. tracking some state across retries (or number of retries in general by a given class of worker) + pub parameters: Option, // The actual parameters of the job (function args for a hog function, http request for a fetch function) +} + +// A struct representing a set of updates for a job. Outer none values mean "don't update this field", +// with nested none values meaning "set this field to null" for nullable fields +#[derive(Debug, Deserialize, Serialize)] +pub struct JobUpdate { + pub lock_id: Uuid, // The ID of the lock acquired when this worker dequeued the job, required for any update to be valid + pub state: Option, + pub queue_name: Option, + pub priority: Option, + pub scheduled: Option>, + pub vm_state: Option>, + pub metadata: Option>, + pub parameters: Option>, +} + +impl JobUpdate { + pub fn new(lock_id: Uuid) -> Self { + Self { + lock_id, + state: None, + queue_name: None, + priority: None, + scheduled: None, + vm_state: None, + metadata: None, + parameters: None, + } + } +} + +// Bulk inserts across multiple shards can partially succeed, so we need to track failures +// and hand back failed job inits to the caller. +pub struct BulkInsertResult { + pub failures: Vec<(QueueError, Vec)>, +} + +impl BulkInsertResult { + pub fn new() -> Self { + Self { failures: vec![] } + } + + pub fn add_failure(&mut self, err: QueueError, jobs: Vec) { + self.failures.push((err, jobs)); + } + + pub fn all_succeeded(&self) -> bool { + self.failures.is_empty() + } +} + +impl Default for BulkInsertResult { + fn default() -> Self { + Self::new() + } +} diff --git a/rust/cyclotron-core/src/worker.rs b/rust/cyclotron-core/src/worker.rs index 431bd22f447d0..337e6f9a82a0f 100644 --- a/rust/cyclotron-core/src/worker.rs +++ b/rust/cyclotron-core/src/worker.rs @@ -6,11 +6,8 @@ use std::sync::Mutex; use uuid::Uuid; use crate::{ - base_ops::{ - dequeue_jobs, dequeue_with_vm_state, flush_job, set_heartbeat, Job, JobState, JobUpdate, - }, - error::QueueError, - PoolConfig, + ops::worker::{dequeue_jobs, dequeue_with_vm_state, flush_job, get_vm_state, set_heartbeat}, + Job, JobState, JobUpdate, PoolConfig, QueueError, }; // The worker's interface to the underlying queue system - a worker can do everything except @@ -92,6 +89,20 @@ impl Worker { Ok(jobs) } + /// Retrieve the VM state for a job, if, for example, you dequeued it and then realised you + /// need the VM state as well. + pub async fn get_vm_state(&self, job_id: Uuid) -> Result, QueueError> { + let lock_id = { + let pending = self.pending.lock().unwrap(); + pending + .get(&job_id) + .ok_or(QueueError::UnknownJobId(job_id))? + .lock_id + }; + + get_vm_state(&self.pool, job_id, lock_id).await + } + /// NOTE - This function can only be called once, even though the underlying /// basic operation can be performed as many times as the caller likes (so long as /// the job state is never set to something other than running, as that clears the diff --git a/rust/cyclotron-core/tests/base_ops.rs b/rust/cyclotron-core/tests/base_ops.rs index 4b5684f6bea02..36ceb7af4743e 100644 --- a/rust/cyclotron-core/tests/base_ops.rs +++ b/rust/cyclotron-core/tests/base_ops.rs @@ -2,11 +2,7 @@ use std::sync::Arc; use chrono::{Duration, Utc}; use common::{assert_job_matches_init, create_new_job, dates_match}; -use cyclotron_core::{ - base_ops::{bulk_create_jobs, JobState}, - manager::QueueManager, - worker::Worker, -}; +use cyclotron_core::{JobState, QueueManager, Worker}; use sqlx::PgPool; use uuid::Uuid; @@ -233,6 +229,7 @@ async fn test_queue(db: PgPool) { #[sqlx::test(migrations = "./migrations")] pub async fn test_bulk_insert(db: PgPool) { let worker = Worker::from_pool(db.clone()); + let manager = QueueManager::from_pool(db.clone()); let job_template = create_new_job(); @@ -244,7 +241,8 @@ pub async fn test_bulk_insert(db: PgPool) { }) .collect::>(); - bulk_create_jobs(&db, &jobs).await.unwrap(); + let result = manager.bulk_create_jobs(jobs).await; + assert!(result.all_succeeded()); let dequeue_jobs = worker .dequeue_jobs(&job_template.queue_name, 1000) diff --git a/rust/cyclotron-core/tests/common.rs b/rust/cyclotron-core/tests/common.rs index 0746e27590d8f..b1e6d3e715de2 100644 --- a/rust/cyclotron-core/tests/common.rs +++ b/rust/cyclotron-core/tests/common.rs @@ -1,5 +1,5 @@ use chrono::{DateTime, Duration, Utc}; -use cyclotron_core::base_ops::{Job, JobInit}; +use cyclotron_core::{Job, JobInit}; use uuid::Uuid; #[allow(dead_code)] diff --git a/rust/cyclotron-core/tests/shard.rs b/rust/cyclotron-core/tests/shard.rs index cade4458162d2..8446a0c2e9f28 100644 --- a/rust/cyclotron-core/tests/shard.rs +++ b/rust/cyclotron-core/tests/shard.rs @@ -1,6 +1,6 @@ use chrono::{Duration, Utc}; use common::create_new_job; -use cyclotron_core::manager::Shard; +use cyclotron_core::test_support::Shard; use sqlx::PgPool; use tokio::sync::RwLock; diff --git a/rust/cyclotron-fetch/src/context.rs b/rust/cyclotron-fetch/src/context.rs index 36bb64678c1c4..64a29c6336a6e 100644 --- a/rust/cyclotron-fetch/src/context.rs +++ b/rust/cyclotron-fetch/src/context.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use cyclotron_core::{worker::Worker, PoolConfig}; +use cyclotron_core::{PoolConfig, Worker}; use health::HealthHandle; use tokio::sync::Semaphore; diff --git a/rust/cyclotron-fetch/src/fetch.rs b/rust/cyclotron-fetch/src/fetch.rs index 3245c8221c70e..8a7214823effe 100644 --- a/rust/cyclotron-fetch/src/fetch.rs +++ b/rust/cyclotron-fetch/src/fetch.rs @@ -1,11 +1,7 @@ use std::{cmp::min, collections::HashMap, sync::Arc}; use chrono::{DateTime, Duration, Utc}; -use cyclotron_core::{ - base_ops::{Job, JobState}, - error::QueueError, - worker::Worker, -}; +use cyclotron_core::{Job, JobState, QueueError, Worker}; use futures::StreamExt; use http::StatusCode; use reqwest::Response; diff --git a/rust/cyclotron-fetch/tests/fetch.rs b/rust/cyclotron-fetch/tests/fetch.rs index b8ca6a61aeb0d..0fe7d565eca19 100644 --- a/rust/cyclotron-fetch/tests/fetch.rs +++ b/rust/cyclotron-fetch/tests/fetch.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, str::FromStr, sync::Arc}; use chrono::Duration; -use cyclotron_core::{manager::QueueManager, worker::Worker}; +use cyclotron_core::{QueueManager, Worker}; use cyclotron_fetch::fetch::{tick, FetchResult, HttpMethod}; use httpmock::{Method, MockServer}; use serde_json::json; diff --git a/rust/cyclotron-fetch/tests/utils.rs b/rust/cyclotron-fetch/tests/utils.rs index 306bfdf257116..656f08f596980 100644 --- a/rust/cyclotron-fetch/tests/utils.rs +++ b/rust/cyclotron-fetch/tests/utils.rs @@ -1,11 +1,8 @@ use std::sync::Arc; use chrono::{Duration, Utc}; -use cyclotron_core::{ - base_ops::{Job, JobInit}, - error::QueueError, - worker::Worker, -}; + +use cyclotron_core::{Job, JobInit, QueueError, Worker}; use cyclotron_fetch::{ config::AppConfig, context::AppContext, diff --git a/rust/cyclotron-janitor/src/janitor.rs b/rust/cyclotron-janitor/src/janitor.rs index 7cccd17e886e2..b686235f0380c 100644 --- a/rust/cyclotron-janitor/src/janitor.rs +++ b/rust/cyclotron-janitor/src/janitor.rs @@ -1,9 +1,6 @@ use chrono::Utc; use cyclotron_core::{ - error::QueueError, - janitor_ops::{ - delete_completed_jobs, delete_failed_jobs, delete_poison_pills, reset_stalled_jobs, - }, + delete_completed_jobs, delete_failed_jobs, delete_poison_pills, reset_stalled_jobs, QueueError, }; use sqlx::PgPool; use tracing::{info, warn}; @@ -92,7 +89,7 @@ impl Janitor { self.settings.max_touches, ) .await?; - let taken = Utc::now() - before; + let taken: chrono::Duration = Utc::now() - before; metrics::histogram!( "cyclotron_janitor_poison_pills_cleanup_duration_ms", &self.metrics_labels diff --git a/rust/cyclotron-janitor/tests/janitor.rs b/rust/cyclotron-janitor/tests/janitor.rs index fb77a7faf23de..98d7b3c55352c 100644 --- a/rust/cyclotron-janitor/tests/janitor.rs +++ b/rust/cyclotron-janitor/tests/janitor.rs @@ -1,9 +1,6 @@ use chrono::{Duration, Utc}; -use cyclotron_core::{ - base_ops::{JobInit, JobState}, - manager::QueueManager, - worker::Worker, -}; + +use cyclotron_core::{JobInit, JobState, QueueManager, Worker}; use cyclotron_janitor::{config::JanitorSettings, janitor::Janitor}; use sqlx::PgPool; use uuid::Uuid; diff --git a/rust/cyclotron-node/src/lib.rs b/rust/cyclotron-node/src/lib.rs index 212053d5fa74e..b231317a14e16 100644 --- a/rust/cyclotron-node/src/lib.rs +++ b/rust/cyclotron-node/src/lib.rs @@ -1,11 +1,6 @@ use chrono::{DateTime, Utc}; -use cyclotron_core::{ - base_ops::{JobInit, JobState}, - manager::{ManagerConfig, QueueManager}, - worker::Worker, - PoolConfig, -}; +use cyclotron_core::{JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker}; use neon::{ handle::Handle, prelude::{Context, FunctionContext, ModuleContext}, @@ -261,7 +256,7 @@ fn flush_job(mut cx: FunctionContext) -> JsResult { }; let res = worker.flush_job(job_id).await; deferred.settle_with(&channel, move |mut cx| { - res.or_else(|e: cyclotron_core::error::QueueError| cx.throw_error(format!("{}", e)))?; + res.or_else(|e: cyclotron_core::QueueError| cx.throw_error(format!("{}", e)))?; Ok(cx.null()) }); };