diff --git a/plugin-server/package.json b/plugin-server/package.json index 0ce824160212c..8ef4ee7aa01cc 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -148,6 +148,6 @@ }, "cyclotron": { "//This is a short term workaround to ensure that cyclotron changes trigger a rebuild": true, - "version": "0.1.4" + "version": "0.1.5" } } diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 6c7de256ea040..f28ffd0243568 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -427,17 +427,16 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { ) // For the cyclotron ones we simply create the jobs - await Promise.all( - cyclotronInvocations.map((item) => - this.cyclotronManager?.createJob({ - teamId: item.globals.project.id, - functionId: item.hogFunction.id, - queueName: 'hog', - priority: item.priority, - vmState: serializeHogFunctionInvocation(item), - }) - ) - ) + const cyclotronJobs = cyclotronInvocations.map((item) => { + return { + teamId: item.globals.project.id, + functionId: item.hogFunction.id, + queueName: 'hog', + priority: item.priority, + vmState: serializeHogFunctionInvocation(item), + } + }) + await this.cyclotronManager?.bulkCreateJobs(cyclotronJobs) if (kafkaInvocations.length) { // As we don't want to over-produce to kafka we invoke the hog functions and then queue the results diff --git a/rust/cyclotron-core/src/lib.rs b/rust/cyclotron-core/src/lib.rs index 2615c4f4c5e74..cc11a47a4502d 100644 --- a/rust/cyclotron-core/src/lib.rs +++ b/rust/cyclotron-core/src/lib.rs @@ -5,7 +5,6 @@ mod ops; // Types mod types; pub use types::AggregatedDelete; -pub use types::BulkInsertResult; pub use types::Bytes; pub use types::Job; pub use types::JobInit; diff --git a/rust/cyclotron-core/src/manager.rs b/rust/cyclotron-core/src/manager.rs index 56a39cd3fb706..bce83c9bec0af 100644 --- a/rust/cyclotron-core/src/manager.rs +++ b/rust/cyclotron-core/src/manager.rs @@ -10,7 +10,7 @@ use crate::{ manager::{bulk_create_jobs, create_job}, meta::count_total_waiting_jobs, }, - BulkInsertResult, JobInit, ManagerConfig, QueueError, + JobInit, ManagerConfig, QueueError, }; pub struct Shard { @@ -82,48 +82,26 @@ impl QueueManager { shard.create_job_blocking(init, timeout).await } - pub async fn bulk_create_jobs(&self, inits: Vec) -> BulkInsertResult { + pub async fn bulk_create_jobs(&self, inits: Vec) -> Result<(), QueueError> { + let next = self + .next_shard + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); let shards = self.shards.read().await; - let chunk_size = inits.len() / shards.len(); - let mut result = BulkInsertResult::new(); - // TODO - at some point, we should dynamically re-acquire the lock each time, to allow - // for re-routing jobs away from a bad shard during a bulk insert, but right now, we - // don't even re-try inserts. Later work. - for chunk in inits.chunks(chunk_size) { - let next_shard = self - .next_shard - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let shard = &shards[next_shard % shards.len()]; - let shard_result = shard.bulk_create_jobs(chunk).await; - if let Err(err) = shard_result { - result.add_failure(err, chunk.to_vec()); - } - } - - result + shards[next % shards.len()].bulk_create_jobs(&inits).await } pub async fn bulk_create_jobs_blocking( &self, inits: Vec, timeout: Option, - ) -> BulkInsertResult { + ) -> Result<(), QueueError> { + let next = self + .next_shard + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); let shards = self.shards.read().await; - let chunk_size = inits.len() / shards.len(); - let mut result = BulkInsertResult::new(); - for chunk in inits.chunks(chunk_size) { - let next_shard = self - .next_shard - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let shard = &shards[next_shard % shards.len()]; - // TODO - we sequentially try each shard, but we could try to parallelize this. - let shard_result = shard.bulk_create_jobs_blocking(chunk, timeout).await; - if let Err(err) = shard_result { - result.add_failure(err, chunk.to_vec()); - } - } - - result + shards[next % shards.len()] + .bulk_create_jobs_blocking(&inits, timeout) + .await } } diff --git a/rust/cyclotron-core/src/types.rs b/rust/cyclotron-core/src/types.rs index 72d1e0f82a124..cda07cf80cb4c 100644 --- a/rust/cyclotron-core/src/types.rs +++ b/rust/cyclotron-core/src/types.rs @@ -4,8 +4,6 @@ use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; use std::str::FromStr; use uuid::Uuid; -use crate::QueueError; - pub type Bytes = Vec; #[derive(Debug, Deserialize, Serialize, sqlx::Type)] @@ -120,32 +118,6 @@ impl JobUpdate { } } -// Bulk inserts across multiple shards can partially succeed, so we need to track failures -// and hand back failed job inits to the caller. -pub struct BulkInsertResult { - pub failures: Vec<(QueueError, Vec)>, -} - -impl BulkInsertResult { - pub fn new() -> Self { - Self { failures: vec![] } - } - - pub fn add_failure(&mut self, err: QueueError, jobs: Vec) { - self.failures.push((err, jobs)); - } - - pub fn all_succeeded(&self) -> bool { - self.failures.is_empty() - } -} - -impl Default for BulkInsertResult { - fn default() -> Self { - Self::new() - } -} - // Result of janitor's `delete_completed_and_failed_jobs` #[derive(sqlx::FromRow, Debug)] pub struct AggregatedDelete { diff --git a/rust/cyclotron-core/tests/base_ops.rs b/rust/cyclotron-core/tests/base_ops.rs index 5354eea6f4f01..58835f24c13f6 100644 --- a/rust/cyclotron-core/tests/base_ops.rs +++ b/rust/cyclotron-core/tests/base_ops.rs @@ -237,8 +237,10 @@ pub async fn test_bulk_insert(db: PgPool) { }) .collect::>(); - let result = manager.bulk_create_jobs(jobs).await; - assert!(result.all_succeeded()); + manager + .bulk_create_jobs(jobs) + .await + .expect("failed to bulk insert jobs"); let dequeue_jobs = worker .dequeue_jobs(&job_template.queue_name, 1000) diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 13a16e01b2f41..6be1ad8a6ec5c 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -1,6 +1,6 @@ { "name": "@posthog/cyclotron", - "version": "0.1.4", + "version": "0.1.5", "description": "Node bindings for cyclotron", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/rust/cyclotron-node/src/lib.rs b/rust/cyclotron-node/src/lib.rs index aa8ca7a252034..a481f22ba3a23 100644 --- a/rust/cyclotron-node/src/lib.rs +++ b/rust/cyclotron-node/src/lib.rs @@ -10,7 +10,7 @@ use neon::{ result::{JsResult, NeonResult}, types::{ buffer::TypedArray, JsArray, JsArrayBuffer, JsNull, JsNumber, JsObject, JsPromise, - JsString, JsUint8Array, JsUndefined, JsValue, + JsString, JsUint32Array, JsUint8Array, JsUndefined, JsValue, }, }; use once_cell::sync::OnceCell; @@ -174,7 +174,7 @@ fn create_job(mut cx: FunctionContext) -> JsResult { None } else { Some( - blob.downcast_or_throw::(&mut cx)? + blob.downcast_or_throw::(&mut cx)? .as_slice(&cx) .to_vec(), ) @@ -182,17 +182,7 @@ fn create_job(mut cx: FunctionContext) -> JsResult { let js_job: JsJob = from_json_string(&mut cx, arg1)?; - let job = JobInit { - team_id: js_job.team_id, - queue_name: js_job.queue_name, - priority: js_job.priority, - scheduled: js_job.scheduled, - function_id: js_job.function_id, - vm_state: js_job.vm_state.map(|s| s.into_bytes()), - parameters: js_job.parameters.map(|s| s.into_bytes()), - metadata: js_job.metadata.map(|s| s.into_bytes()), - blob, - }; + let job = js_job.to_job_init(blob); let (deferred, promise) = cx.promise(); let channel = cx.channel(); @@ -220,6 +210,79 @@ fn create_job(mut cx: FunctionContext) -> JsResult { Ok(promise) } +fn bulk_create_jobs(mut cx: FunctionContext) -> JsResult { + let jobs = cx.argument::(0)?; + let jobs: Vec = from_json_string(&mut cx, jobs)?; + + let blobs = cx.argument::(1)?; + let blob_lengths = cx.argument::(2)?; + + let blobs = blobs + .downcast_or_throw::(&mut cx)? + .as_slice(&cx) + .to_vec(); + + let blob_lengths: Vec = blob_lengths + .downcast_or_throw::(&mut cx)? + .as_slice(&cx) + .iter() + .map(|&v| v as usize) + .collect(); + + if jobs.len() != blob_lengths.len() { + return cx.throw_error("jobs and blob_lengths must have the same length"); + } + + if blobs.len() != blob_lengths.iter().sum::() { + return cx.throw_error("blob_lengths must sum to the length of blobs"); + } + + let mut blob_offset: usize = 0; + let blobs: Vec>> = blob_lengths + .iter() + .map(|&len| { + if len == 0 { + return None; + } + let blob = blobs[blob_offset..blob_offset + len].to_vec(); + blob_offset += len; + Some(blob) + }) + .collect(); + + let jobs: Vec = jobs + .into_iter() + .zip(blobs) + .map(|(job, blob)| job.to_job_init(blob)) + .collect(); + + let (deferred, promise) = cx.promise(); + let channel = cx.channel(); + let runtime = runtime(&mut cx)?; + + let fut = async move { + let manager = match MANAGER.get() { + Some(manager) => manager, + None => { + deferred.settle_with(&channel, |mut cx| { + throw_null_err(&mut cx, "manager not initialized") + }); + return; + } + }; + + let res = manager.bulk_create_jobs(jobs).await; + deferred.settle_with(&channel, move |mut cx| { + res.or_else(|e| cx.throw_error(format!("{}", e)))?; + Ok(cx.null()) + }); + }; + + runtime.spawn(fut); + + Ok(promise) +} + fn dequeue_jobs(mut cx: FunctionContext) -> JsResult { let queue_name = cx.argument::(0)?.value(&mut cx); @@ -645,6 +708,22 @@ fn jobs_to_js_array<'a>(cx: &mut TaskContext<'a>, jobs: Vec) -> JsResult<'a Ok(js_array) } +impl JsJob { + fn to_job_init(&self, blob: Option>) -> JobInit { + JobInit { + team_id: self.team_id, + queue_name: self.queue_name.clone(), + priority: self.priority, + scheduled: self.scheduled, + function_id: self.function_id, + vm_state: self.vm_state.as_ref().map(|s| s.as_bytes().to_vec()), + parameters: self.parameters.as_ref().map(|s| s.as_bytes().to_vec()), + metadata: self.metadata.as_ref().map(|s| s.as_bytes().to_vec()), + blob, + } + } +} + #[neon::main] fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("hello", hello)?; @@ -653,6 +732,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("maybeInitWorker", maybe_init_worker)?; cx.export_function("maybeInitManager", maybe_init_manager)?; cx.export_function("createJob", create_job)?; + cx.export_function("bulkCreateJobs", bulk_create_jobs)?; cx.export_function("dequeueJobs", dequeue_jobs)?; cx.export_function("dequeueJobsWithVmState", dequeue_with_vm_state)?; cx.export_function("releaseJob", release_job)?; diff --git a/rust/cyclotron-node/src/manager.ts b/rust/cyclotron-node/src/manager.ts index 5c932c164c1a6..8858edaa9a7ee 100644 --- a/rust/cyclotron-node/src/manager.ts +++ b/rust/cyclotron-node/src/manager.ts @@ -35,6 +35,46 @@ export class CyclotronManager { } const json = JSON.stringify(jobInitInternal) - return await cyclotron.createJob(json, job.blob ? job.blob.buffer : undefined) + return await cyclotron.createJob(json, job.blob ? job.blob : undefined) + } + + async bulkCreateJobs(jobs: CyclotronJobInit[]): Promise { + const jobInitsInternal = jobs.map((job) => { + job.priority ??= 1 + job.scheduled ??= new Date() + + return { + team_id: job.teamId, + function_id: job.functionId, + queue_name: job.queueName, + priority: job.priority, + scheduled: job.scheduled, + vm_state: job.vmState ? serializeObject('vmState', job.vmState) : null, + parameters: job.parameters ? serializeObject('parameters', job.parameters) : null, + metadata: job.metadata ? serializeObject('metadata', job.metadata) : null, + } + }) + const json = JSON.stringify(jobInitsInternal) + + const totalBytes = jobs.reduce((total, job) => total + (job.blob ? job.blob.byteLength : 0), 0) + + // The cyclotron API expects a single buffer with all the blobs concatenated, and an array of lengths. + // 0 lengths indicate that the job has no blob. + const blobs = new Uint8Array(totalBytes) + const blobLengths = new Uint32Array(jobs.length) + + let offset = 0; + for (let i = 0; i < jobs.length; i++) { + let blob = jobs[i].blob + if (blob) { + blobLengths[i] = blob.byteLength + blobs.set(blob, offset) + offset += blob.byteLength + } else { + blobLengths[i] = 0 + } + } + + return await cyclotron.bulkCreateJobs(json, blobs, blobLengths) } }