From 2ec6bf0fa60a744d963f14297dd326bc802e20c2 Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Wed, 18 Sep 2024 20:16:09 +0100 Subject: [PATCH] chore(cyclotron): modify janitor metric, expose depth limit (#25060) --- plugin-server/package.json | 2 +- plugin-server/src/cdp/cdp-consumers.ts | 4 +++- plugin-server/src/config/config.ts | 2 ++ plugin-server/src/types.ts | 1 + ...5b577efe424710b45f02e1ddeece8bca96f87.json | 12 ---------- ...9126073113a4af03c4c9fd3a81004d898f883.json | 18 --------------- ...0fffa01b679951ec786f8ee2410454b9a2de0.json | 12 ---------- ...9a284f16421f77b77fe38e67143f28e270805.json | 12 ---------- ...7f8d490dda8b27a150454e413d02f89c92050.json | 12 ---------- ...129b8c4d7020b7cb1999702aee1bfb1087efb.json | 12 ---------- ...f46045c332cc45e69b08708b289cc933b3b5c.json | 12 ---------- ...a8c4eb4cb910b0c5ddbc6bdbd9156af7b4e23.json | 12 ---------- ...4ef7a5da523e168a5f9be47f6caaef09487d7.json | 12 ---------- ...2f7a3ad2b2ee57d8d2353c858299d7b6ffe13.json | 23 ------------------- ...074995829d1372fe8ec1fd683f9524bcebb8b.json | 23 +++++++++++++++++++ rust/cyclotron-core/src/config.rs | 2 +- rust/cyclotron-core/src/janitor.rs | 2 +- rust/cyclotron-core/src/manager.rs | 3 ++- rust/cyclotron-core/src/ops/meta.rs | 20 +++++++++++----- rust/cyclotron-janitor/src/janitor.rs | 8 ++++++- rust/cyclotron-node/package.json | 2 +- rust/cyclotron-node/src/manager.ts | 3 ++- 22 files changed, 58 insertions(+), 151 deletions(-) delete mode 100644 rust/cyclotron-core/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json delete mode 100644 rust/cyclotron-core/.sqlx/query-213e9d70e145a01fb42d5c3a80f9126073113a4af03c4c9fd3a81004d898f883.json delete mode 100644 rust/cyclotron-core/.sqlx/query-2b62adf40f8dd5758690c763df30fffa01b679951ec786f8ee2410454b9a2de0.json delete mode 100644 rust/cyclotron-core/.sqlx/query-2f6de0977357909dfd8d3d510c39a284f16421f77b77fe38e67143f28e270805.json delete mode 100644 rust/cyclotron-core/.sqlx/query-58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050.json delete mode 100644 rust/cyclotron-core/.sqlx/query-884da9767d2992c7b279b4f8df5129b8c4d7020b7cb1999702aee1bfb1087efb.json delete mode 100644 rust/cyclotron-core/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json delete mode 100644 rust/cyclotron-core/.sqlx/query-98da1f12285a97a47ce88535c82a8c4eb4cb910b0c5ddbc6bdbd9156af7b4e23.json delete mode 100644 rust/cyclotron-core/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json delete mode 100644 rust/cyclotron-core/.sqlx/query-b3239c1dde9a88769ec488299612f7a3ad2b2ee57d8d2353c858299d7b6ffe13.json create mode 100644 rust/cyclotron-core/.sqlx/query-b420ccc79fa7847f65246adf76a074995829d1372fe8ec1fd683f9524bcebb8b.json diff --git a/plugin-server/package.json b/plugin-server/package.json index d9492e316844f..725bd36f904df 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -147,6 +147,6 @@ }, "cyclotron": { "//This is a short term workaround to ensure that cyclotron changes trigger a rebuild": true, - "version": "0.1.3" + "version": "0.1.4" } } diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 19fbf5a7a3c84..23ca7d0b60d0a 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -592,8 +592,10 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { handleBatch: (messages) => this._handleKafkaBatch(messages), }) + const shardDepthLimit = this.hub.CYCLOTRON_SHARD_DEPTH_LIMIT ?? 1000000 + this.cyclotronManager = this.hub.CYCLOTRON_DATABASE_URL - ? new CyclotronManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) + ? new CyclotronManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }], shardDepthLimit }) : undefined await this.cyclotronManager?.connect() diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index a0c64393c4352..8e40236e2e801 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -197,6 +197,8 @@ export function getDefaultConfig(): PluginsServerConfig { : isDevEnv() ? 'postgres://posthog:posthog@localhost:5432/cyclotron' : '', + + CYCLOTRON_SHARD_DEPTH_LIMIT: 1000000, } } diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 90bea28edc33d..a580c09a28edd 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -287,6 +287,7 @@ export interface PluginsServerConfig extends CdpConfig { SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: number CYCLOTRON_DATABASE_URL: string + CYCLOTRON_SHARD_DEPTH_LIMIT: number } export interface Hub extends PluginsServerConfig { diff --git a/rust/cyclotron-core/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json b/rust/cyclotron-core/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json deleted file mode 100644 index 23b0665a2d357..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE cyclotron_jobs SET vm_state = $1 WHERE id = $2 AND lock_id = $3", - "describe": { - "columns": [], - "parameters": { - "Left": ["Bytea", "Uuid", "Uuid"] - }, - "nullable": [] - }, - "hash": "16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87" -} diff --git a/rust/cyclotron-core/.sqlx/query-213e9d70e145a01fb42d5c3a80f9126073113a4af03c4c9fd3a81004d898f883.json b/rust/cyclotron-core/.sqlx/query-213e9d70e145a01fb42d5c3a80f9126073113a4af03c4c9fd3a81004d898f883.json deleted file mode 100644 index f9150cfcda3e1..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-213e9d70e145a01fb42d5c3a80f9126073113a4af03c4c9fd3a81004d898f883.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT COUNT(*) FROM cyclotron_jobs WHERE state = 'available' AND scheduled <= NOW()", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [null] - }, - "hash": "213e9d70e145a01fb42d5c3a80f9126073113a4af03c4c9fd3a81004d898f883" -} diff --git a/rust/cyclotron-core/.sqlx/query-2b62adf40f8dd5758690c763df30fffa01b679951ec786f8ee2410454b9a2de0.json b/rust/cyclotron-core/.sqlx/query-2b62adf40f8dd5758690c763df30fffa01b679951ec786f8ee2410454b9a2de0.json deleted file mode 100644 index 3c2761eccb0a5..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-2b62adf40f8dd5758690c763df30fffa01b679951ec786f8ee2410454b9a2de0.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE cyclotron_jobs SET queue_name = $1 WHERE id = $2 AND lock_id = $3", - "describe": { - "columns": [], - "parameters": { - "Left": ["Text", "Uuid", "Uuid"] - }, - "nullable": [] - }, - "hash": "2b62adf40f8dd5758690c763df30fffa01b679951ec786f8ee2410454b9a2de0" -} diff --git a/rust/cyclotron-core/.sqlx/query-2f6de0977357909dfd8d3d510c39a284f16421f77b77fe38e67143f28e270805.json b/rust/cyclotron-core/.sqlx/query-2f6de0977357909dfd8d3d510c39a284f16421f77b77fe38e67143f28e270805.json deleted file mode 100644 index b0e1ef221041f..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-2f6de0977357909dfd8d3d510c39a284f16421f77b77fe38e67143f28e270805.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE cyclotron_jobs SET priority = $1 WHERE id = $2 AND lock_id = $3", - "describe": { - "columns": [], - "parameters": { - "Left": ["Int2", "Uuid", "Uuid"] - }, - "nullable": [] - }, - "hash": "2f6de0977357909dfd8d3d510c39a284f16421f77b77fe38e67143f28e270805" -} diff --git a/rust/cyclotron-core/.sqlx/query-58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050.json b/rust/cyclotron-core/.sqlx/query-58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050.json deleted file mode 100644 index 5a2231c7c2fdd..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE cyclotron_jobs SET blob = $1 WHERE id = $2 AND lock_id = $3", - "describe": { - "columns": [], - "parameters": { - "Left": ["Bytea", "Uuid", "Uuid"] - }, - "nullable": [] - }, - "hash": "58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050" -} diff --git a/rust/cyclotron-core/.sqlx/query-884da9767d2992c7b279b4f8df5129b8c4d7020b7cb1999702aee1bfb1087efb.json b/rust/cyclotron-core/.sqlx/query-884da9767d2992c7b279b4f8df5129b8c4d7020b7cb1999702aee1bfb1087efb.json deleted file mode 100644 index b728d398568c5..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-884da9767d2992c7b279b4f8df5129b8c4d7020b7cb1999702aee1bfb1087efb.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE cyclotron_jobs SET lock_id = NULL, last_heartbeat = NULL WHERE id = $1 AND lock_id = $2", - "describe": { - "columns": [], - "parameters": { - "Left": ["Uuid", "Uuid"] - }, - "nullable": [] - }, - "hash": "884da9767d2992c7b279b4f8df5129b8c4d7020b7cb1999702aee1bfb1087efb" -} diff --git a/rust/cyclotron-core/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json b/rust/cyclotron-core/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json deleted file mode 100644 index 66ae665232405..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE cyclotron_jobs SET metadata = $1 WHERE id = $2 AND lock_id = $3", - "describe": { - "columns": [], - "parameters": { - "Left": ["Bytea", "Uuid", "Uuid"] - }, - "nullable": [] - }, - "hash": "8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c" -} diff --git a/rust/cyclotron-core/.sqlx/query-98da1f12285a97a47ce88535c82a8c4eb4cb910b0c5ddbc6bdbd9156af7b4e23.json b/rust/cyclotron-core/.sqlx/query-98da1f12285a97a47ce88535c82a8c4eb4cb910b0c5ddbc6bdbd9156af7b4e23.json deleted file mode 100644 index 59a56c441cb7c..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-98da1f12285a97a47ce88535c82a8c4eb4cb910b0c5ddbc6bdbd9156af7b4e23.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE cyclotron_jobs SET scheduled = $1 WHERE id = $2 AND lock_id = $3", - "describe": { - "columns": [], - "parameters": { - "Left": ["Timestamptz", "Uuid", "Uuid"] - }, - "nullable": [] - }, - "hash": "98da1f12285a97a47ce88535c82a8c4eb4cb910b0c5ddbc6bdbd9156af7b4e23" -} diff --git a/rust/cyclotron-core/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json b/rust/cyclotron-core/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json deleted file mode 100644 index 4364f2fee8816..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE cyclotron_jobs SET parameters = $1 WHERE id = $2 AND lock_id = $3", - "describe": { - "columns": [], - "parameters": { - "Left": ["Bytea", "Uuid", "Uuid"] - }, - "nullable": [] - }, - "hash": "b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7" -} diff --git a/rust/cyclotron-core/.sqlx/query-b3239c1dde9a88769ec488299612f7a3ad2b2ee57d8d2353c858299d7b6ffe13.json b/rust/cyclotron-core/.sqlx/query-b3239c1dde9a88769ec488299612f7a3ad2b2ee57d8d2353c858299d7b6ffe13.json deleted file mode 100644 index d2942f91b1930..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-b3239c1dde9a88769ec488299612f7a3ad2b2ee57d8d2353c858299d7b6ffe13.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE cyclotron_jobs\n SET state = $1, last_transition = NOW(), transition_count = transition_count + 1\n WHERE id = $2 AND lock_id = $3", - "describe": { - "columns": [], - "parameters": { - "Left": [ - { - "Custom": { - "name": "jobstate", - "kind": { - "Enum": ["available", "completed", "failed", "running", "paused"] - } - } - }, - "Uuid", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "b3239c1dde9a88769ec488299612f7a3ad2b2ee57d8d2353c858299d7b6ffe13" -} diff --git a/rust/cyclotron-core/.sqlx/query-b420ccc79fa7847f65246adf76a074995829d1372fe8ec1fd683f9524bcebb8b.json b/rust/cyclotron-core/.sqlx/query-b420ccc79fa7847f65246adf76a074995829d1372fe8ec1fd683f9524bcebb8b.json new file mode 100644 index 0000000000000..ed79a7102f52f --- /dev/null +++ b/rust/cyclotron-core/.sqlx/query-b420ccc79fa7847f65246adf76a074995829d1372fe8ec1fd683f9524bcebb8b.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*), queue_name FROM cyclotron_jobs WHERE state = 'available' AND scheduled <= NOW() GROUP BY queue_name", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "queue_name", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [null, false] + }, + "hash": "b420ccc79fa7847f65246adf76a074995829d1372fe8ec1fd683f9524bcebb8b" +} diff --git a/rust/cyclotron-core/src/config.rs b/rust/cyclotron-core/src/config.rs index f49ba07e2c201..4a9cbb2229b49 100644 --- a/rust/cyclotron-core/src/config.rs +++ b/rust/cyclotron-core/src/config.rs @@ -31,7 +31,7 @@ impl PoolConfig { } } -pub const DEFAULT_QUEUE_DEPTH_LIMIT: u64 = 10_000; +pub const DEFAULT_QUEUE_DEPTH_LIMIT: u64 = 1_000_000; pub const DEFAULT_SHARD_HEALTH_CHECK_INTERVAL: u64 = 10; #[derive(Debug, Serialize, Deserialize)] diff --git a/rust/cyclotron-core/src/janitor.rs b/rust/cyclotron-core/src/janitor.rs index dd48f031b1060..d70eb336379f9 100644 --- a/rust/cyclotron-core/src/janitor.rs +++ b/rust/cyclotron-core/src/janitor.rs @@ -59,7 +59,7 @@ impl Janitor { Ok(poison.len() as u64) } - pub async fn waiting_jobs(&self) -> Result { + pub async fn waiting_jobs(&self) -> Result, QueueError> { count_total_waiting_jobs(&self.pool).await } diff --git a/rust/cyclotron-core/src/manager.rs b/rust/cyclotron-core/src/manager.rs index 6339c4e9cf4ed..56a39cd3fb706 100644 --- a/rust/cyclotron-core/src/manager.rs +++ b/rust/cyclotron-core/src/manager.rs @@ -216,7 +216,8 @@ impl Shard { } let pending = count_total_waiting_jobs(&self.pool).await?; - let is_full = pending >= self.depth_limit; + let total_pending = pending.iter().map(|(count, _)| count).sum::(); + let is_full = total_pending >= self.depth_limit; if !is_full { *last_healthy = Utc::now(); } diff --git a/rust/cyclotron-core/src/ops/meta.rs b/rust/cyclotron-core/src/ops/meta.rs index f22b41e70f1a7..040c88d5398b0 100644 --- a/rust/cyclotron-core/src/ops/meta.rs +++ b/rust/cyclotron-core/src/ops/meta.rs @@ -6,18 +6,26 @@ use crate::{ DEAD_LETTER_QUEUE, }; -pub async fn count_total_waiting_jobs<'c, E>(executor: E) -> Result +pub async fn count_total_waiting_jobs<'c, E>(executor: E) -> Result, QueueError> where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { - let res = sqlx::query!( - "SELECT COUNT(*) FROM cyclotron_jobs WHERE state = 'available' AND scheduled <= NOW()", + struct Count { + count: Option, + queue_name: String, + } + + let res = sqlx::query_as!( + Count, + "SELECT COUNT(*), queue_name FROM cyclotron_jobs WHERE state = 'available' AND scheduled <= NOW() GROUP BY queue_name", ) - .fetch_one(executor) + .fetch_all(executor) .await?; - let res = res.count.unwrap_or(0); - Ok(res as u64) + Ok(res + .into_iter() + .map(|r| (r.count.unwrap_or(0) as u64, r.queue_name)) + .collect()) } // Returns an InvalidLock error if the query run did not affect any rows. diff --git a/rust/cyclotron-janitor/src/janitor.rs b/rust/cyclotron-janitor/src/janitor.rs index 6fc0b248b54ab..63e21234e4baa 100644 --- a/rust/cyclotron-janitor/src/janitor.rs +++ b/rust/cyclotron-janitor/src/janitor.rs @@ -135,7 +135,13 @@ impl Janitor { let _time = common_metrics::timing_guard(AVAILABLE_DEPTH_TIME, &self.metrics_labels); self.inner.waiting_jobs().await? }; - common_metrics::gauge(AVAILABLE_DEPTH, &self.metrics_labels, available as f64); + + let mut available_labels = self.metrics_labels.clone(); + for (count, queue_name) in available { + available_labels.push(("queue_name".to_string(), queue_name)); + common_metrics::gauge(AVAILABLE_DEPTH, &available_labels, count as f64); + available_labels.pop(); + } let dlq_depth = { let _time = common_metrics::timing_guard(DLQ_DEPTH_TIME, &self.metrics_labels); diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 2b6ca207f54f4..13a16e01b2f41 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -1,6 +1,6 @@ { "name": "@posthog/cyclotron", - "version": "0.1.3", + "version": "0.1.4", "description": "Node bindings for cyclotron", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/rust/cyclotron-node/src/manager.ts b/rust/cyclotron-node/src/manager.ts index bba6488828ba2..5c932c164c1a6 100644 --- a/rust/cyclotron-node/src/manager.ts +++ b/rust/cyclotron-node/src/manager.ts @@ -5,7 +5,7 @@ import { convertToInternalPoolConfig, serializeObject } from './helpers' import { CyclotronJobInit, CyclotronPoolConfig } from './types' export class CyclotronManager { - constructor(private config: { shards: CyclotronPoolConfig[] }) { + constructor(private config: { shards: CyclotronPoolConfig[], shardDepthLimit: number }) { this.config = config } @@ -13,6 +13,7 @@ export class CyclotronManager { return await cyclotron.maybeInitManager( JSON.stringify({ shards: this.config.shards.map((shard) => convertToInternalPoolConfig(shard)), + shard_depth_limit: this.config.shardDepthLimit, }) ) }