diff --git a/plugin-server/package.json b/plugin-server/package.json index 81cca1b74ec19..247af3d70baf1 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -147,6 +147,6 @@ }, "cyclotron": { "//This is a short term workaround to ensure that cyclotron changes trigger a rebuild": true, - "version": "0.1.2" + "version": "0.1.1" } } diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 19fbf5a7a3c84..47d99f2b77eff 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -760,7 +760,7 @@ export class CdpCyclotronWorker extends CdpConsumerBase { private async updateJobs(invocations: HogFunctionInvocationResult[]) { await Promise.all( - invocations.map((item) => { + invocations.map(async (item) => { const id = item.invocation.id if (item.error) { status.debug('⚡️', 'Updating job to failed', id) @@ -775,19 +775,15 @@ export class CdpCyclotronWorker extends CdpConsumerBase { this.cyclotronWorker?.updateJob(id, 'available', updates) } - return this.cyclotronWorker?.releaseJob(id) + await this.cyclotronWorker?.flushJob(id) }) ) } private async handleJobBatch(jobs: CyclotronJob[]) { gaugeBatchUtilization.labels({ queue: this.queue }).set(jobs.length / this.hub.CDP_CYCLOTRON_BATCH_SIZE) - if (!this.cyclotronWorker) { - throw new Error('No cyclotron worker when trying to handle batch') - } const invocations: HogFunctionInvocation[] = [] - // A list of all the promises related to job releasing that we need to await - const failReleases: Promise[] = [] + for (const job of jobs) { // NOTE: This is all a bit messy and might be better to refactor into a helper if (!job.functionId) { @@ -801,8 +797,8 @@ export class CdpCyclotronWorker extends CdpConsumerBase { status.error('Error finding hog function', { id: job.functionId, }) - this.cyclotronWorker.updateJob(job.id, 'failed') - failReleases.push(this.cyclotronWorker.releaseJob(job.id)) + this.cyclotronWorker?.updateJob(job.id, 'failed') + await this.cyclotronWorker?.flushJob(job.id) continue } @@ -811,7 +807,6 @@ export class CdpCyclotronWorker extends CdpConsumerBase { } await this.processBatch(invocations) - await Promise.all(failReleases) counterJobsProcessed.inc({ queue: this.queue }, jobs.length) } diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 21dff5fb9cf0a..fa17aa0e7e504 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -871,11 +871,11 @@ version = "0.1.0" dependencies = [ "chrono", "futures", + "rand", "serde", "sqlx", "thiserror", "tokio", - "tracing", "uuid", ] diff --git a/rust/cyclotron-core/Cargo.toml b/rust/cyclotron-core/Cargo.toml index 85b51222291f6..18598fd0b37f5 100644 --- a/rust/cyclotron-core/Cargo.toml +++ b/rust/cyclotron-core/Cargo.toml @@ -13,5 +13,5 @@ chrono = { workspace = true } tokio = { workspace = true } thiserror = { workspace = true } uuid = { workspace = true } +rand = { workspace = true } futures = { workspace = true } -tracing = { workspace = true } \ No newline at end of file diff --git a/rust/cyclotron-core/src/bin/create_test_data.rs b/rust/cyclotron-core/src/bin/create_test_data.rs new file mode 100644 index 0000000000000..2e194378dcd24 --- /dev/null +++ b/rust/cyclotron-core/src/bin/create_test_data.rs @@ -0,0 +1,53 @@ +use chrono::{Duration, Utc}; +use cyclotron_core::{JobInit, ManagerConfig, PoolConfig, QueueManager}; +use uuid::Uuid; + +// Just inserts jobs as fast as it can, choosing randomly between hog and fetch workers, and between different priorities. +// prints every 100 jobs inserted. 
+#[tokio::main] +async fn main() { + let pool_config = PoolConfig { + db_url: "postgresql://posthog:posthog@localhost:5432/cyclotron".to_string(), + max_connections: None, + min_connections: None, + acquire_timeout_seconds: None, + max_lifetime_seconds: None, + idle_timeout_seconds: None, + }; + + let manager_config = ManagerConfig { + shards: vec![pool_config.clone()], + shard_depth_limit: None, + shard_depth_check_interval_seconds: None, + }; + + let manager = QueueManager::new(manager_config).await.unwrap(); + + let now = Utc::now() - Duration::minutes(1); + let start = Utc::now(); + let mut count = 0; + loop { + let queue = if rand::random() { "fetch" } else { "hog" }; + + let priority = (rand::random::() % 3) as i16; + + let test_job = JobInit { + team_id: 1, + queue_name: queue.to_string(), + priority, + scheduled: now, + function_id: Some(Uuid::now_v7()), + vm_state: None, + parameters: None, + metadata: None, + blob: None, + }; + + manager.create_job(test_job).await.unwrap(); + + count += 1; + if count % 100 == 0 { + println!("Elapsed: {:?}, count: {}", Utc::now() - start, count); + } + } +} diff --git a/rust/cyclotron-core/src/bin/load_test.rs b/rust/cyclotron-core/src/bin/load_test.rs new file mode 100644 index 0000000000000..f000ab49c6e12 --- /dev/null +++ b/rust/cyclotron-core/src/bin/load_test.rs @@ -0,0 +1,163 @@ +use std::{ + sync::{atomic::AtomicUsize, Arc}, + time::Instant, +}; + +use chrono::{Duration, Utc}; +use cyclotron_core::{JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker}; +use futures::future::join_all; +use uuid::Uuid; + +// This spins up a manager and 2 workers, and tries to simulate semi-realistic load (on the DB - the workers do nothing except complete jobs) +// - The manager inserts jobs as fast as it can, choosing randomly between hog and fetch workers, and between different priorities. +// - The workers will process jobs as fast as they can, in batches of 1000. +// - The manager and both workers track how long each insert and dequeue takes, in ms/job. +// - The manager never inserts more than 10,000 more jobs than the workers have processed. 
+const INSERT_BATCH_SIZE: usize = 1000; + +struct SharedContext { + jobs_inserted: AtomicUsize, + jobs_dequeued: AtomicUsize, +} + +async fn producer_loop(manager: QueueManager, shared_context: Arc) { + let mut time_spent_inserting = Duration::zero(); + let now = Utc::now() - Duration::minutes(1); + loop { + let mut to_insert = Vec::with_capacity(1000); + for _ in 0..INSERT_BATCH_SIZE { + let queue = if rand::random() { "fetch" } else { "hog" }; + + let priority = (rand::random::() % 3) as i16; + + let test_job = JobInit { + team_id: 1, + queue_name: queue.to_string(), + priority, + scheduled: now, + function_id: Some(Uuid::now_v7()), + vm_state: None, + parameters: None, + blob: None, + metadata: None, + }; + + to_insert.push(test_job); + } + + let start = Instant::now(); + manager.bulk_create_jobs(to_insert).await; + let elapsed = start.elapsed(); + time_spent_inserting += Duration::from_std(elapsed).unwrap(); + + let inserted = shared_context + .jobs_inserted + .fetch_add(INSERT_BATCH_SIZE, std::sync::atomic::Ordering::Relaxed); + + println!("Inserted: {} in {}, ", inserted, time_spent_inserting); + let mut dequeued = shared_context + .jobs_dequeued + .load(std::sync::atomic::Ordering::Relaxed); + while inserted > dequeued + 10_000 { + println!( + "Waiting for workers to catch up, lagging by {}", + inserted - dequeued + ); + tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await; + dequeued = shared_context + .jobs_dequeued + .load(std::sync::atomic::Ordering::Relaxed); + } + } +} + +async fn worker_loop(worker: Worker, shared_context: Arc, queue: &str) { + let mut time_spent_dequeuing = Duration::zero(); + let start = Utc::now(); + loop { + let loop_start = Instant::now(); + let jobs = worker.dequeue_jobs(queue, 1000).await.unwrap(); + + if jobs.is_empty() { + println!( + "Worker {:?} outpacing inserts, got no jobs, sleeping!", + queue + ); + tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await; + continue; + } + + let mut futs = Vec::with_capacity(jobs.len()); + for job in &jobs { + worker.set_state(job.id, JobState::Completed).unwrap(); + futs.push(worker.flush_job(job.id)); + } + + for res in join_all(futs).await { + res.unwrap(); + } + + time_spent_dequeuing += Duration::from_std(loop_start.elapsed()).unwrap(); + + let dequeued = shared_context + .jobs_dequeued + .fetch_add(jobs.len(), std::sync::atomic::Ordering::Relaxed); + + // To account for the bunch we just handled + let dequeued = dequeued + jobs.len(); + + println!( + "Dequeued, processed and completed {} jobs in {} for {:?}. 
Total time running: {}", + dequeued, + time_spent_dequeuing, + queue, + Utc::now() - start + ); + + if jobs.len() < 1000 { + println!( + "Worker {:?} outpacing manager, only got {} jobs, sleeping!", + queue, + jobs.len() + ); + tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await; + } + } +} + +#[tokio::main] +async fn main() { + let pool_config = PoolConfig { + db_url: "postgresql://posthog:posthog@localhost:5432/cyclotron".to_string(), + max_connections: None, + min_connections: None, + acquire_timeout_seconds: None, + max_lifetime_seconds: None, + idle_timeout_seconds: None, + }; + + let manager_config = ManagerConfig { + shards: vec![pool_config.clone()], + shard_depth_limit: None, + shard_depth_check_interval_seconds: None, + }; + + let shared_context = Arc::new(SharedContext { + jobs_inserted: AtomicUsize::new(0), + jobs_dequeued: AtomicUsize::new(0), + }); + + let manager = QueueManager::new(manager_config).await.unwrap(); + let worker_1 = Worker::new(pool_config.clone()).await.unwrap(); + let worker_2 = Worker::new(pool_config.clone()).await.unwrap(); + + let producer = producer_loop(manager, shared_context.clone()); + let worker_1 = worker_loop(worker_1, shared_context.clone(), "fetch"); + let worker_2 = worker_loop(worker_2, shared_context.clone(), "hog"); + + let producer = tokio::spawn(producer); + let worker_1 = tokio::spawn(worker_1); + let worker_2 = tokio::spawn(worker_2); + + tokio::try_join!(producer, worker_1, worker_2).unwrap(); +} diff --git a/rust/cyclotron-core/src/config.rs b/rust/cyclotron-core/src/config.rs index f49ba07e2c201..8304816671435 100644 --- a/rust/cyclotron-core/src/config.rs +++ b/rust/cyclotron-core/src/config.rs @@ -40,39 +40,3 @@ pub struct ManagerConfig { pub shard_depth_limit: Option, // Defaults to 10_000 available jobs per shard pub shard_depth_check_interval_seconds: Option, // Defaults to 10 seconds - checking shard capacity } - -#[derive(Debug, Serialize, Deserialize, Default)] -pub struct WorkerConfig { - #[serde(alias = "heartbeatWindowSeconds")] - pub heartbeat_window_seconds: Option, // Defaults to 5 - #[serde(alias = "lingerTimeMs")] - pub linger_time_ms: Option, // Defaults to 500 - #[serde(alias = "maxUpdatesBuffered")] - pub max_updates_buffered: Option, // Defaults to 100 - #[serde(alias = "maxBytesBuffered")] - pub max_bytes_buffered: Option, // Defaults to 10MB - #[serde(alias = "flushLoopIntervalMs")] - pub flush_loop_interval_ms: Option, // Defaults to 10 -} - -impl WorkerConfig { - pub fn heartbeat_window(&self) -> chrono::Duration { - chrono::Duration::seconds(self.heartbeat_window_seconds.unwrap_or(5) as i64) - } - - pub fn linger_time(&self) -> chrono::Duration { - chrono::Duration::milliseconds(self.linger_time_ms.unwrap_or(500) as i64) - } - - pub fn flush_loop_interval(&self) -> chrono::Duration { - chrono::Duration::milliseconds(self.flush_loop_interval_ms.unwrap_or(10) as i64) - } - - pub fn max_updates_buffered(&self) -> usize { - self.max_updates_buffered.unwrap_or(100) - } - - pub fn max_bytes_buffered(&self) -> usize { - self.max_bytes_buffered.unwrap_or(10_000_000) - } -} diff --git a/rust/cyclotron-core/src/error.rs b/rust/cyclotron-core/src/error.rs index 4c062559ef11a..4e870e75a2d59 100644 --- a/rust/cyclotron-core/src/error.rs +++ b/rust/cyclotron-core/src/error.rs @@ -4,24 +4,14 @@ use uuid::Uuid; pub enum QueueError { #[error("sqlx error: {0}")] SqlxError(#[from] sqlx::Error), + #[error("Unknown job id: {0}")] + UnknownJobId(Uuid), + #[error("Job {0} flushed without a new state, which would 
leave it in a running state forever (or until reaped)")] + FlushWithoutNextState(Uuid), + #[error("Invalid lock {0} used to update job {1}. This usually means a job has been reaped from under a worker - did you forget to set the heartbeat?")] + InvalidLock(Uuid, Uuid), #[error("Shard over capacity {0} for this manager, insert aborted")] ShardFull(u64), #[error("Timed waiting for shard to have capacity")] TimedOutWaitingForCapacity, - #[error(transparent)] - JobError(#[from] JobError), -} - -#[derive(Debug, thiserror::Error)] -pub enum JobError { - #[error("Unknown job id: {0}")] - UnknownJobId(Uuid), - #[error("Invalid lock id: {0} for job {1}")] - InvalidLock(Uuid, Uuid), - #[error("Cannot flush job {0} without a next state")] - FlushWithoutNextState(Uuid), - #[error("Deadline to flush update for job {0} exceeded")] - DeadlineExceeded(Uuid), - #[error("Update dropped before being flushed.")] - UpdateDropped, } diff --git a/rust/cyclotron-core/src/lib.rs b/rust/cyclotron-core/src/lib.rs index 2615c4f4c5e74..f845ccee042f8 100644 --- a/rust/cyclotron-core/src/lib.rs +++ b/rust/cyclotron-core/src/lib.rs @@ -14,9 +14,6 @@ pub use types::JobUpdate; // Errors mod error; -// Errors about some job operation - locks being lost, invalid states, flush deadlines exceeded etc -pub use error::JobError; -// Errors about the queue itself - full shards, timeouts, postgres/network errors pub use error::QueueError; // Manager @@ -25,8 +22,6 @@ pub use manager::QueueManager; // Worker mod worker; -// A handle to a released job update, that can be awaited to block waiting for the flush to complete -pub use worker::FlushHandle; pub use worker::Worker; // Janitor @@ -37,7 +32,6 @@ pub use janitor::Janitor; mod config; pub use config::ManagerConfig; pub use config::PoolConfig; -pub use config::WorkerConfig; // The shard id is a fixed value that is set by the janitor when it starts up. // Workers may use this value when reporting metrics. The `Worker` struct provides diff --git a/rust/cyclotron-core/src/ops/meta.rs b/rust/cyclotron-core/src/ops/meta.rs index f22b41e70f1a7..d48acd88bc188 100644 --- a/rust/cyclotron-core/src/ops/meta.rs +++ b/rust/cyclotron-core/src/ops/meta.rs @@ -1,10 +1,7 @@ use sqlx::{postgres::PgQueryResult, PgPool}; use uuid::Uuid; -use crate::{ - error::{JobError, QueueError}, - DEAD_LETTER_QUEUE, -}; +use crate::{error::QueueError, DEAD_LETTER_QUEUE}; pub async fn count_total_waiting_jobs<'c, E>(executor: E) -> Result where @@ -20,10 +17,9 @@ where Ok(res as u64) } -// Returns an InvalidLock error if the query run did not affect any rows. 
-pub fn throw_if_no_rows(res: PgQueryResult, job: Uuid, lock: Uuid) -> Result<(), JobError> { +pub fn throw_if_no_rows(res: PgQueryResult, job: Uuid, lock: Uuid) -> Result<(), QueueError> { if res.rows_affected() == 0 { - Err(JobError::InvalidLock(lock, job)) + Err(QueueError::InvalidLock(lock, job)) } else { Ok(()) } @@ -57,7 +53,7 @@ where .await?; let Some(original_queue_name) = original_queue_name else { - return Err(JobError::UnknownJobId(job).into()); + return Err(QueueError::UnknownJobId(job)); }; // Now we add an entry to the dead metadata queue diff --git a/rust/cyclotron-core/src/ops/worker.rs b/rust/cyclotron-core/src/ops/worker.rs index 3a8f7502fd1e0..8f808aaba5325 100644 --- a/rust/cyclotron-core/src/ops/worker.rs +++ b/rust/cyclotron-core/src/ops/worker.rs @@ -169,7 +169,7 @@ where pub async fn flush_job<'c, E>( executor: E, job_id: Uuid, - updates: &JobUpdate, + updates: JobUpdate, ) -> Result<(), QueueError> where E: sqlx::Executor<'c, Database = sqlx::Postgres>, @@ -180,48 +180,47 @@ where let mut query = QueryBuilder::new("UPDATE cyclotron_jobs SET "); let mut needs_comma = false; - if let Some(state) = &updates.state { + if let Some(state) = updates.state { set_helper(&mut query, "state", state, needs_comma); needs_comma = true; } - if let Some(queue_name) = &updates.queue_name { + if let Some(queue_name) = updates.queue_name { set_helper(&mut query, "queue_name", queue_name, needs_comma); needs_comma = true; } - if let Some(priority) = &updates.priority { + if let Some(priority) = updates.priority { set_helper(&mut query, "priority", priority, needs_comma); needs_comma = true; } - if let Some(scheduled) = &updates.scheduled { + if let Some(scheduled) = updates.scheduled { set_helper(&mut query, "scheduled", scheduled, needs_comma); needs_comma = true; } - if let Some(vm_state) = &updates.vm_state { + if let Some(vm_state) = updates.vm_state { set_helper(&mut query, "vm_state", vm_state, needs_comma); needs_comma = true; } - if let Some(metadata) = &updates.metadata { + if let Some(metadata) = updates.metadata { set_helper(&mut query, "metadata", metadata, needs_comma); needs_comma = true; } - if let Some(parameters) = &updates.parameters { + if let Some(parameters) = updates.parameters { set_helper(&mut query, "parameters", parameters, needs_comma); needs_comma = true; } - if let Some(blob) = &updates.blob { + if let Some(blob) = updates.blob { set_helper(&mut query, "blob", blob, needs_comma); needs_comma = true; } if job_returned { - // If we're returning this job, clear the lock id and the heartbeat set_helper(&mut query, "lock_id", Option::::None, needs_comma); set_helper( &mut query, @@ -230,7 +229,6 @@ where true, ); } else { - // Otherwise, flushing a job update indicates forward progress, so we update the heartbeat set_helper(&mut query, "last_heartbeat", Utc::now(), needs_comma); } @@ -239,6 +237,8 @@ where query.push(" AND lock_id = "); query.push_bind(lock_id); + //println!("Query: {:?}", query.into_sql()); + assert_does_update(executor, job_id, lock_id, query.build()).await?; Ok(()) } @@ -276,7 +276,7 @@ where assert_does_update(executor, job_id, lock_id, q).await } -// Simple wrapper, that just executes a query and returns an InvalidLock error if no rows were affected. 
+// Simple wrapper, that just executes a query and throws an error if no rows were affected async fn assert_does_update<'c, E>( executor: E, job_id: Uuid, @@ -287,7 +287,5 @@ where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { let res = query.execute(executor).await?; - - // JobError -> QueueError - Ok(throw_if_no_rows(res, job_id, lock_id)?) + throw_if_no_rows(res, job_id, lock_id) } diff --git a/rust/cyclotron-core/src/worker.rs b/rust/cyclotron-core/src/worker.rs index 475951977cd8e..7398c6837113a 100644 --- a/rust/cyclotron-core/src/worker.rs +++ b/rust/cyclotron-core/src/worker.rs @@ -1,21 +1,11 @@ -use std::{ - collections::HashMap, - future::Future, - sync::{Arc, Weak}, - task::Poll, -}; +use std::collections::HashMap; use chrono::{DateTime, Duration, Utc}; -use futures::FutureExt; use sqlx::PgPool; use std::sync::Mutex; -use tokio::sync::oneshot; -use tracing::error; use uuid::Uuid; use crate::{ - config::WorkerConfig, - error::JobError, ops::{ meta::{dead_letter, run_migrations}, worker::{dequeue_jobs, dequeue_with_vm_state, flush_job, get_vm_state, set_heartbeat}, @@ -35,49 +25,36 @@ use crate::{ // now (client libraries should wrap this to provide better interfaces). pub struct Worker { pool: PgPool, - // All the jobs the worker is currently working on, and hasn't released for returning - // to the queue. + // All dequeued job IDs that haven't been flushed yet. The idea is this lets us + // manage, on the rust side of any API boundary, the "pending" update of any given + // job, such that a user can progressively build up a full update, and then flush it, + // rather than having to track the update state on their side and submit it all at once. + // This also lets us "hide" all the locking logic, which we're not totally settled on yet. + // TRICKY - this is a sync mutex, because that simplifies using the manager in an FFI // context (since most functions below can be sync). We have to be careful never to // hold a lock across an await point, though. - running: Mutex>, - - // When a user calls release, we queue up the update to be flushed, but only flush on - // some conditions. 
- flush_batch: Arc>, + pending: Mutex>, - pub heartbeat_window: Duration, // The worker will only pass one heartbeat to the DB per job every heartbeat_window - pub linger: Duration, // Updates will be held at most this long - pub max_buffered: usize, // Updates will be flushed after this many are buffered - pub max_bytes: usize, // Updates will be flushed after the vm_state and blob sizes combined exceed this + pub heartbeat_window: Duration, // The worker will only pass one heartbeat to the DB every heartbeat_window } impl Worker { - pub async fn new(pool: PoolConfig, worker: WorkerConfig) -> Result { - let pool = pool.connect().await?; - Ok(Self::from_pool(pool, worker)) + pub async fn new(config: PoolConfig) -> Result { + let pool = config.connect().await?; + Ok(Self { + pool, + pending: Default::default(), + heartbeat_window: Duration::seconds(5), + }) } - pub fn from_pool(pool: PgPool, worker_config: WorkerConfig) -> Self { - let worker = Self { + pub fn from_pool(pool: PgPool) -> Self { + Self { pool, - running: Default::default(), - heartbeat_window: worker_config.heartbeat_window(), - flush_batch: Default::default(), - linger: worker_config.linger_time(), - max_buffered: worker_config.max_updates_buffered(), - max_bytes: worker_config.max_bytes_buffered(), - }; - - tokio::spawn(flush_loop( - worker.pool.clone(), - Arc::downgrade(&worker.flush_batch), - worker.max_buffered, - worker.max_bytes, - worker_config.flush_loop_interval(), - )); - - worker + pending: Default::default(), + heartbeat_window: Duration::seconds(5), + } } /// Run the latest cyclotron migrations. Panics if the migrations can't be run - failure to run migrations is purposefully fatal. @@ -91,14 +68,14 @@ impl Worker { pub async fn dequeue_jobs(&self, queue: &str, limit: usize) -> Result, QueueError> { let jobs = dequeue_jobs(&self.pool, queue, limit).await?; - let mut running = self.running.lock().unwrap(); + let mut pending = self.pending.lock().unwrap(); for job in &jobs { // We need to hang onto the locks for a job until we flush it, so we can send updates. let update = JobUpdate::new( job.lock_id .expect("Yell at oliver that the dequeuing code is broken. He's very sorry that your process just panicked"), ); - running.insert(job.id, update); + pending.insert(job.id, update); } Ok(jobs) @@ -112,14 +89,14 @@ impl Worker { ) -> Result, QueueError> { let jobs = dequeue_with_vm_state(&self.pool, queue, limit).await?; - let mut running = self.running.lock().unwrap(); + let mut pending = self.pending.lock().unwrap(); for job in &jobs { // We need to hang onto the locks for a job until we flush it, so we can send updates. let update = JobUpdate::new( job.lock_id .expect("Yell at oliver that the dequeuing (with vm) code is broken. He's very sorry that your process just panicked"), ); - running.insert(job.id, update); + pending.insert(job.id, update); } Ok(jobs) @@ -129,74 +106,47 @@ impl Worker { /// need the VM state as well. pub async fn get_vm_state(&self, job_id: Uuid) -> Result, QueueError> { let lock_id = { - let pending = self.running.lock().unwrap(); + let pending = self.pending.lock().unwrap(); pending .get(&job_id) - .ok_or(JobError::UnknownJobId(job_id))? + .ok_or(QueueError::UnknownJobId(job_id))? .lock_id }; get_vm_state(&self.pool, job_id, lock_id).await } - /// Release a job back to the queue. Callers are returned a flush handle, which they - /// may use to await the flushing of the updated job state, which happens asynchronously - /// to allow for batching of updates. 
Callers may drop the flush handle without impacting - /// the flushing of the update. This function returns an error if the caller tries to release - /// a job that this `Worker` doesn't know about, or if the worker tries to release a job - /// without having provided a next state for it. - /// - /// The flush handle returned here will resolve to an error if the asynchronous flush operation - /// fails in non-retryable fashion. Retryable errors during flush are not surfaced to the handle, - /// and the flush will be retried until it succeeds, a non-retryable error is encountered (e.g. - /// this workers lock on the job has been lost), or until the deadline is exceeded, if one is - /// provided. All updates will have at least one flush attempt. - pub fn release_job(&self, job_id: Uuid, deadline: Option) -> FlushHandle { + /// NOTE - This function can only be called once, even though the underlying + /// basic operation can be performed as many times as the caller likes (so long as + /// the job state is never set to something other than running, as that clears the + /// job lock). We're more strict here (flushes can only happen once, you must + /// flush some non-running state) to try and enforce a good interaction + /// pattern with the queue. I might return to this and loosen this constraint in the + /// future, if there's a motivating case for needing to flush partial job updates. + pub async fn flush_job(&self, job_id: Uuid) -> Result<(), QueueError> { + // TODO - this drops the job from the known jobs before the flush succeeds, + // which means that if the flush fails, we'll lose the job and can never + // update it's state (leaving it to the reaper). This is a bug, but I'm not + // sure I want to make flushes retryable just yet, so I'm leaving it for now. + // NIT: this wrapping is to ensure pending is dropped prior to the await let update = { - let mut running = self.running.lock().unwrap(); - let Some(update) = running.remove(&job_id) else { - return FlushHandle::immediate(Err(JobError::UnknownJobId(job_id))); - }; + let mut pending = self.pending.lock().unwrap(); + let update = pending + .remove(&job_id) + .ok_or(QueueError::UnknownJobId(job_id))?; + // It's a programming error to flush a job without setting a new state match update.state { Some(JobState::Running) | None => { - // Keep track of any /other/ updates that might have been stored, so this - // error is recoverable simply by providing an appropriate new state. - running.insert(job_id, update); - return FlushHandle::immediate(Err(JobError::FlushWithoutNextState(job_id))); + // Keep track of any /other/ updates that might have been stored, even in this case, + // so a user can queue up the appropriate state transition and flush properly + pending.insert(job_id, update); + return Err(QueueError::FlushWithoutNextState(job_id)); } _ => update, } }; - - // If we were given a deadline, this update should be flushed at least as soon as then, - // otherwise we can wait the full linger time before flushing it. - let now = Utc::now(); - let flush_by = now + deadline.unwrap_or(self.linger); - let deadline = deadline.map(|d| now + d); - - let (pending, handle) = PendingUpdate::new(job_id, update, deadline); - - let mut batch = self.flush_batch.lock().unwrap(); - batch.add(pending, flush_by); - handle - } - - /// Force flush all pending updates, regardless of linger time or buffer size. - /// Transient errors encountered during the flush will cause the operation to - /// be aborted, and the error to be returned to the caller. 
If no transient errors - /// are encountered, all permanent errors will be dispatched to the relevant flush - /// handle, and this function will return success. - pub async fn force_flush(&self) -> Result<(), QueueError> { - let mut to_flush = { self.flush_batch.lock().unwrap().take() }; - let res = if !to_flush.pending.is_empty() { - to_flush.flush(&self.pool).await - } else { - Ok(()) - }; - // If the flush successed, to_flush is empty, otherwise, we need to retry any - // updates still in it. - self.flush_batch.lock().unwrap().merge(to_flush); - res + let mut connection = self.pool.acquire().await?; + flush_job(&mut *connection, job_id, update).await } /// Jobs are reaped after some seconds (the number is deployment specific, and may become @@ -207,10 +157,10 @@ impl Worker { /// if e.g. the job was reaped out from under you). pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), QueueError> { let lock_id = { - let mut pending = self.running.lock().unwrap(); + let mut pending = self.pending.lock().unwrap(); let update = pending .get_mut(&job_id) - .ok_or(JobError::UnknownJobId(job_id))?; + .ok_or(QueueError::UnknownJobId(job_id))?; let should_heartbeat = update .last_heartbeat @@ -228,31 +178,31 @@ impl Worker { } /// This is how you "return" a job to the queue, by setting the state to "available" - pub fn set_state(&self, job_id: Uuid, state: JobState) -> Result<(), JobError> { - let mut pending = self.running.lock().unwrap(); + pub fn set_state(&self, job_id: Uuid, state: JobState) -> Result<(), QueueError> { + let mut pending = self.pending.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(JobError::UnknownJobId(job_id))? + .ok_or(QueueError::UnknownJobId(job_id))? .state = Some(state); Ok(()) } - pub fn set_queue(&self, job_id: Uuid, queue: &str) -> Result<(), JobError> { - let mut pending = self.running.lock().unwrap(); + pub fn set_queue(&self, job_id: Uuid, queue: &str) -> Result<(), QueueError> { + let mut pending = self.pending.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(JobError::UnknownJobId(job_id))? + .ok_or(QueueError::UnknownJobId(job_id))? .queue_name = Some(queue.to_string()); Ok(()) } /// Jobs are dequeued lowest-priority-first, so this is how you change the "base" priority of a job /// (control tables may apply further deltas if e.g. a given function is in a degraded state) - pub fn set_priority(&self, job_id: Uuid, priority: i16) -> Result<(), JobError> { - let mut pending = self.running.lock().unwrap(); + pub fn set_priority(&self, job_id: Uuid, priority: i16) -> Result<(), QueueError> { + let mut pending = self.pending.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(JobError::UnknownJobId(job_id))? + .ok_or(QueueError::UnknownJobId(job_id))? .priority = Some(priority); Ok(()) } @@ -260,11 +210,15 @@ impl Worker { /// This is how you do e.g. retries after some time, by setting the scheduled time /// to some time in the future. Sleeping, retry backoff, scheduling - it's all the same operation, /// this one. - pub fn set_scheduled_at(&self, job_id: Uuid, scheduled: DateTime) -> Result<(), JobError> { - let mut pending = self.running.lock().unwrap(); + pub fn set_scheduled_at( + &self, + job_id: Uuid, + scheduled: DateTime, + ) -> Result<(), QueueError> { + let mut pending = self.pending.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(JobError::UnknownJobId(job_id))? + .ok_or(QueueError::UnknownJobId(job_id))? 
.scheduled = Some(scheduled); Ok(()) } @@ -274,31 +228,35 @@ impl Worker { &self, job_id: Uuid, vm_state: Option, // This (and the following) are Options, because the user can null them (by calling with None) - ) -> Result<(), JobError> { - let mut pending = self.running.lock().unwrap(); + ) -> Result<(), QueueError> { + let mut pending = self.pending.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(JobError::UnknownJobId(job_id))? + .ok_or(QueueError::UnknownJobId(job_id))? .vm_state = Some(vm_state); Ok(()) } /// Passing None here will clear the metadata - pub fn set_metadata(&self, job_id: Uuid, metadata: Option) -> Result<(), JobError> { - let mut pending = self.running.lock().unwrap(); + pub fn set_metadata(&self, job_id: Uuid, metadata: Option) -> Result<(), QueueError> { + let mut pending = self.pending.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(JobError::UnknownJobId(job_id))? + .ok_or(QueueError::UnknownJobId(job_id))? .metadata = Some(metadata); Ok(()) } /// Passing None here will clear the parameters - pub fn set_parameters(&self, job_id: Uuid, parameters: Option) -> Result<(), JobError> { - let mut pending = self.running.lock().unwrap(); + pub fn set_parameters( + &self, + job_id: Uuid, + parameters: Option, + ) -> Result<(), QueueError> { + let mut pending = self.pending.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(JobError::UnknownJobId(job_id))? + .ok_or(QueueError::UnknownJobId(job_id))? .parameters = Some(parameters); Ok(()) } @@ -309,9 +267,9 @@ impl Worker { // lock after the if check, makes the compiler think the lock is held across // the await point. { - let pending = self.running.lock().unwrap(); + let pending = self.pending.lock().unwrap(); if !pending.contains_key(&job_id) { - return Err(JobError::UnknownJobId(job_id).into()); + return Err(QueueError::UnknownJobId(job_id)); } } @@ -319,218 +277,12 @@ impl Worker { } /// Passing None here will clear the blob - pub fn set_blob(&self, job_id: Uuid, blob: Option) -> Result<(), JobError> { - let mut pending = self.running.lock().unwrap(); + pub fn set_blob(&self, job_id: Uuid, blob: Option) -> Result<(), QueueError> { + let mut pending = self.pending.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(JobError::UnknownJobId(job_id))? + .ok_or(QueueError::UnknownJobId(job_id))? .blob = Some(blob); Ok(()) } } - -// Started by each worker on creation, just loops seeing if the passed batch can be flushed, and -// if it can, flushing it. -async fn flush_loop( - pool: PgPool, - batch: Weak>, - max_buffered: usize, - max_bytes: usize, - interval: Duration, -) { - loop { - let Some(batch) = batch.upgrade() else { - // The batch has been dropped, we should exit. - break; - }; - // Contemplating sync mutexes on the tree of woe. - let mut to_flush = { batch.lock().unwrap().take() }; - if to_flush.should_flush(max_buffered, max_bytes) { - if let Err(e) = to_flush.flush(&pool).await { - error!("Error flushing batch: {:?}", e); - } - } - // We can always merge the taken batch back into the pending batch - on successful - // flush, the taken batch will be empty, and on failure, we need to re-queue those updates. - // TRICKY - we take care not to bind the lock here. Compilation WILL fail if it's bound, - // because it makes this future !Send, and the tokio::spawn above will fail, but in case - // we change the looping strategy, I'm calling it out explicitly too. 
- batch.lock().unwrap().merge(to_flush); - tokio::time::sleep(interval.to_std().unwrap()).await; - } -} - -struct FlushBatch { - // The minimum of the "flush_by" times of all the updates in the batch - pub next_mandatory_flush: DateTime, - // The list of pending updates. Note that the update batch makes no effort - // to deduplicate or compact updates. - pub pending: Vec, - // A running total of all blob bytes held in the batch - pub blobs_size: usize, - // A running total of all vm_state bytes held in the batch - pub vm_states_size: usize, -} - -impl FlushBatch { - pub fn new() -> Self { - Self { - next_mandatory_flush: Utc::now(), - pending: Default::default(), - blobs_size: 0, - vm_states_size: 0, - } - } - - pub fn add(&mut self, pending: PendingUpdate, flush_by: DateTime) { - // If this is the start of a new batch, reset the first_insert time - if self.pending.is_empty() { - self.next_mandatory_flush = flush_by; - } else { - self.next_mandatory_flush = self.next_mandatory_flush.min(flush_by); - } - - // Update the sizes of the bytes we track - if let Some(Some(blob)) = pending.update.blob.as_ref() { - self.blobs_size += blob.len(); - } - if let Some(Some(vm_state)) = pending.update.vm_state.as_ref() { - self.vm_states_size += vm_state.len(); - } - self.pending.push(pending); - } - - async fn flush(&mut self, pool: &PgPool) -> Result<(), QueueError> { - let now = Utc::now(); - // First, filter any updates whose deadline is exceeded that we have - // already tried to flush once, sending a deadline exceeded error to the - // handle. - let mut i = 0; - while i < self.pending.len() { - if self.pending[i].deadline.map_or(false, |d| d < now) && self.pending[i].tries > 0 { - self.pending.swap_remove(i).fail_deadline_exceeded(); - } else { - i += 1; - } - } - - let mut txn = pool.begin().await?; - let mut results = Vec::new(); - for to_flush in self.pending.iter_mut() { - to_flush.tries += 1; - let result = flush_job(&mut *txn, to_flush.job_id, &to_flush.update).await; - match result { - Ok(()) => { - results.push(Ok(())); - } - Err(QueueError::JobError(e)) => { - results.push(Err(e)); - } - Err(e) => { - return Err(e); - } - } - } - txn.commit().await?; - - // We only dispatch results and clear the pending set if we actually commit the transaction, otherwise - // the updates in this batch should be retried. - for (update, result) in self.pending.drain(..).zip(results) { - update.resolve(result); - } - Ok(()) - } - - fn should_flush(&self, max_buffered: usize, max_bytes: usize) -> bool { - let would_flush = Utc::now() >= self.next_mandatory_flush - || self.pending.len() >= max_buffered - || self.blobs_size + self.vm_states_size >= max_bytes; - - would_flush && !self.pending.is_empty() // we only should flush if we have something to flush - } - - // Take the current batch, replacing it in memory with an empty one. 
Used along with "merge" - // to let us flush without holding the batch lock for the duration of the flush - fn take(&mut self) -> Self { - std::mem::take(self) - } - - // Combine two batches, setting the next mandatory flush to the earliest of the two - fn merge(&mut self, other: Self) { - self.pending.extend(other.pending); - self.blobs_size += other.blobs_size; - self.vm_states_size += other.vm_states_size; - self.next_mandatory_flush = self.next_mandatory_flush.min(other.next_mandatory_flush); - } -} - -impl Default for FlushBatch { - fn default() -> Self { - Self::new() - } -} - -struct PendingUpdate { - job_id: Uuid, - update: JobUpdate, - deadline: Option>, - tries: u8, - tx: oneshot::Sender>, -} - -impl PendingUpdate { - pub fn new( - job_id: Uuid, - update: JobUpdate, - deadline: Option>, - ) -> (Self, FlushHandle) { - let (tx, rx) = oneshot::channel(); - let update = Self { - job_id, - update, - deadline, - tries: 0, - tx, - }; - (update, FlushHandle { inner: rx }) - } - - pub fn fail_deadline_exceeded(self) { - let job_id = self.job_id; - self.resolve(Err(JobError::DeadlineExceeded(job_id))); - } - - pub fn resolve(self, result: Result<(), JobError>) { - // We do not care if someone is waiting for this result or not - let _ = self.tx.send(result); - } -} - -pub struct FlushHandle { - inner: oneshot::Receiver>, -} - -impl FlushHandle { - pub fn immediate(result: Result<(), JobError>) -> Self { - let (tx, rx) = oneshot::channel(); - let _ = tx.send(result); - Self { inner: rx } - } -} - -// If the inner oneshot resolves to an error, we know that the update was dropped before being flushed, -// so we just return a JobError::UpdateDropped. Otherwise, we return the result of the inner oneshot. -impl Future for FlushHandle { - type Output = Result<(), JobError>; - - fn poll( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - match self.inner.poll_unpin(cx) { - Poll::Ready(Ok(result)) => Poll::Ready(result), - Poll::Ready(Err(_)) => Poll::Ready(Err(JobError::UpdateDropped)), - Poll::Pending => Poll::Pending, - } - } -} diff --git a/rust/cyclotron-core/tests/base_ops.rs b/rust/cyclotron-core/tests/base_ops.rs index 5354eea6f4f01..35c55c7037f44 100644 --- a/rust/cyclotron-core/tests/base_ops.rs +++ b/rust/cyclotron-core/tests/base_ops.rs @@ -12,8 +12,7 @@ mod common; #[sqlx::test(migrations = "./migrations")] async fn test_queue(db: PgPool) { let manager = QueueManager::from_pool(db.clone()); - let mut worker = Worker::from_pool(db, Default::default()); - worker.max_buffered = 0; // No buffering for testing, flush immediately + let worker = Worker::from_pool(db); let job_1 = create_new_job(); let mut job_2 = create_new_job(); @@ -50,12 +49,14 @@ async fn test_queue(db: PgPool) { .expect("failed to set state"); // Flush the two jobs, having made no other changes, then assert we can re-dequeue them - let handle_1 = worker.release_job(jobs[0].id, None); - let handle_2 = worker.release_job(jobs[1].id, None); - - worker.force_flush().await.unwrap(); - handle_1.await.unwrap(); - handle_2.await.unwrap(); + worker + .flush_job(jobs[0].id) + .await + .expect("failed to flush job"); + worker + .flush_job(jobs[1].id) + .await + .expect("failed to flush job"); let jobs = worker .dequeue_jobs(&queue_name, 2) @@ -74,15 +75,17 @@ async fn test_queue(db: PgPool) { .set_state(jobs[1].id, JobState::Available) .expect("failed to set state"); - let handle_1 = worker.release_job(jobs[0].id, None); - let handle_2 = worker.release_job(jobs[1].id, None); - - 
worker.force_flush().await.unwrap(); - handle_1.await.unwrap(); - handle_2.await.unwrap(); + worker + .flush_job(jobs[0].id) + .await + .expect("failed to flush job"); + worker + .flush_job(jobs[1].id) + .await + .expect("failed to flush job"); // Spin up two tasks to race on dequeuing, and assert at most 2 jobs are dequeued - let worker: Arc = Arc::new(worker); + let worker = Arc::new(worker); let moved = worker.clone(); let queue_name_moved = queue_name.clone(); let fut_1 = async move { @@ -115,16 +118,20 @@ async fn test_queue(db: PgPool) { .expect("failed to dequeue job"); assert_eq!(empty.len(), 0); - // If we try to flush a job without setting what it's next state will be, - // we should get an error. We don't bother forcing a flush here, because - // the worker should return a handle that immediately resolves. - assert!(worker.release_job(jobs[0].id, None).await.is_err()); + // If we try to flush a job without setting what it's next state will be (or if we set that next state to be "running"), + // we should get an error + worker + .flush_job(jobs[0].id) + .await + .expect_err("expected error due to no-next-state"); - // Trying to flush a job with the state "running" should also fail worker .set_state(jobs[1].id, JobState::Running) .expect("failed to set state"); - assert!(worker.release_job(jobs[1].id, None).await.is_err()); + worker + .flush_job(jobs[1].id) + .await + .expect_err("expected error due to running state"); // But if we properly set the state to completed or failed, now we can flush worker @@ -134,12 +141,14 @@ async fn test_queue(db: PgPool) { .set_state(jobs[1].id, JobState::Failed) .expect("failed to set state"); - let handle_1 = worker.release_job(jobs[0].id, None); - let handle_2 = worker.release_job(jobs[1].id, None); - - worker.force_flush().await.expect("failed to flush job"); - handle_1.await.unwrap(); - handle_2.await.unwrap(); + worker + .flush_job(jobs[0].id) + .await + .expect("failed to flush job"); + worker + .flush_job(jobs[1].id) + .await + .expect("failed to flush job"); // And now, any subsequent dequeues will return no jobs (because these jobs are finished) let empty = worker @@ -201,10 +210,8 @@ async fn test_queue(db: PgPool) { .expect("failed to set metadata"); // Flush the job - let handle = worker.release_job(job.id, None); + worker.flush_job(job.id).await.expect("failed to flush job"); - worker.force_flush().await.unwrap(); - handle.await.unwrap(); // Then dequeue it again (this time being sure to grab the vm state too) let job = worker .dequeue_with_vm_state("test_2", 1) @@ -224,7 +231,7 @@ async fn test_queue(db: PgPool) { #[sqlx::test(migrations = "./migrations")] pub async fn test_bulk_insert(db: PgPool) { - let worker = Worker::from_pool(db.clone(), Default::default()); + let worker = Worker::from_pool(db.clone()); let manager = QueueManager::from_pool(db.clone()); let job_template = create_new_job(); diff --git a/rust/cyclotron-fetch/src/config.rs b/rust/cyclotron-fetch/src/config.rs index eb158fde2764b..752ba5f32217a 100644 --- a/rust/cyclotron-fetch/src/config.rs +++ b/rust/cyclotron-fetch/src/config.rs @@ -1,5 +1,5 @@ use chrono::Duration; -use cyclotron_core::{PoolConfig, WorkerConfig}; +use cyclotron_core::PoolConfig; use envconfig::Envconfig; use uuid::Uuid; @@ -66,22 +66,6 @@ pub struct Config { #[envconfig(nested = true)] pub kafka: KafkaConfig, - - // Worker tuning params - #[envconfig(default = "5")] - pub heartbeat_window_seconds: u64, - - #[envconfig(default = "500")] - pub linger_time_ms: u64, - - #[envconfig(default = 
"100")] - pub max_updates_buffered: usize, - - #[envconfig(default = "10000000")] - pub max_bytes_buffered: usize, - - #[envconfig(default = "10")] - pub flush_loop_interval_ms: u64, } #[allow(dead_code)] @@ -107,7 +91,7 @@ pub struct AppConfig { } impl Config { - pub fn to_components(self) -> (AppConfig, PoolConfig, KafkaConfig, WorkerConfig) { + pub fn to_components(self) -> (AppConfig, PoolConfig, KafkaConfig) { let app_config = AppConfig { host: self.host, port: self.port, @@ -133,14 +117,6 @@ impl Config { idle_timeout_seconds: Some(self.pg_idle_timeout_seconds), }; - let worker_config = WorkerConfig { - heartbeat_window_seconds: Some(self.heartbeat_window_seconds), - linger_time_ms: Some(self.linger_time_ms), - max_updates_buffered: Some(self.max_updates_buffered), - max_bytes_buffered: Some(self.max_bytes_buffered), - flush_loop_interval_ms: Some(self.flush_loop_interval_ms), - }; - - (app_config, pool_config, self.kafka, worker_config) + (app_config, pool_config, self.kafka) } } diff --git a/rust/cyclotron-fetch/src/context.rs b/rust/cyclotron-fetch/src/context.rs index 2f8f935846251..d88acd1633c02 100644 --- a/rust/cyclotron-fetch/src/context.rs +++ b/rust/cyclotron-fetch/src/context.rs @@ -3,7 +3,6 @@ use std::sync::{Arc, RwLock}; use common_kafka::config::KafkaConfig; use common_kafka::kafka_producer::create_kafka_producer; use common_kafka::kafka_producer::KafkaContext; -use cyclotron_core::WorkerConfig; use cyclotron_core::{PoolConfig, Worker, SHARD_ID_KEY}; use health::HealthHandle; use rdkafka::producer::FutureProducer; @@ -26,7 +25,6 @@ impl AppContext { config: AppConfig, pool_config: PoolConfig, kafka_config: KafkaConfig, - worker_config: WorkerConfig, liveness: HealthHandle, kafka_liveness: HealthHandle, ) -> Result { @@ -52,7 +50,7 @@ impl AppContext { } }; - let worker = Worker::new(pool_config, worker_config).await?; + let worker = Worker::new(pool_config).await?; let labels = vec![ (SHARD_ID_KEY.to_string(), config.shard_id.clone()), diff --git a/rust/cyclotron-fetch/src/fetch.rs b/rust/cyclotron-fetch/src/fetch.rs index cd8b0eb7011a7..bc5082e53ebaa 100644 --- a/rust/cyclotron-fetch/src/fetch.rs +++ b/rust/cyclotron-fetch/src/fetch.rs @@ -1,7 +1,7 @@ use std::{cmp::min, collections::HashMap, fmt::Display, sync::Arc}; use chrono::{DateTime, Duration, Utc}; -use cyclotron_core::{Bytes, Job, JobError, JobState, QueueError, Worker}; +use cyclotron_core::{Bytes, Job, JobState, QueueError, Worker}; use futures::StreamExt; use http::StatusCode; use reqwest::Response; @@ -29,8 +29,6 @@ pub enum FetchError { JobFetchTimeout, #[error(transparent)] QueueError(#[from] QueueError), - #[error(transparent)] - JobError(#[from] JobError), // TRICKY - in most cases, serde errors are a FetchError (something coming from the queue was // invalid), but this is used in cases where /we/ fail to serialise something /to/ the queue #[error(transparent)] @@ -606,7 +604,7 @@ where // We downgrade the priority of jobs that fail, so first attempts at jobs get better QoS context.worker.set_priority(job_id, old_priority + 1)?; - context.worker.release_job(job_id, None).await?; + context.worker.flush_job(job_id).await?; } else { // Complete the job, with a Failed result let result: FetchResult = FetchResult::Failure { @@ -651,9 +649,7 @@ pub async fn complete_job( worker.set_parameters(job_id, Some(result))?; worker.set_blob(job_id, body)?; worker.set_metadata(job_id, None)?; // We're finished with the job, so clear our internal state - - // Since these tasks are lightweight, we just block waiting on 
the flush here. - worker.release_job(job_id, None).await?; + worker.flush_job(job_id).await?; Ok(()) } diff --git a/rust/cyclotron-fetch/src/main.rs b/rust/cyclotron-fetch/src/main.rs index 7806733b0a45a..ebefa9f01d787 100644 --- a/rust/cyclotron-fetch/src/main.rs +++ b/rust/cyclotron-fetch/src/main.rs @@ -56,7 +56,7 @@ async fn main() { let liveness = HealthRegistry::new("liveness"); - let (app_config, pool_config, kafka_config, worker_config) = config.to_components(); + let (app_config, pool_config, kafka_config) = config.to_components(); let bind = format!("{}:{}", app_config.host, app_config.port); info!( @@ -81,7 +81,6 @@ async fn main() { app_config, pool_config, kafka_config, - worker_config, worker_liveness, kafka_liveness, ) diff --git a/rust/cyclotron-fetch/tests/fetch.rs b/rust/cyclotron-fetch/tests/fetch.rs index eab323cce4ff3..42657837112ad 100644 --- a/rust/cyclotron-fetch/tests/fetch.rs +++ b/rust/cyclotron-fetch/tests/fetch.rs @@ -25,7 +25,7 @@ pub async fn test_run_migrations(db: PgPool) { pub async fn test_completes_fetch(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone(), Default::default()); + let return_worker = Worker::from_pool(db.clone()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -62,7 +62,7 @@ pub async fn test_completes_fetch(db: PgPool) { pub async fn test_returns_failure_after_retries(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone(), Default::default()); + let return_worker = Worker::from_pool(db.clone()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -106,7 +106,7 @@ pub async fn test_returns_failure_after_retries(db: PgPool) { pub fn fetch_discards_bad_metadata(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone(), Default::default()); + let return_worker = Worker::from_pool(db.clone()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -144,7 +144,7 @@ pub fn fetch_discards_bad_metadata(db: PgPool) { pub fn fetch_with_minimum_params_works(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone(), Default::default()); + let return_worker = Worker::from_pool(db.clone()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -192,7 +192,7 @@ pub fn fetch_with_minimum_params_works(db: PgPool) { pub async fn test_completes_fetch_with_headers(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone(), Default::default()); + let return_worker = Worker::from_pool(db.clone()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -235,7 +235,7 @@ pub async fn test_completes_fetch_with_headers(db: PgPool) { pub async fn test_completes_fetch_with_body(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone(), Default::default()); + let return_worker = Worker::from_pool(db.clone()); let server = 
MockServer::start(); let mock = server.mock(|when, then| { @@ -273,7 +273,7 @@ pub async fn test_completes_fetch_with_body(db: PgPool) { pub async fn test_completes_fetch_with_vm_state(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone(), Default::default()); + let return_worker = Worker::from_pool(db.clone()); let server = MockServer::start(); let mock = server.mock(|when, then| { diff --git a/rust/cyclotron-fetch/tests/utils.rs b/rust/cyclotron-fetch/tests/utils.rs index b07ec123bf726..7faef3d4bec08 100644 --- a/rust/cyclotron-fetch/tests/utils.rs +++ b/rust/cyclotron-fetch/tests/utils.rs @@ -16,7 +16,7 @@ const FETCH_QUEUE: &str = "fetch"; const RETURN_QUEUE: &str = "return"; pub async fn get_app_test_context(db: PgPool) -> AppContext { - let worker = Worker::from_pool(db.clone(), Default::default()); + let worker = Worker::from_pool(db.clone()); let client = reqwest::Client::new(); let concurrency_limit = Arc::new(Semaphore::new(1)); let health = health::HealthRegistry::new("test"); diff --git a/rust/cyclotron-janitor/tests/janitor.rs b/rust/cyclotron-janitor/tests/janitor.rs index 3251b6cd4127f..bbaea28513bd5 100644 --- a/rust/cyclotron-janitor/tests/janitor.rs +++ b/rust/cyclotron-janitor/tests/janitor.rs @@ -13,7 +13,7 @@ use common_kafka::{test::create_mock_kafka, APP_METRICS2_TOPIC}; #[sqlx::test(migrations = "../cyclotron-core/migrations")] async fn janitor_test(db: PgPool) { - let worker = Worker::from_pool(db.clone(), Default::default()); + let worker = Worker::from_pool(db.clone()); let manager = QueueManager::from_pool(db.clone()); // Purposefully MUCH smaller than would be used in production, so @@ -26,7 +26,6 @@ async fn janitor_test(db: PgPool) { // to be smaller here, to test heartbeat behaviour let mut worker = worker; worker.heartbeat_window = stall_timeout / 2; - worker.max_buffered = 0; // No buffering for testing, flush immediately let worker = worker; let (mock_cluster, mock_producer) = create_mock_kafka().await; @@ -82,7 +81,7 @@ async fn janitor_test(db: PgPool) { .unwrap(); worker.set_state(job.id, JobState::Completed).unwrap(); - worker.release_job(job.id, None).await.unwrap(); + worker.flush_job(job.id).await.unwrap(); let result = janitor.run_once().await.unwrap(); assert_eq!(result.completed, 1); @@ -127,7 +126,7 @@ async fn janitor_test(db: PgPool) { .unwrap(); worker.set_state(job.id, JobState::Failed).unwrap(); - worker.release_job(job.id, None).await.unwrap(); + worker.flush_job(job.id).await.unwrap(); let result = janitor.run_once().await.unwrap(); assert_eq!(result.completed, 0); @@ -192,7 +191,7 @@ async fn janitor_test(db: PgPool) { // Now, the worker can't flush the job worker.set_state(job.id, JobState::Completed).unwrap(); - let result = worker.release_job(job.id, None).await; + let result = worker.flush_job(job.id).await; assert!(result.is_err()); // But if we re-dequeue the job, we can flush it @@ -203,7 +202,7 @@ async fn janitor_test(db: PgPool) { .pop() .unwrap(); worker.set_state(job.id, JobState::Completed).unwrap(); - worker.release_job(job.id, None).await.unwrap(); + worker.flush_job(job.id).await.unwrap(); janitor.run_once().await.unwrap(); // Clean up the completed job to reset for the next test @@ -235,7 +234,7 @@ async fn janitor_test(db: PgPool) { // The worker can still flush the job worker.set_state(job.id, JobState::Completed).unwrap(); - worker.release_job(job.id, None).await.unwrap(); + 
worker.flush_job(job.id).await.unwrap(); // and now cleanup will work let result = janitor.run_once().await.unwrap(); @@ -267,7 +266,7 @@ async fn janitor_test(db: PgPool) { worker.set_state(job.id, JobState::Completed).unwrap(); let result = worker.heartbeat(job.id).await; assert!(result.is_err()); - let result = worker.release_job(job.id, None).await; + let result = worker.flush_job(job.id).await; assert!(result.is_err()); // re-dequeue the job @@ -290,20 +289,19 @@ async fn janitor_test(db: PgPool) { // The worker can't flush the job worker.set_state(job.id, JobState::Completed).unwrap(); - let result = worker.release_job(job.id, None).await; + let result = worker.flush_job(job.id).await; assert!(result.is_err()); // Sixth test - the janitor can operate on multiple jobs at once manager.create_job(job_init.clone()).await.unwrap(); manager.create_job(job_init.clone()).await.unwrap(); - let jobs = worker.dequeue_jobs(&queue_name, 2).await.unwrap(); worker.set_state(jobs[0].id, JobState::Completed).unwrap(); worker.set_state(jobs[1].id, JobState::Failed).unwrap(); - worker.release_job(jobs[0].id, None).await.unwrap(); - worker.release_job(jobs[1].id, None).await.unwrap(); + worker.flush_job(jobs[0].id).await.unwrap(); + worker.flush_job(jobs[1].id).await.unwrap(); let result = janitor.run_once().await.unwrap(); assert_eq!(result.completed, 1); diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 06295dfee1681..6e2468448bb6c 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -1,6 +1,6 @@ { "name": "@posthog/cyclotron", - "version": "0.1.2", + "version": "0.1.1", "description": "Node bindings for cyclotron", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/rust/cyclotron-node/src/lib.rs b/rust/cyclotron-node/src/lib.rs index aa8ca7a252034..a9071b96de856 100644 --- a/rust/cyclotron-node/src/lib.rs +++ b/rust/cyclotron-node/src/lib.rs @@ -1,8 +1,6 @@ use chrono::{DateTime, Utc}; -use cyclotron_core::{ - Job, JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker, WorkerConfig, -}; +use cyclotron_core::{Job, JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker}; use neon::{ handle::Handle, object::Object, @@ -63,21 +61,14 @@ fn hello(mut cx: FunctionContext) -> JsResult { fn init_worker_impl(mut cx: FunctionContext, throw_on_reinit: bool) -> JsResult { let arg1 = cx.argument::(0)?; - let config: PoolConfig = from_json_string(&mut cx, arg1)?; - let worker_config: WorkerConfig = if let Ok(arg2) = cx.argument::(1) { - from_json_string(&mut cx, arg2)? - } else { - Default::default() - }; - let (deferred, promise) = cx.promise(); let channel = cx.channel(); let runtime = runtime(&mut cx)?; let fut = async move { - let worker = Worker::new(config, worker_config).await; + let worker = Worker::new(config).await; deferred.settle_with(&channel, move |mut cx| { if WORKER.get().is_some() && !throw_on_reinit { return Ok(cx.null()); // Short circuit to make using maybe_init a no-op @@ -284,7 +275,7 @@ fn dequeue_with_vm_state(mut cx: FunctionContext) -> JsResult { Ok(promise) } -fn release_job(mut cx: FunctionContext) -> JsResult { +fn flush_job(mut cx: FunctionContext) -> JsResult { let arg1 = cx.argument::(0)?.value(&mut cx); let job_id: Uuid = arg1 .parse() @@ -304,38 +295,9 @@ fn release_job(mut cx: FunctionContext) -> JsResult { return; } }; - // We await the handle here because this translates waiting on the join handle all the way to - // a Js Promise.await. 
- let res = worker.release_job(job_id, None).await; + let res = worker.flush_job(job_id).await; deferred.settle_with(&channel, move |mut cx| { - res.or_else(|e| cx.throw_error(format!("{}", e)))?; - Ok(cx.null()) - }); - }; - - runtime.spawn(fut); - - Ok(promise) -} - -fn force_flush(mut cx: FunctionContext) -> JsResult { - let (deferred, promise) = cx.promise(); - let channel = cx.channel(); - let runtime = runtime(&mut cx)?; - - let fut = async move { - let worker = match WORKER.get() { - Some(worker) => worker, - None => { - deferred.settle_with(&channel, |mut cx| { - throw_null_err(&mut cx, "worker not initialized") - }); - return; - } - }; - let res = worker.force_flush().await; - deferred.settle_with(&channel, |mut cx| { - res.or_else(|e| cx.throw_error(format!("{}", e)))?; + res.or_else(|e: cyclotron_core::QueueError| cx.throw_error(format!("{}", e)))?; Ok(cx.null()) }); }; @@ -655,8 +617,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("createJob", create_job)?; cx.export_function("dequeueJobs", dequeue_jobs)?; cx.export_function("dequeueJobsWithVmState", dequeue_with_vm_state)?; - cx.export_function("releaseJob", release_job)?; - cx.export_function("forceFlush", force_flush)?; + cx.export_function("flushJob", flush_job)?; cx.export_function("setState", set_state)?; cx.export_function("setQueue", set_queue)?; cx.export_function("setPriority", set_priority)?; diff --git a/rust/cyclotron-node/src/types.ts b/rust/cyclotron-node/src/types.ts index 0db0fc06c33ff..88c8a26099083 100644 --- a/rust/cyclotron-node/src/types.ts +++ b/rust/cyclotron-node/src/types.ts @@ -17,22 +17,6 @@ export type CyclotronInternalPoolConfig = { idle_timeout_seconds?: number } -// Config specific to tuning the worker batch flush and heartbeat behaviour -export type CyclotronWorkerTuningConfig = { - // The worker will issue at most 1 heartbeat per this many seconds per job. - heartbeatWindowSeconds?: number - // Updates released by the worker will be buffered for at most this many milliseconds before a flush is attempted. - lingerTimeMs?: number - // The maximum number of updates that can be buffered before a flush is attempted. - maxUpdatesBuffered?: number - // The maximum number of update bytes the worker will buffer, calculated as the sum of VM state and blob - maxBytesBuffered?: number - // The worker flushes update batches in a background loop, which will check if a flush is due based on the - // conditions above every this many milliseconds. Users may also call forceFlush(), which will try to flush any - // pending updates immediately. 
- flushLoopIntervalMs?: number -} - export type CyclotronJobState = 'available' | 'running' | 'completed' | 'failed' | 'paused' export type CyclotronJob = { diff --git a/rust/cyclotron-node/src/worker.ts b/rust/cyclotron-node/src/worker.ts index 63ac1dd7d32e8..7b3411863af7d 100644 --- a/rust/cyclotron-node/src/worker.ts +++ b/rust/cyclotron-node/src/worker.ts @@ -1,7 +1,7 @@ // eslint-disable-next-line @typescript-eslint/no-var-requires const cyclotron = require('../index.node') import { convertToInternalPoolConfig, deserializeObject, serializeObject } from './helpers' -import { CyclotronJob, CyclotronJobState, CyclotronJobUpdate, CyclotronPoolConfig, CyclotronWorkerTuningConfig } from './types' +import { CyclotronJob, CyclotronJobState, CyclotronJobUpdate, CyclotronPoolConfig } from './types' const parseJob = (job: CyclotronJob): CyclotronJob => { return { @@ -32,15 +32,7 @@ export class CyclotronWorker { private consumerLoopPromise: Promise | null = null - constructor(private config: CyclotronWorkerConfig, private tuning?: CyclotronWorkerTuningConfig) { - let defaultTuning: CyclotronWorkerTuningConfig = { - heartbeatWindowSeconds: 5, - lingerTimeMs: 500, - maxUpdatesBuffered: 100, - maxBytesBuffered: 10000000, - flushLoopIntervalMs: 10, - } - this.tuning = { ...defaultTuning, ...this.tuning } + constructor(private config: CyclotronWorkerConfig) { this.config = config } @@ -56,7 +48,7 @@ export class CyclotronWorker { throw new Error('Already consuming') } - await cyclotron.maybeInitWorker(JSON.stringify(convertToInternalPoolConfig(this.config.pool)), JSON.stringify(this.tuning)) + await cyclotron.maybeInitWorker(JSON.stringify(convertToInternalPoolConfig(this.config.pool))) this.isConsuming = true this.consumerLoopPromise = this.startConsumerLoop(processBatch).finally(() => { @@ -100,9 +92,8 @@ export class CyclotronWorker { await (this.consumerLoopPromise ?? Promise.resolve()) } - async releaseJob(jobId: string): Promise { - // We hand the promise back to the user, letting them decide when to await it. - return cyclotron.releaseJob(jobId) + async flushJob(jobId: string): Promise { + return await cyclotron.flushJob(jobId) } updateJob(id: CyclotronJob['id'], state: CyclotronJobState, updates?: CyclotronJobUpdate): void {
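
For reference, a minimal sketch of the worker flow this revert restores, using only the APIs exercised in the patch above (PoolConfig, Worker::new, dequeue_jobs, set_state, flush_job). This is illustrative only: the connection string and queue name are placeholders, and error handling is reduced to unwrap for brevity.

use cyclotron_core::{JobState, PoolConfig, Worker};

#[tokio::main]
async fn main() {
    // Placeholder connection details - adjust for your environment.
    let pool_config = PoolConfig {
        db_url: "postgresql://posthog:posthog@localhost:5432/cyclotron".to_string(),
        max_connections: None,
        min_connections: None,
        acquire_timeout_seconds: None,
        max_lifetime_seconds: None,
        idle_timeout_seconds: None,
    };

    let worker = Worker::new(pool_config).await.unwrap();

    // Dequeue a small batch from the "hog" queue.
    let jobs = worker.dequeue_jobs("hog", 10).await.unwrap();

    for job in &jobs {
        // A terminal (non-running) state must be set before flushing; otherwise
        // flush_job returns QueueError::FlushWithoutNextState.
        worker.set_state(job.id, JobState::Completed).unwrap();
        // With this revert, flush_job writes the pending update immediately -
        // there is no background batching or FlushHandle to await.
        worker.flush_job(job.id).await.unwrap();
    }
}

On the Node side the equivalent call is awaiting worker.flushJob(jobId) from @posthog/cyclotron, matching the plugin-server and worker.ts changes above.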