Skip to content

Commit

Permalink
chore(cyclotron): modify janitor metric, expose depth limit (#25060)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Sep 18, 2024
1 parent 4f5cfef commit 2ec6bf0
Show file tree
Hide file tree
Showing 22 changed files with 58 additions and 151 deletions.
2 changes: 1 addition & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@
},
"cyclotron": {
"//This is a short term workaround to ensure that cyclotron changes trigger a rebuild": true,
"version": "0.1.3"
"version": "0.1.4"
}
}
4 changes: 3 additions & 1 deletion plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,8 +592,10 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
handleBatch: (messages) => this._handleKafkaBatch(messages),
})

const shardDepthLimit = this.hub.CYCLOTRON_SHARD_DEPTH_LIMIT ?? 1000000

this.cyclotronManager = this.hub.CYCLOTRON_DATABASE_URL
? new CyclotronManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] })
? new CyclotronManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }], shardDepthLimit })
: undefined

await this.cyclotronManager?.connect()
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ export function getDefaultConfig(): PluginsServerConfig {
: isDevEnv()
? 'postgres://posthog:posthog@localhost:5432/cyclotron'
: '',

CYCLOTRON_SHARD_DEPTH_LIMIT: 1000000,
}
}

Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ export interface PluginsServerConfig extends CdpConfig {
SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: number

CYCLOTRON_DATABASE_URL: string
CYCLOTRON_SHARD_DEPTH_LIMIT: number
}

export interface Hub extends PluginsServerConfig {
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/cyclotron-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl PoolConfig {
}
}

pub const DEFAULT_QUEUE_DEPTH_LIMIT: u64 = 10_000;
pub const DEFAULT_QUEUE_DEPTH_LIMIT: u64 = 1_000_000;
pub const DEFAULT_SHARD_HEALTH_CHECK_INTERVAL: u64 = 10;

#[derive(Debug, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion rust/cyclotron-core/src/janitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Janitor {
Ok(poison.len() as u64)
}

pub async fn waiting_jobs(&self) -> Result<u64, QueueError> {
pub async fn waiting_jobs(&self) -> Result<Vec<(u64, String)>, QueueError> {
count_total_waiting_jobs(&self.pool).await
}

Expand Down
3 changes: 2 additions & 1 deletion rust/cyclotron-core/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ impl Shard {
}

let pending = count_total_waiting_jobs(&self.pool).await?;
let is_full = pending >= self.depth_limit;
let total_pending = pending.iter().map(|(count, _)| count).sum::<u64>();
let is_full = total_pending >= self.depth_limit;
if !is_full {
*last_healthy = Utc::now();
}
Expand Down
20 changes: 14 additions & 6 deletions rust/cyclotron-core/src/ops/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@ use crate::{
DEAD_LETTER_QUEUE,
};

pub async fn count_total_waiting_jobs<'c, E>(executor: E) -> Result<u64, QueueError>
pub async fn count_total_waiting_jobs<'c, E>(executor: E) -> Result<Vec<(u64, String)>, QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let res = sqlx::query!(
"SELECT COUNT(*) FROM cyclotron_jobs WHERE state = 'available' AND scheduled <= NOW()",
struct Count {
count: Option<i64>,
queue_name: String,
}

let res = sqlx::query_as!(
Count,
"SELECT COUNT(*), queue_name FROM cyclotron_jobs WHERE state = 'available' AND scheduled <= NOW() GROUP BY queue_name",
)
.fetch_one(executor)
.fetch_all(executor)
.await?;

let res = res.count.unwrap_or(0);
Ok(res as u64)
Ok(res
.into_iter()
.map(|r| (r.count.unwrap_or(0) as u64, r.queue_name))
.collect())
}

// Returns an InvalidLock error if the query run did not affect any rows.
Expand Down
8 changes: 7 additions & 1 deletion rust/cyclotron-janitor/src/janitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ impl Janitor {
let _time = common_metrics::timing_guard(AVAILABLE_DEPTH_TIME, &self.metrics_labels);
self.inner.waiting_jobs().await?
};
common_metrics::gauge(AVAILABLE_DEPTH, &self.metrics_labels, available as f64);

let mut available_labels = self.metrics_labels.clone();
for (count, queue_name) in available {
available_labels.push(("queue_name".to_string(), queue_name));
common_metrics::gauge(AVAILABLE_DEPTH, &available_labels, count as f64);
available_labels.pop();
}

let dlq_depth = {
let _time = common_metrics::timing_guard(DLQ_DEPTH_TIME, &self.metrics_labels);
Expand Down
2 changes: 1 addition & 1 deletion rust/cyclotron-node/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@posthog/cyclotron",
"version": "0.1.3",
"version": "0.1.4",
"description": "Node bindings for cyclotron",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
3 changes: 2 additions & 1 deletion rust/cyclotron-node/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import { convertToInternalPoolConfig, serializeObject } from './helpers'
import { CyclotronJobInit, CyclotronPoolConfig } from './types'

export class CyclotronManager {
constructor(private config: { shards: CyclotronPoolConfig[] }) {
constructor(private config: { shards: CyclotronPoolConfig[], shardDepthLimit: number }) {
this.config = config
}

async connect(): Promise<void> {
return await cyclotron.maybeInitManager(
JSON.stringify({
shards: this.config.shards.map((shard) => convertToInternalPoolConfig(shard)),
shard_depth_limit: this.config.shardDepthLimit,
})
)
}
Expand Down

0 comments on commit 2ec6bf0

Please sign in to comment.