diff --git a/.github/workflows/ci-plugin-server.yml b/.github/workflows/ci-plugin-server.yml index 03888d2268bc7..a24eaf53d4e69 100644 --- a/.github/workflows/ci-plugin-server.yml +++ b/.github/workflows/ci-plugin-server.yml @@ -57,6 +57,7 @@ jobs: defaults: run: working-directory: 'plugin-server' + steps: - uses: actions/checkout@v3 @@ -81,7 +82,6 @@ jobs: tests: name: Plugin Server Tests (${{matrix.shard}}) needs: changes - if: needs.changes.outputs.plugin-server == 'true' runs-on: ubuntu-latest strategy: @@ -97,17 +97,21 @@ jobs: steps: - name: Code check out + if: needs.changes.outputs.plugin-server == 'true' uses: actions/checkout@v3 - name: Stop/Start stack with Docker Compose + if: needs.changes.outputs.plugin-server == 'true' run: | docker compose -f docker-compose.dev.yml down docker compose -f docker-compose.dev.yml up -d - name: Add Kafka to /etc/hosts + if: needs.changes.outputs.plugin-server == 'true' run: echo "127.0.0.1 kafka" | sudo tee -a /etc/hosts - name: Set up Python + if: needs.changes.outputs.plugin-server == 'true' uses: actions/setup-python@v5 with: python-version: 3.11.9 @@ -118,35 +122,24 @@ jobs: # uv is a fast pip alternative: https://github.com/astral-sh/uv/ - run: pip install uv - - name: Install rust - uses: dtolnay/rust-toolchain@1.77 - - - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - rust/target - key: ${{ runner.os }}-cargo-release-${{ hashFiles('**/Cargo.lock') }} - - - name: Install sqlx-cli - working-directory: rust - run: cargo install sqlx-cli@0.7.3 --no-default-features --features native-tls,postgres - - name: Install SAML (python3-saml) dependencies + if: needs.changes.outputs.plugin-server == 'true' run: | sudo apt-get update sudo apt-get install libxml2-dev libxmlsec1-dev libxmlsec1-openssl - name: Install python dependencies + if: needs.changes.outputs.plugin-server == 'true' run: | uv pip install --system -r requirements-dev.txt uv pip install --system -r requirements.txt - name: Install pnpm + if: needs.changes.outputs.plugin-server == 'true' uses: pnpm/action-setup@v4 - name: Set up Node.js + if: needs.changes.outputs.plugin-server == 'true' uses: actions/setup-node@v4 with: node-version: 18.12.1 @@ -154,14 +147,17 @@ jobs: cache-dependency-path: plugin-server/pnpm-lock.yaml - name: Install package.json dependencies with pnpm + if: needs.changes.outputs.plugin-server == 'true' run: cd plugin-server && pnpm i - name: Wait for Clickhouse, Redis & Kafka + if: needs.changes.outputs.plugin-server == 'true' run: | docker compose -f docker-compose.dev.yml up kafka redis clickhouse -d --wait bin/check_kafka_clickhouse_up - name: Set up databases + if: needs.changes.outputs.plugin-server == 'true' env: TEST: 'true' SECRET_KEY: 'abcdef' # unsafe - for testing only @@ -169,6 +165,7 @@ jobs: run: cd plugin-server && pnpm setup:test - name: Test with Jest + if: needs.changes.outputs.plugin-server == 'true' env: # Below DB name has `test_` prepended, as that's how Django (ran above) creates the test DB DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_posthog' diff --git a/.vscode/launch.json b/.vscode/launch.json index 88f00c46c9502..389be51af0c57 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -119,8 +119,7 @@ "WORKER_CONCURRENCY": "2", "OBJECT_STORAGE_ENABLED": "True", "HOG_HOOK_URL": "http://localhost:3300/hoghook", - "CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "", - "CDP_CYCLOTRON_ENABLED_TEAMS": "*" + "CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "" }, "presentation": { "group": "main" diff --git 
a/bin/migrate b/bin/migrate index 2f2aa49ed749b..1c32b3b5b0614 100755 --- a/bin/migrate +++ b/bin/migrate @@ -1,11 +1,5 @@ #!/bin/bash set -e -SCRIPT_DIR=$(dirname "$(readlink -f "$0")") - -# NOTE when running in docker, rust might not exist so we need to check for it -if [ -d "$SCRIPT_DIR/../rust" ]; then - bash $SCRIPT_DIR/../rust/bin/migrate-cyclotron -fi python manage.py migrate python manage.py migrate_clickhouse diff --git a/bin/start-cyclotron b/bin/start-cyclotron index 2885390287c0f..074ec4802d0a4 100755 --- a/bin/start-cyclotron +++ b/bin/start-cyclotron @@ -12,7 +12,7 @@ export RUST_LOG=${DEBUG:-debug} SQLX_QUERY_LEVEL=${SQLX_QUERY_LEVEL:-warn} export RUST_LOG=$RUST_LOG,sqlx::query=$SQLX_QUERY_LEVEL -export DATABASE_URL=${CYCLOTRON_DATABASE_URL:-postgres://posthog:posthog@localhost:5432/cyclotron} +export DATABASE_URL=${DATABASE_URL:-postgres://posthog:posthog@localhost:5432/posthog} export ALLOW_INTERNAL_IPS=${ALLOW_INTERNAL_IPS:-true} ./target/debug/cyclotron-fetch & diff --git a/plugin-server/package.json b/plugin-server/package.json index 452011ada2a02..5189b589fb2c9 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -23,8 +23,7 @@ "prettier:check": "prettier --check .", "prepublishOnly": "pnpm build", "setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse", - "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron", - "setup:test:cyclotron": "CYCLOTRON_DATABASE_NAME=test_cyclotron ../rust/bin/migrate-cyclotron", + "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment", "services:start": "cd .. && docker compose -f docker-compose.dev.yml up", "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down", "services:clean": "cd .. 
&& docker compose -f docker-compose.dev.yml rm -v", diff --git a/plugin-server/src/cdp/cdp-api.ts b/plugin-server/src/cdp/cdp-api.ts index cfc70e7f1b8fc..34de05942471e 100644 --- a/plugin-server/src/cdp/cdp-api.ts +++ b/plugin-server/src/cdp/cdp-api.ts @@ -9,7 +9,7 @@ import { HogExecutor } from './hog-executor' import { HogFunctionManager } from './hog-function-manager' import { HogWatcher, HogWatcherState } from './hog-watcher' import { HogFunctionInvocationResult, HogFunctionType, LogEntry } from './types' -import { createInvocation, queueBlobToString } from './utils' +import { createInvocation } from './utils' export class CdpApi { private hogExecutor: HogExecutor @@ -144,19 +144,11 @@ export class CdpApi { if (invocation.queue === 'fetch') { if (mock_async_functions) { // Add the state, simulating what executeAsyncResponse would do - - // Re-parse the fetch args for the logging - const fetchArgs = { - ...invocation.queueParameters, - body: queueBlobToString(invocation.queueBlob), - } - response = { invocation: { ...invocation, queue: 'hog', - queueParameters: { response: { status: 200 } }, - queueBlob: Buffer.from('{}'), + queueParameters: { response: { status: 200, body: {} } }, }, finished: false, logs: [ @@ -168,7 +160,7 @@ export class CdpApi { { level: 'info', timestamp: DateTime.now(), - message: `fetch(${JSON.stringify(fetchArgs, null, 2)})`, + message: `fetch(${JSON.stringify(invocation.queueParameters, null, 2)})`, }, ], } diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index f75b2a23096e5..8c4eec5e11951 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -1,9 +1,8 @@ -import { CyclotronJob, CyclotronManager, CyclotronWorker } from '@posthog/cyclotron' +import cyclotron from '@posthog/cyclotron' import { captureException } from '@sentry/node' import { Message } from 'node-rdkafka' import { Counter, Histogram } from 'prom-client' -import { buildIntegerMatcher } from '../config/config' import { KAFKA_APP_METRICS_2, KAFKA_CDP_FUNCTION_CALLBACKS, @@ -15,15 +14,7 @@ import { BatchConsumer, startBatchConsumer } from '../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../kafka/config' import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics' import { runInstrumentedFunction } from '../main/utils' -import { - AppMetric2Type, - Hub, - PluginServerService, - RawClickHouseEvent, - TeamId, - TimestampFormat, - ValueMatcher, -} from '../types' +import { AppMetric2Type, Hub, PluginServerService, RawClickHouseEvent, TeamId, TimestampFormat } from '../types' import { createKafkaProducerWrapper } from '../utils/db/hub' import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { captureTeamEvent } from '../utils/posthog' @@ -40,7 +31,6 @@ import { CdpRedis, createCdpRedisPool } from './redis' import { HogFunctionInvocation, HogFunctionInvocationGlobals, - HogFunctionInvocationQueueParameters, HogFunctionInvocationResult, HogFunctionInvocationSerialized, HogFunctionInvocationSerializedCompressed, @@ -54,7 +44,7 @@ import { createInvocation, gzipObject, prepareLogEntriesForClickhouse, - serializeHogFunctionInvocation, + serializeInvocation, unGzipObject, } from './utils' @@ -98,6 +88,8 @@ abstract class CdpConsumerBase { protected kafkaProducer?: KafkaProducerWrapper protected abstract name: string + protected abstract topic: string + protected abstract consumerGroupId: string protected heartbeat = () => {} @@ -116,7 
+108,7 @@ abstract class CdpConsumerBase { public get service(): PluginServerService { return { - id: this.name, + id: this.consumerGroupId, onShutdown: async () => await this.stop(), healthcheck: () => this.isHealthy() ?? false, batchConsumer: this.batchConsumer, @@ -164,6 +156,8 @@ abstract class CdpConsumerBase { return results } + protected abstract _handleKafkaBatch(messages: Message[]): Promise + protected async produceQueuedMessages() { const messages = [...this.messagesToProduce] this.messagesToProduce = [] @@ -211,23 +205,20 @@ abstract class CdpConsumerBase { }) } - // NOTE: These will be removed once we are only on Cyclotron - protected async queueInvocationsToKafka(invocation: HogFunctionInvocation[]) { + protected async queueInvocations(invocation: HogFunctionInvocation[]) { await Promise.all( invocation.map(async (item) => { - await this.queueInvocationToKafka(item) + await this.queueInvocation(item) }) ) } - protected async queueInvocationToKafka(invocation: HogFunctionInvocation) { - // NOTE: WE keep the queueParams args as kafka land still needs them - const serializedInvocation: HogFunctionInvocationSerialized = { - ...invocation, - hogFunctionId: invocation.hogFunction.id, - } + protected async queueInvocation(invocation: HogFunctionInvocation) { + // TODO: Add cyclotron check here and enqueue that way + // For now we just enqueue to kafka + // For kafka style this is overkill to enqueue this way but it simplifies migrating to the new system - delete (serializedInvocation as any).hogFunction + const serializedInvocation = serializeInvocation(invocation) const request: HogFunctionInvocationSerializedCompressed = { state: await gzipObject(serializedInvocation), @@ -243,22 +234,12 @@ abstract class CdpConsumerBase { } protected async processInvocationResults(results: HogFunctionInvocationResult[]): Promise { - return await runInstrumentedFunction({ + await runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.produceResults`, func: async () => { - await this.hogWatcher.observeResults(results) - await Promise.all( results.map(async (result) => { - if (result.finished || result.error) { - this.produceAppMetric({ - team_id: result.invocation.teamId, - app_source_id: result.invocation.hogFunction.id, - metric_kind: result.error ? 'failure' : 'success', - metric_name: result.error ? 'failed' : 'succeeded', - count: 1, - }) - } + // Tricky: We want to pull all the logs out as we don't want them to be passed around to any subsequent functions this.produceLogs(result) @@ -277,20 +258,30 @@ abstract class CdpConsumerBase { key: `${team!.api_token}:${event.distinct_id}`, }) } + + if (result.finished || result.error) { + this.produceAppMetric({ + team_id: result.invocation.teamId, + app_source_id: result.invocation.hogFunction.id, + metric_kind: result.error ? 'failure' : 'success', + metric_name: result.error ?
'failed' : 'succeeded', + count: 1, + }) + } else { + // Means there is follow up so we enqueue it + await this.queueInvocation(result.invocation) + } }) ) }, }) } - protected async startKafkaConsumer(options: { - topic: string - groupId: string - handleBatch: (messages: Message[]) => Promise - }): Promise { + protected async startKafkaConsumer() { this.batchConsumer = await startBatchConsumer({ - ...options, connectionConfig: createRdConnectionConfigFromEnvVars(this.hub), + groupId: this.consumerGroupId, + topic: this.topic, autoCommit: true, sessionTimeout: this.hub.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS, maxPollIntervalMs: this.hub.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, @@ -321,7 +312,7 @@ abstract class CdpConsumerBase { statsKey: `cdpConsumer.handleEachBatch`, sendTimeoutGuardToSentry: false, func: async () => { - await options.handleBatch(messages) + await this._handleKafkaBatch(messages) }, }) }, @@ -331,9 +322,6 @@ abstract class CdpConsumerBase { addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) this.batchConsumer.consumer.on('disconnected', async (err) => { - if (!this.isStopping) { - return - } // since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect // we need to listen to disconnect and make sure we're stopped status.info('🔁', `${this.name} batch consumer disconnected, cleaning up`, { err }) @@ -345,11 +333,15 @@ abstract class CdpConsumerBase { // NOTE: This is only for starting shared services await Promise.all([ this.hogFunctionManager.start(), - createKafkaProducerWrapper(this.hub).then((producer) => { - this.kafkaProducer = producer - this.kafkaProducer.producer.connect() - }), + this.hub.CYCLOTRON_DATABASE_URL + ? cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) + : Promise.resolve(), ]) + + this.kafkaProducer = await createKafkaProducerWrapper(this.hub) + this.kafkaProducer.producer.connect() + + await this.startKafkaConsumer() } public async stop(): Promise { @@ -368,27 +360,20 @@ abstract class CdpConsumerBase { } public isHealthy() { + // TODO: Check either kafka consumer or cyclotron worker exists + // and that whatever exists is healthy return this.batchConsumer?.isHealthy() } } /** * This consumer handles incoming events from the main clickhouse topic - * Currently it produces to both kafka and Cyclotron based on the team */ + export class CdpProcessedEventsConsumer extends CdpConsumerBase { protected name = 'CdpProcessedEventsConsumer' - private cyclotronMatcher: ValueMatcher - private cyclotronManager?: CyclotronManager - - constructor(hub: Hub) { - super(hub) - this.cyclotronMatcher = buildIntegerMatcher(hub.CDP_CYCLOTRON_ENABLED_TEAMS, true) - } - - private cyclotronEnabled(invocation: HogFunctionInvocation): boolean { - return !!(this.cyclotronManager && this.cyclotronMatcher(invocation.globals.project.id)) - } + protected topic = KAFKA_EVENTS_JSON + protected consumerGroupId = 'cdp-processed-events-consumer' public async processBatch(invocationGlobals: HogFunctionInvocationGlobals[]): Promise { if (!invocationGlobals.length) { @@ -399,48 +384,23 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { this.createHogFunctionInvocations(invocationGlobals) ) - // Split out the cyclotron invocations - const [cyclotronInvocations, kafkaInvocations] = invocationsToBeQueued.reduce( - (acc, item) => { - if (this.cyclotronEnabled(item)) { - acc[0].push(item) - } else { - acc[1].push(item) - } - - return acc - }, - [[], []] as [HogFunctionInvocation[], 
HogFunctionInvocation[]] - ) - - // For the cyclotron ones we simply create the jobs - await Promise.all( - cyclotronInvocations.map((item) => - this.cyclotronManager?.createJob({ - teamId: item.globals.project.id, - functionId: item.hogFunction.id, - queueName: 'hog', - priority: item.priority, - vmState: serializeHogFunctionInvocation(item), - }) - ) - ) - - if (kafkaInvocations.length) { - // As we don't want to over-produce to kafka we invoke the hog functions and then queue the results + if (this.hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP) { + // NOTE: This is for testing the two ways of enqueueing processing. It will be swapped out for a cyclotron env check + // Kafka based workflow const invocationResults = await runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.executeInvocations`, func: async () => { - const hogResults = await this.runManyWithHeartbeat(kafkaInvocations, (item) => + const hogResults = await this.runManyWithHeartbeat(invocationsToBeQueued, (item) => this.hogExecutor.execute(item) ) return [...hogResults] }, }) + await this.hogWatcher.observeResults(invocationResults) await this.processInvocationResults(invocationResults) - const newInvocations = invocationResults.filter((r) => !r.finished).map((r) => r.invocation) - await this.queueInvocationsToKafka(newInvocations) + } else { + await this.queueInvocations(invocationsToBeQueued) } await this.produceQueuedMessages() @@ -451,6 +411,7 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { /** * Finds all matching hog functions for the given globals. * Filters them for their disabled state as well as masking configs + * */ protected async createHogFunctionInvocations( invocationGlobals: HogFunctionInvocationGlobals[] @@ -483,10 +444,8 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { }) const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id)) - const validInvocations: HogFunctionInvocation[] = [] - // Iterate over adding them to the list and updating their priority - possibleInvocations.forEach((item) => { + const notDisabledInvocations = possibleInvocations.filter((item) => { const state = states[item.hogFunction.id].state if (state >= HogWatcherState.disabledForPeriod) { this.produceAppMetric({ @@ -499,19 +458,15 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { : 'disabled_permanently', count: 1, }) - return + return false } - if (state === HogWatcherState.degraded) { - item.priority = 2 - } - - validInvocations.push(item) + return true }) // Now we can filter by masking configs const { masked, notMasked: notMaskedInvocations } = await this.hogMasker.filterByMasking( - validInvocations + notDisabledInvocations ) masked.forEach((item) => { @@ -570,28 +525,15 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { await this.processBatch(invocationGlobals) } - - public async start(): Promise { - await super.start() - await this.startKafkaConsumer({ - topic: KAFKA_EVENTS_JSON, - groupId: 'cdp-processed-events-consumer', - handleBatch: (messages) => this._handleKafkaBatch(messages), - }) - - this.cyclotronManager = this.hub.CYCLOTRON_DATABASE_URL - ? 
new CyclotronManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) - : undefined - - await this.cyclotronManager?.connect() - } } /** - * This consumer only deals with kafka messages and will eventually be replaced by the Cyclotron worker + * This consumer handles actually invoking hog in a loop */ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { protected name = 'CdpFunctionCallbackConsumer' + protected topic = KAFKA_CDP_FUNCTION_CALLBACKS + protected consumerGroupId = 'cdp-function-callback-consumer' public async processBatch(invocations: HogFunctionInvocation[]): Promise { if (!invocations.length) { @@ -621,9 +563,8 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { }, }) + await this.hogWatcher.observeResults(invocationResults) await this.processInvocationResults(invocationResults) - const newInvocations = invocationResults.filter((r) => !r.finished).map((r) => r.invocation) - await this.queueInvocationsToKafka(newInvocations) await this.produceQueuedMessages() } @@ -699,143 +640,52 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { await this.processBatch(events) } - - public async start(): Promise { - await super.start() - await this.startKafkaConsumer({ - topic: KAFKA_CDP_FUNCTION_CALLBACKS, - groupId: 'cdp-function-callback-consumer', - handleBatch: (messages) => this._handleKafkaBatch(messages), - }) - } } -/** - * The future of the CDP consumer. This will be the main consumer that will handle all hog jobs from Cyclotron - */ -export class CdpCyclotronWorker extends CdpConsumerBase { - protected name = 'CdpCyclotronWorker' - private cyclotronWorker?: CyclotronWorker - private runningWorker: Promise | undefined - protected queue: 'hog' | 'fetch' = 'hog' - - public async processBatch(invocations: HogFunctionInvocation[]): Promise { - if (!invocations.length) { - return - } - - const invocationResults = await runInstrumentedFunction({ - statsKey: `cdpConsumer.handleEachBatch.executeInvocations`, - func: async () => { - // NOTE: In the future this service will never do fetching (unless we decide we want to do it in node at some point) - // This is just "for now" to support the transition to cyclotron - const fetchQueue = invocations.filter((item) => item.queue === 'fetch') - const fetchResults = await this.runManyWithHeartbeat(fetchQueue, (item) => - this.fetchExecutor.execute(item) - ) - - const hogQueue = invocations.filter((item) => item.queue === 'hog') - const hogResults = await this.runManyWithHeartbeat(hogQueue, (item) => this.hogExecutor.execute(item)) - return [...hogResults, ...(fetchResults.filter(Boolean) as HogFunctionInvocationResult[])] - }, - }) - - await this.processInvocationResults(invocationResults) - await this.updateJobs(invocationResults) - await this.produceQueuedMessages() - } - - private async updateJobs(invocations: HogFunctionInvocationResult[]) { - await Promise.all( - invocations.map(async (item) => { - const id = item.invocation.id - if (item.error) { - status.debug('⚡️', 'Updating job to failed', id) - this.cyclotronWorker?.updateJob(id, 'failed') - } else if (item.finished) { - status.debug('⚡️', 'Updating job to completed', id) - this.cyclotronWorker?.updateJob(id, 'completed') - } else { - status.debug('⚡️', 'Updating job to available', id) - this.cyclotronWorker?.updateJob(id, 'available', { - priority: item.invocation.priority, - vmState: serializeHogFunctionInvocation(item.invocation), - queueName: item.invocation.queue, - parameters: item.invocation.queueParameters ?? 
null, - blob: item.invocation.queueBlob ?? null, - }) - } - await this.cyclotronWorker?.flushJob(id) - }) - ) - } - - private async handleJobBatch(jobs: CyclotronJob[]) { - const invocations: HogFunctionInvocation[] = [] - - for (const job of jobs) { - // NOTE: This is all a bit messy and might be better to refactor into a helper - if (!job.functionId) { - throw new Error('Bad job: ' + JSON.stringify(job)) - } - const hogFunction = this.hogFunctionManager.getHogFunction(job.functionId) - - if (!hogFunction) { - // Here we need to mark the job as failed - - status.error('Error finding hog function', { - id: job.functionId, - }) - this.cyclotronWorker?.updateJob(job.id, 'failed') - await this.cyclotronWorker?.flushJob(job.id) - continue - } - - const parsedState = job.vmState as HogFunctionInvocationSerialized - - invocations.push({ - id: job.id, - globals: parsedState.globals, - teamId: hogFunction.team_id, - hogFunction, - priority: job.priority, - queue: (job.queueName as any) ?? 'hog', - queueParameters: job.parameters as HogFunctionInvocationQueueParameters | undefined, - queueBlob: job.blob ?? undefined, - vmState: parsedState.vmState, - timings: parsedState.timings, - }) - } - - await this.processBatch(invocations) - } - - public async start() { - await super.start() - - this.cyclotronWorker = new CyclotronWorker({ - pool: { dbUrl: this.hub.CYCLOTRON_DATABASE_URL }, - queueName: this.queue, - includeVmState: true, - batchMaxSize: this.hub.CDP_CYCLOTRON_BATCH_SIZE, - pollDelayMs: this.hub.CDP_CYCLOTRON_BATCH_DELAY_MS, - }) - await this.cyclotronWorker.connect((jobs) => this.handleJobBatch(jobs)) - } - - public async stop() { - await super.stop() - await this.cyclotronWorker?.disconnect() - await this.runningWorker - } - - public isHealthy() { - return this.cyclotronWorker?.isHealthy() ?? false - } -} - -// Mostly used for testing -export class CdpCyclotronWorkerFetch extends CdpCyclotronWorker { - protected name = 'CdpCyclotronWorkerFetch' - protected queue = 'fetch' as const -} +// // TODO: Split out non-Kafka specific parts of CdpConsumerBase so that it can be used by the +// // Cyclotron worker below. Or maybe we can just wait, and rip the Kafka bits out once Cyclotron is +// // shipped (and rename it something other than consumer, probably). For now, this is an easy way to +// // use existing code and get an end-to-end demo shipped. +// export class CdpCyclotronWorker extends CdpFunctionCallbackConsumer { +// protected name = 'CdpCyclotronWorker' +// protected topic = 'UNUSED-CdpCyclotronWorker' +// protected consumerGroupId = 'UNUSED-CdpCyclotronWorker' +// private runningWorker: Promise | undefined +// private isUnhealthy = false + +// private async innerStart() { +// try { +// const limit = 100 // TODO: Make configurable. +// while (!this.isStopping) { +// const jobs = await cyclotron.dequeueJobsWithVmState('hog', limit) +// // TODO: Decode jobs into the right types + +// await this.processBatch(jobs) +// } +// } catch (err) { +// this.isUnhealthy = true +// console.error('Error in Cyclotron worker', err) +// throw err +// } +// } + +// public async start() { +// await cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) +// await cyclotron.initWorker({ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }) + +// // Consumer `start` expects an async task is started, and not that `start` itself blocks +// // indefinitely. 
+// this.runningWorker = this.innerStart() + +// return Promise.resolve() +// } + +// public async stop() { +// await super.stop() +// await this.runningWorker +// } + +// public isHealthy() { +// return !this.isUnhealthy +// } +// } diff --git a/plugin-server/src/cdp/fetch-executor.ts b/plugin-server/src/cdp/fetch-executor.ts index 8907fafc35239..89900215ec1fd 100644 --- a/plugin-server/src/cdp/fetch-executor.ts +++ b/plugin-server/src/cdp/fetch-executor.ts @@ -12,7 +12,7 @@ import { HogFunctionQueueParametersFetchRequest, HogFunctionQueueParametersFetchResponse, } from './types' -import { gzipObject, queueBlobToString, serializeHogFunctionInvocation } from './utils' +import { gzipObject, serializeInvocation } from './utils' export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 2024, 4096, 10240, Infinity] @@ -40,22 +40,19 @@ export class FetchExecutor { async execute(invocation: HogFunctionInvocation): Promise { if (invocation.queue !== 'fetch' || !invocation.queueParameters) { - status.error('🦔', `[HogExecutor] Bad invocation`, { invocation }) - return + throw new Error('Bad invocation') } const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest - - const body = queueBlobToString(invocation.queueBlob) - if (body) { - histogramFetchPayloadSize.observe(body.length / 1024) + if (params.body) { + histogramFetchPayloadSize.observe(params.body.length / 1024) } try { if (this.hogHookEnabledForTeams(invocation.teamId)) { // This is very temporary until we are committed to Cyclotron const payload: HogFunctionInvocationAsyncRequest = { - state: await gzipObject(serializeHogFunctionInvocation(invocation)), + state: await gzipObject(serializeInvocation(invocation)), teamId: invocation.teamId, hogFunctionId: invocation.hogFunction.id, asyncFunctionRequest: { @@ -64,7 +61,6 @@ params.url, { ...params, - body, }, ], }, @@ -92,12 +88,11 @@ } const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest - const body = queueBlobToString(invocation.queueBlob) || '' - let responseBody = '' const resParams: HogFunctionQueueParametersFetchResponse = { response: { status: 0, + body: {}, }, error: null, timings: [], } try { const start = performance.now() const fetchResponse = await trackedFetch(params.url, { method: params.method, - body, + body: params.body, headers: params.headers, timeout: this.serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS, }) - responseBody = await fetchResponse.text() + let responseBody = await fetchResponse.text() + try { + responseBody = JSON.parse(responseBody) + } catch (err) { + // Ignore + } const duration = performance.now() - start @@ -123,6 +123,7 @@ resParams.response = { status: fetchResponse.status, + body: responseBody, } } catch (err) { status.error('🦔', `[HogExecutor] Error during fetch`, { error: String(err) }) @@ -134,7 +135,6 @@ ...invocation, queue: 'hog', queueParameters: resParams, - queueBlob: Buffer.from(responseBody), }, finished: false, logs: [], diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 28bad8e38099a..382f6b3fc3549 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -14,7 +14,7 @@ import { HogFunctionQueueParametersFetchResponse, HogFunctionType, } from './types' -import { convertToHogFunctionFilterGlobal, queueBlobToString } from './utils' +import {
convertToHogFunctionFilterGlobal } from './utils' const MAX_ASYNC_STEPS = 2 const MAX_HOG_LOGS = 10 @@ -153,33 +153,25 @@ export class HogExecutor { try { // If the queueParameter is set then we have an expected format that we want to parse and add to the stack if (invocation.queueParameters) { - // NOTE: This is all based around the only response type being fetch currently const { logs = [], response = null, error, timings = [], } = invocation.queueParameters as HogFunctionQueueParametersFetchResponse - let responseBody: any = undefined - if (response) { - // Convert from buffer to string - responseBody = queueBlobToString(invocation.queueBlob) - } // Reset the queue parameters to be sure invocation.queue = 'hog' invocation.queueParameters = undefined - invocation.queueBlob = undefined - - const status = typeof response?.status === 'number' ? response.status : 503 // Special handling for fetch - if (status >= 400) { + // TODO: Would be good to have a dedicated value in the fetch response for the status code + if (response?.status && response.status >= 400) { // Generic warn log for bad status codes logs.push({ level: 'warn', timestamp: DateTime.now(), - message: `Fetch returned bad status: ${status}`, + message: `Fetch returned bad status: ${response.status}`, }) } @@ -191,22 +183,16 @@ export class HogExecutor { throw new Error(error) } - if (typeof responseBody === 'string') { + if (typeof response?.body === 'string') { try { - responseBody = JSON.parse(responseBody) + response.body = JSON.parse(response.body) } catch (e) { // pass - if it isn't json we just pass it on } } - // Finally we create the response object as the VM expects - const fetchResponse = { - status, - body: responseBody, - } - // Add the response to the stack to continue execution - invocation.vmState!.stack.push(fetchResponse) + invocation.vmState!.stack.push(response) invocation.timings.push(...timings) result.logs = [...logs, ...result.logs] } @@ -341,22 +327,18 @@ export class HogExecutor { const headers = fetchOptions?.headers || { 'Content-Type': 'application/json', } + let body = fetchOptions?.body // Modify the body to ensure it is a string (we allow Hog to send an object to keep things simple) - const body: string | undefined = fetchOptions?.body - ? typeof fetchOptions.body === 'string' - ? fetchOptions.body - : JSON.stringify(fetchOptions.body) - : fetchOptions?.body + body = body ? (typeof body === 'string' ? body : JSON.stringify(body)) : body result.invocation.queue = 'fetch' result.invocation.queueParameters = { url, method, headers, - return_queue: 'hog', + body, } - // The payload is always blob encoded - result.invocation.queueBlob = body ? Buffer.from(body) : undefined + break default: throw new Error(`Unknown async function '${execRes.asyncFunctionName}'`) @@ -384,7 +366,6 @@ export class HogExecutor { } } catch (err) { result.error = err.message - result.finished = true // Explicitly set to true to prevent infinite loops status.error( '🦔', `[HogExecutor] Error executing function ${invocation.hogFunction.id} - ${invocation.hogFunction.name}`, diff --git a/plugin-server/src/cdp/hog-function-manager.ts b/plugin-server/src/cdp/hog-function-manager.ts index 94803e209f25e..d356e6d66ce10 100644 --- a/plugin-server/src/cdp/hog-function-manager.ts +++ b/plugin-server/src/cdp/hog-function-manager.ts @@ -95,7 +95,6 @@ export class HogFunctionManager { if (!this.ready) { throw new Error('HogFunctionManager is not ready! 
Run HogFunctionManager.start() before this') } - return this.cache.functions[id] } @@ -103,7 +102,6 @@ if (!this.ready) { throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this') } - const fn = this.cache.functions[hogFunctionId] if (fn?.team_id === teamId) { return fn diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts index 3ca31657cfb74..9d277bc4edfa8 100644 --- a/plugin-server/src/cdp/types.ts +++ b/plugin-server/src/cdp/types.ts @@ -146,9 +146,8 @@ export interface HogFunctionTiming { export type HogFunctionQueueParametersFetchRequest = { url: string method: string - return_queue: string - max_tries?: number - headers?: Record + body: string + headers: Record } export type HogFunctionQueueParametersFetchResponse = { @@ -157,6 +156,7 @@ /** The data to be passed to the Hog function from the response */ response?: { status: number + body: any } | null timings?: HogFunctionTiming[] logs?: LogEntry[] @@ -171,10 +171,8 @@ export type HogFunctionInvocation = { globals: HogFunctionInvocationGlobals teamId: Team['id'] hogFunction: HogFunctionType - priority: number queue: 'hog' | 'fetch' queueParameters?: HogFunctionInvocationQueueParameters - queueBlob?: Uint8Array // The current vmstate (set if the invocation is paused) vmState?: VMState timings: HogFunctionTiming[] diff --git a/plugin-server/src/cdp/utils.ts b/plugin-server/src/cdp/utils.ts index c8e6cd25be2fe..375baa91a94e3 100644 --- a/plugin-server/src/cdp/utils.ts +++ b/plugin-server/src/cdp/utils.ts @@ -281,25 +281,16 @@ export function createInvocation( teamId: hogFunction.team_id, hogFunction, queue: 'hog', - priority: 1, timings: [], } } -export function serializeHogFunctionInvocation(invocation: HogFunctionInvocation): HogFunctionInvocationSerialized { +export function serializeInvocation(invocation: HogFunctionInvocation): HogFunctionInvocationSerialized { const serializedInvocation: HogFunctionInvocationSerialized = { ...invocation, hogFunctionId: invocation.hogFunction.id, - // We clear the params as they are never used in the serialized form - queueParameters: undefined, - queueBlob: undefined, } delete (serializedInvocation as any).hogFunction - - return serializedInvocation -} - -export function queueBlobToString(blob?: HogFunctionInvocation['queueBlob']): string | undefined { - return blob ? Buffer.from(blob).toString('utf-8') : undefined + return serializedInvocation } diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index a0c64393c4352..afa2ba1d72fe3 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -183,20 +183,14 @@ export function getDefaultConfig(): PluginsServerConfig { CDP_WATCHER_REFILL_RATE: 10, CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3, CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '', - CDP_CYCLOTRON_ENABLED_TEAMS: '', + CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS: '', CDP_REDIS_PASSWORD: '', CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP: true, CDP_REDIS_HOST: '', CDP_REDIS_PORT: 6479, - CDP_CYCLOTRON_BATCH_DELAY_MS: 50, - CDP_CYCLOTRON_BATCH_SIZE: 500, // Cyclotron - CYCLOTRON_DATABASE_URL: isTestEnv() ? 'postgres://posthog:posthog@localhost:5432/test_cyclotron' : isDevEnv() ?
'postgres://posthog:posthog@localhost:5432/cyclotron' - : '', + CYCLOTRON_DATABASE_URL: '', } } diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index ff1f46b82d338..cafdc0451806d 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -10,12 +10,7 @@ import v8Profiler from 'v8-profiler-next' import { getPluginServerCapabilities } from '../capabilities' import { CdpApi } from '../cdp/cdp-api' -import { - CdpCyclotronWorker, - CdpCyclotronWorkerFetch, - CdpFunctionCallbackConsumer, - CdpProcessedEventsConsumer, -} from '../cdp/cdp-consumers' +import { CdpFunctionCallbackConsumer, CdpProcessedEventsConsumer } from '../cdp/cdp-consumers' import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config' import { Hub, PluginServerCapabilities, PluginServerService, PluginsServerConfig } from '../types' import { closeHub, createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' @@ -463,23 +458,16 @@ export async function startPluginsServer( } } - if (capabilities.cdpCyclotronWorker) { - const hub = await setupHub() - - if (!hub.CYCLOTRON_DATABASE_URL) { - status.error('💥', 'Cyclotron database URL not set.') - } else { - const worker = new CdpCyclotronWorker(hub) - await worker.start() - services.push(worker.service) - - if (process.env.EXPERIMENTAL_CDP_FETCH_WORKER) { - const workerFetch = new CdpCyclotronWorkerFetch(hub) - await workerFetch.start() - services.push(workerFetch.service) - } - } - } + // if (capabilities.cdpCyclotronWorker) { + // ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) + // if (hub.CYCLOTRON_DATABASE_URL) { + // const worker = new CdpCyclotronWorker(hub) + // await worker.start() + // } else { + // // This is a temporary solution until we *require* Cyclotron to be configured. 
+ // status.warn('💥', 'CYCLOTRON_DATABASE_URL is not set, not running Cyclotron worker') + // } + // } if (capabilities.http) { const app = setupCommonRoutes(services) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 90bea28edc33d..58253a210abd3 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -113,9 +113,7 @@ export type CdpConfig = { CDP_WATCHER_DISABLED_TEMPORARY_TTL: number // How long a function should be temporarily disabled for CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: number // How many times a function can be disabled before it is disabled permanently CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string - CDP_CYCLOTRON_ENABLED_TEAMS: string - CDP_CYCLOTRON_BATCH_SIZE: number - CDP_CYCLOTRON_BATCH_DELAY_MS: number + CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS: string CDP_REDIS_HOST: string CDP_REDIS_PORT: number CDP_REDIS_PASSWORD: string diff --git a/plugin-server/src/utils/status.ts b/plugin-server/src/utils/status.ts index 0b6b8f26ca1c5..385b97739685e 100644 --- a/plugin-server/src/utils/status.ts +++ b/plugin-server/src/utils/status.ts @@ -15,7 +15,7 @@ export interface StatusBlueprint { export class Status implements StatusBlueprint { mode?: string - private logger?: pino.Logger + logger: pino.Logger prompt: string transport: any @@ -59,23 +59,11 @@ export class Status implements StatusBlueprint { close() { this.transport?.end() - this.logger = undefined } buildMethod(type: keyof StatusBlueprint): StatusMethod { return (icon: string, message: string, extra: object) => { const logMessage = `[${this.prompt}] ${icon} ${message}` - - if (!this.logger) { - if (isProdEnv()) { - // This can throw on tests if the logger is closed. We don't really want tests to be bothered with this. - throw new Error(`Logger has been closed! Cannot log: ${logMessage}`) - } - console.log( - `Logger has been closed! Cannot log: ${logMessage}. Logging to console instead due to non-prod env.` - ) - return - } if (extra instanceof Object) { this.logger[type]({ ...extra, msg: logMessage }) } else { diff --git a/plugin-server/tests/cdp/cdp-function-processor.test.ts b/plugin-server/tests/cdp/cdp-consumer.e2e.test.ts similarity index 97% rename from plugin-server/tests/cdp/cdp-function-processor.test.ts rename to plugin-server/tests/cdp/cdp-consumer.e2e.test.ts index 5fb097b0a5c5e..8d6581aef9ef0 100644 --- a/plugin-server/tests/cdp/cdp-function-processor.test.ts +++ b/plugin-server/tests/cdp/cdp-consumer.e2e.test.ts @@ -80,7 +80,10 @@ const convertToKafkaMessage = (message: any): any => { } } -describe('CDP Function Processor', () => { +/** + * NOTE: This isn't fully e2e... 
We still mock kafka but we trigger one queue from the other in a loop + */ +describe('CDP Consumers E2E', () => { let processedEventsConsumer: CdpProcessedEventsConsumer let functionProcessor: CdpFunctionCallbackConsumer let hub: Hub @@ -118,7 +121,7 @@ describe('CDP Function Processor', () => { jest.useRealTimers() }) - describe('full fetch function', () => { + describe('e2e fetch function', () => { /** * Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios */ diff --git a/plugin-server/tests/cdp/cdp-e2e.test.ts b/plugin-server/tests/cdp/cdp-e2e.test.ts deleted file mode 100644 index b5423459e284e..0000000000000 --- a/plugin-server/tests/cdp/cdp-e2e.test.ts +++ /dev/null @@ -1,225 +0,0 @@ -import { - CdpCyclotronWorker, - CdpCyclotronWorkerFetch, - CdpFunctionCallbackConsumer, - CdpProcessedEventsConsumer, -} from '../../src/cdp/cdp-consumers' -import { HogFunctionInvocationGlobals, HogFunctionType } from '../../src/cdp/types' -import { KAFKA_APP_METRICS_2, KAFKA_LOG_ENTRIES } from '../../src/config/kafka-topics' -import { Hub, Team } from '../../src/types' -import { closeHub, createHub } from '../../src/utils/db/hub' -import { waitForExpect } from '../helpers/expectations' -import { getFirstTeam, resetTestDatabase } from '../helpers/sql' -import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' -import { createHogExecutionGlobals, insertHogFunction as _insertHogFunction } from './fixtures' -import { createKafkaObserver, TestKafkaObserver } from './helpers/kafka-observer' - -jest.mock('../../src/utils/fetch', () => { - return { - trackedFetch: jest.fn(() => - Promise.resolve({ - status: 200, - text: () => Promise.resolve(JSON.stringify({ success: true })), - json: () => Promise.resolve({ success: true }), - }) - ), - } -}) - -const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch - -describe('CDP E2E', () => { - jest.setTimeout(10000) - describe.each(['kafka', 'cyclotron'])('e2e fetch call: %s', (mode) => { - let processedEventsConsumer: CdpProcessedEventsConsumer - let functionProcessor: CdpFunctionCallbackConsumer - let cyclotronWorker: CdpCyclotronWorker | undefined - let cyclotronFetchWorker: CdpCyclotronWorkerFetch | undefined - let hub: Hub - let team: Team - let kafkaObserver: TestKafkaObserver - let fnFetchNoFilters: HogFunctionType - let globals: HogFunctionInvocationGlobals - - const insertHogFunction = async (hogFunction: Partial) => { - const item = await _insertHogFunction(hub.postgres, team.id, hogFunction) - return item - } - - beforeEach(async () => { - await resetTestDatabase() - hub = await createHub() - team = await getFirstTeam(hub) - - fnFetchNoFilters = await insertHogFunction({ - ...HOG_EXAMPLES.simple_fetch, - ...HOG_INPUTS_EXAMPLES.simple_fetch, - ...HOG_FILTERS_EXAMPLES.no_filters, - }) - - if (mode === 'cyclotron') { - hub.CDP_CYCLOTRON_ENABLED_TEAMS = '*' - hub.CYCLOTRON_DATABASE_URL = 'postgres://posthog:posthog@localhost:5432/test_cyclotron' - } - - kafkaObserver = await createKafkaObserver(hub, [KAFKA_APP_METRICS_2, KAFKA_LOG_ENTRIES]) - - processedEventsConsumer = new CdpProcessedEventsConsumer(hub) - await processedEventsConsumer.start() - functionProcessor = new CdpFunctionCallbackConsumer(hub) - await functionProcessor.start() - - if (mode === 'cyclotron') { - cyclotronWorker = new CdpCyclotronWorker(hub) - await cyclotronWorker.start() - cyclotronFetchWorker = new CdpCyclotronWorkerFetch(hub) - await cyclotronFetchWorker.start() - } - - globals = 
createHogExecutionGlobals({ - project: { - id: team.id, - } as any, - event: { - uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', - name: '$pageview', - properties: { - $current_url: 'https://posthog.com', - $lib_version: '1.0.0', - }, - timestamp: '2024-09-03T09:00:00Z', - } as any, - }) - - mockFetch.mockClear() - }) - - afterEach(async () => { - console.log('AfterEach', { - processedEventsConsumer, - functionProcessor, - kafkaObserver, - cyclotronWorker, - cyclotronFetchWorker, - }) - - const stoppers = [ - processedEventsConsumer?.stop().then(() => console.log('Stopped processedEventsConsumer')), - functionProcessor?.stop().then(() => console.log('Stopped functionProcessor')), - kafkaObserver?.stop().then(() => console.log('Stopped kafkaObserver')), - cyclotronWorker?.stop().then(() => console.log('Stopped cyclotronWorker')), - cyclotronFetchWorker?.stop().then(() => console.log('Stopped cyclotronFetchWorker')), - ] - - await Promise.all(stoppers) - - await closeHub(hub) - }) - - afterAll(() => { - jest.useRealTimers() - }) - - /** - * Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios - */ - - it('should invoke a function in the worker loop until completed', async () => { - // NOTE: We can skip kafka as the entry point - const invocations = await processedEventsConsumer.processBatch([globals]) - expect(invocations).toHaveLength(1) - - await waitForExpect(() => { - expect(kafkaObserver.messages).toHaveLength(6) - }) - - expect(mockFetch).toHaveBeenCalledTimes(1) - - expect(mockFetch.mock.calls[0]).toMatchInlineSnapshot(` - Array [ - "https://example.com/posthog-webhook", - Object { - "body": "{\\"event\\":{\\"uuid\\":\\"b3a1fe86-b10c-43cc-acaf-d208977608d0\\",\\"name\\":\\"$pageview\\",\\"distinct_id\\":\\"distinct_id\\",\\"url\\":\\"http://localhost:8000/events/1\\",\\"properties\\":{\\"$current_url\\":\\"https://posthog.com\\",\\"$lib_version\\":\\"1.0.0\\"},\\"timestamp\\":\\"2024-09-03T09:00:00Z\\"},\\"groups\\":{},\\"nested\\":{\\"foo\\":\\"http://localhost:8000/events/1\\"},\\"person\\":{\\"uuid\\":\\"uuid\\",\\"name\\":\\"test\\",\\"url\\":\\"http://localhost:8000/persons/1\\",\\"properties\\":{\\"email\\":\\"test@posthog.com\\"}},\\"event_url\\":\\"http://localhost:8000/events/1-test\\"}", - "headers": Object { - "version": "v=1.0.0", - }, - "method": "POST", - "timeout": 10000, - }, - ] - `) - - const logMessages = kafkaObserver.messages.filter((m) => m.topic === KAFKA_LOG_ENTRIES) - const metricsMessages = kafkaObserver.messages.filter((m) => m.topic === KAFKA_APP_METRICS_2) - - expect(metricsMessages).toMatchObject([ - { - topic: 'clickhouse_app_metrics2_test', - value: { - app_source: 'hog_function', - app_source_id: fnFetchNoFilters.id.toString(), - count: 1, - metric_kind: 'success', - metric_name: 'succeeded', - team_id: 2, - }, - }, - ]) - - expect(logMessages).toMatchObject([ - { - topic: 'log_entries_test', - value: { - level: 'debug', - log_source: 'hog_function', - log_source_id: fnFetchNoFilters.id.toString(), - message: 'Executing function', - team_id: 2, - }, - }, - { - topic: 'log_entries_test', - value: { - level: 'debug', - log_source: 'hog_function', - log_source_id: fnFetchNoFilters.id.toString(), - message: expect.stringContaining( - "Suspending function due to async function call 'fetch'. 
Payload:" - ), - team_id: 2, - }, - }, - { - topic: 'log_entries_test', - value: { - level: 'debug', - log_source: 'hog_function', - log_source_id: fnFetchNoFilters.id.toString(), - message: 'Resuming function', - team_id: 2, - }, - }, - { - topic: 'log_entries_test', - value: { - level: 'info', - log_source: 'hog_function', - log_source_id: fnFetchNoFilters.id.toString(), - message: `Fetch response:, {"status":200,"body":{"success":true}}`, - team_id: 2, - }, - }, - { - topic: 'log_entries_test', - value: { - level: 'debug', - log_source: 'hog_function', - log_source_id: fnFetchNoFilters.id.toString(), - message: expect.stringContaining('Function completed in'), - team_id: 2, - }, - }, - ]) - }) - }) -}) diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 11806c8595a10..b0a1c09f15d6f 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -5,7 +5,12 @@ import { Hub, Team } from '../../src/types' import { closeHub, createHub } from '../../src/utils/db/hub' import { getFirstTeam, resetTestDatabase } from '../helpers/sql' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' -import { createHogExecutionGlobals, insertHogFunction as _insertHogFunction } from './fixtures' +import { + createHogExecutionGlobals, + createIncomingEvent, + createMessage, + insertHogFunction as _insertHogFunction, +} from './fixtures' const mockConsumer = { on: jest.fn(), @@ -108,6 +113,10 @@ describe('CDP Processed Events Consumer', () => { }) describe('general event processing', () => { + beforeEach(() => { + hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP = false + }) + describe('common processing', () => { let fnFetchNoFilters: HogFunctionType let fnPrinterPageviewFilters: HogFunctionType @@ -161,89 +170,23 @@ describe('CDP Processed Events Consumer', () => { matchInvocation(fnPrinterPageviewFilters, globals), ]) - expect(mockProducer.produce).toHaveBeenCalledTimes(11) + expect(mockProducer.produce).toHaveBeenCalledTimes(2) + expect(decodeAllKafkaMessages()).toMatchObject([ { - topic: 'log_entries_test', - value: { - message: 'Executing function', - log_source_id: fnFetchNoFilters.id, - }, - }, - { - topic: 'log_entries_test', - value: { - message: "Suspending function due to async function call 'fetch'. 
Payload: 1902 bytes", - log_source_id: fnFetchNoFilters.id, - }, - }, - { - topic: 'clickhouse_app_metrics2_test', - value: { - app_source: 'hog_function', - team_id: 2, - app_source_id: fnPrinterPageviewFilters.id, - metric_kind: 'success', - metric_name: 'succeeded', - count: 1, - }, - }, - { - topic: 'log_entries_test', - value: { - message: 'Executing function', - log_source_id: fnPrinterPageviewFilters.id, - }, - }, - { - topic: 'log_entries_test', - value: { - message: 'test', - log_source_id: fnPrinterPageviewFilters.id, - }, - }, - { - topic: 'log_entries_test', - value: { - message: '{"nested":{"foo":"***REDACTED***","bool":false,"null":null}}', - log_source_id: fnPrinterPageviewFilters.id, - }, - }, - { - topic: 'log_entries_test', - value: { - message: '{"foo":"***REDACTED***","bool":false,"null":null}', - log_source_id: fnPrinterPageviewFilters.id, - }, - }, - { - topic: 'log_entries_test', - value: { - message: 'substring: ***REDACTED***', - log_source_id: fnPrinterPageviewFilters.id, - }, - }, - { - topic: 'log_entries_test', - value: { - message: - '{"input_1":"test","secret_input_2":{"foo":"***REDACTED***","bool":false,"null":null},"secret_input_3":"***REDACTED***"}', - log_source_id: fnPrinterPageviewFilters.id, - }, - }, - { - topic: 'log_entries_test', + key: expect.any(String), + topic: 'cdp_function_callbacks_test', value: { - message: expect.stringContaining('Function completed'), - log_source_id: fnPrinterPageviewFilters.id, + state: expect.any(String), }, + waitForAck: true, }, { + key: expect.any(String), topic: 'cdp_function_callbacks_test', value: { state: expect.any(String), }, - key: expect.stringContaining(fnFetchNoFilters.id.toString()), waitForAck: true, }, ]) @@ -256,7 +199,7 @@ describe('CDP Processed Events Consumer', () => { expect(invocations).toHaveLength(1) expect(invocations).toMatchObject([matchInvocation(fnFetchNoFilters, globals)]) - expect(mockProducer.produce).toHaveBeenCalledTimes(4) + expect(mockProducer.produce).toHaveBeenCalledTimes(2) expect(decodeAllKafkaMessages()).toMatchObject([ { @@ -272,12 +215,6 @@ describe('CDP Processed Events Consumer', () => { timestamp: expect.any(String), }, }, - { - topic: 'log_entries_test', - }, - { - topic: 'log_entries_test', - }, { topic: 'cdp_function_callbacks_test', }, @@ -322,5 +259,97 @@ describe('CDP Processed Events Consumer', () => { ]) }) }) + + describe('kafka parsing', () => { + it('can parse incoming messages correctly', async () => { + await insertHogFunction({ + ...HOG_EXAMPLES.simple_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + ...HOG_FILTERS_EXAMPLES.no_filters, + }) + // Create a message that should be processed by this function + // Run the function and check that it was executed + await processor._handleKafkaBatch([ + createMessage( + createIncomingEvent(team.id, { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + event: '$pageview', + properties: JSON.stringify({ + $lib_version: '1.0.0', + }), + }) + ), + ]) + + // Generall check that the message seemed to get processed + expect(decodeAllKafkaMessages()).toMatchObject([ + { + key: expect.any(String), + topic: 'cdp_function_callbacks_test', + value: { + state: expect.any(String), + }, + waitForAck: true, + }, + ]) + }) + }) + + describe('no delayed execution', () => { + beforeEach(() => { + hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP = true + }) + + it('should invoke the initial function before enqueuing', async () => { + await insertHogFunction({ + ...HOG_EXAMPLES.simple_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + 
...HOG_FILTERS_EXAMPLES.no_filters, + }) + // Create a message that should be processed by this function + // Run the function and check that it was executed + await processor._handleKafkaBatch([ + createMessage( + createIncomingEvent(team.id, { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + event: '$pageview', + properties: JSON.stringify({ + $lib_version: '1.0.0', + }), + }) + ), + ]) + + // General check that the message seemed to get processed + expect(decodeAllKafkaMessages()).toMatchObject([ + { + key: expect.any(String), + topic: 'log_entries_test', + value: { + message: 'Executing function', + }, + waitForAck: true, + }, + { + key: expect.any(String), + topic: 'log_entries_test', + value: { + message: expect.stringContaining( + "Suspending function due to async function call 'fetch'. Payload" + ), + }, + waitForAck: true, + }, + { + key: expect.any(String), + topic: 'cdp_function_callbacks_test', + value: { + state: expect.any(String), + }, + waitForAck: true, + }, + ]) + }) + }) }) }) diff --git a/plugin-server/tests/cdp/helpers/kafka-observer.ts b/plugin-server/tests/cdp/helpers/kafka-observer.ts deleted file mode 100644 index 462c06fc1e137..0000000000000 --- a/plugin-server/tests/cdp/helpers/kafka-observer.ts +++ /dev/null @@ -1,72 +0,0 @@ -import { KafkaConsumer, Message } from 'node-rdkafka' - -import { createAdminClient, ensureTopicExists } from '../../../src/kafka/admin' -import { createRdConnectionConfigFromEnvVars } from '../../../src/kafka/config' -import { createKafkaConsumer } from '../../../src/kafka/consumer' -import { Hub } from '../../../src/types' -import { delay, UUIDT } from '../../../src/utils/utils' - -export type TestKafkaObserver = { - messages: { - topic: string - value: any - }[] - consumer: KafkaConsumer - stop: () => Promise - expectMessageCount: (count: number) => Promise -} - -export const createKafkaObserver = async (hub: Hub, topics: string[]): Promise => { - const consumer = await createKafkaConsumer({ - ...createRdConnectionConfigFromEnvVars(hub), - 'group.id': `test-group-${new UUIDT().toString()}`, - }) - - const adminClient = createAdminClient(createRdConnectionConfigFromEnvVars(hub)) - await Promise.all(topics.map((topic) => ensureTopicExists(adminClient, topic, 1000))) - adminClient.disconnect() - - await new Promise((res, rej) => consumer.connect({}, (err) => (err ? rej(err) : res()))) - consumer.subscribe(topics) - const messages: { - topic: string - value: any - }[] = [] - - const poll = async () => { - await delay(50) - if (!consumer.isConnected()) { - return - } - const newMessages = await new Promise((res, rej) => - consumer.consume(10, (err, messages) => (err ? rej(err) : res(messages))) - ) - - messages.push( - ...newMessages.map((message) => ({ - topic: message.topic, - value: JSON.parse(message.value?.toString() ?? 
''), - })) - ) - poll() - } - - poll() - - return { - messages, - consumer, - stop: () => new Promise((res) => consumer.disconnect(res)), - expectMessageCount: async (count: number): Promise => { - const timeout = 5000 - const now = Date.now() - while (messages.length < count && Date.now() - now < timeout) { - await delay(100) - } - - if (messages.length < count) { - throw new Error(`Expected ${count} messages, got ${messages.length}`) - } - }, - } -} diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts index 03addf077d964..7740078fe6268 100644 --- a/plugin-server/tests/cdp/hog-executor.test.ts +++ b/plugin-server/tests/cdp/hog-executor.test.ts @@ -2,7 +2,7 @@ import { DateTime } from 'luxon' import { HogExecutor } from '../../src/cdp/hog-executor' import { HogFunctionManager } from '../../src/cdp/hog-function-manager' -import { HogFunctionInvocation, HogFunctionType } from '../../src/cdp/types' +import { HogFunctionAsyncFunctionResponse, HogFunctionType } from '../../src/cdp/types' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' import { createHogExecutionGlobals, @@ -11,9 +11,8 @@ import { insertHogFunction as _insertHogFunction, } from './fixtures' -const setupFetchResponse = (invocation: HogFunctionInvocation, options?: { status?: number; body?: string }): void => { - invocation.queue = 'hog' - invocation.queueParameters = { +const createAsyncFunctionResponse = (response?: Record): HogFunctionAsyncFunctionResponse => { + return { timings: [ { kind: 'async_function', @@ -21,10 +20,11 @@ const setupFetchResponse = (invocation: HogFunctionInvocation, options?: { statu }, ], response: { - status: options?.status ?? 200, + status: 200, + body: 'success', + ...response, }, } - invocation.queueBlob = Buffer.from(options?.body ?? 
diff --git a/plugin-server/tests/cdp/hog-function-manager.test.ts b/plugin-server/tests/cdp/hog-function-manager.test.ts
index 3f34fcb4fe378..1624999c93058 100644
--- a/plugin-server/tests/cdp/hog-function-manager.test.ts
+++ b/plugin-server/tests/cdp/hog-function-manager.test.ts
@@ -81,7 +81,6 @@ describe('HogFunctionManager', () => {
     })
 
     afterEach(async () => {
-        await manager.stop()
         await closeHub(hub)
     })
 
diff --git a/plugin-server/tests/helpers/expectations.ts b/plugin-server/tests/helpers/expectations.ts
deleted file mode 100644
index 6a4dcf9b3cc53..0000000000000
--- a/plugin-server/tests/helpers/expectations.ts
+++ /dev/null
@@ -1,17 +0,0 @@
-export const waitForExpect = async <T>(fn: () => T | Promise<T>, timeout = 10_000, interval = 1_000): Promise<T> => {
-    // Allows for running expectations that are expected to pass eventually.
-    // This is useful for, e.g. waiting for events to have been ingested into
-    // the database.
-
-    const start = Date.now()
-    while (true) {
-        try {
-            return await fn()
-        } catch (error) {
-            if (Date.now() - start > timeout) {
-                throw error
-            }
-            await new Promise((resolve) => setTimeout(resolve, interval))
-        }
-    }
-}
diff --git a/rust/bin/migrate-cyclotron b/rust/bin/migrate-cyclotron
deleted file mode 100755
index cde8d8b4d65fc..0000000000000
--- a/rust/bin/migrate-cyclotron
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/bin/sh
-SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
-
-CYCLOTRON_DATABASE_NAME=${CYCLOTRON_DATABASE_NAME:-cyclotron}
-CYCLOTRON_DATABASE_URL=${CYCLOTRON_DATABASE_URL:-postgres://posthog:posthog@localhost:5432/$CYCLOTRON_DATABASE_NAME}
-
-echo "Performing cyclotron migrations for $CYCLOTRON_DATABASE_URL (DATABASE_NAME=$CYCLOTRON_DATABASE_NAME)"
-
-sqlx database create -D "$CYCLOTRON_DATABASE_URL"
-sqlx migrate run -D "$CYCLOTRON_DATABASE_URL" --source $SCRIPT_DIR/../cyclotron-core/migrations
diff --git a/rust/cyclotron-node/src/helpers.ts b/rust/cyclotron-node/src/helpers.ts
deleted file mode 100644
index ba1ace2a37161..0000000000000
--- a/rust/cyclotron-node/src/helpers.ts
+++ /dev/null
@@ -1,30 +0,0 @@
-import { CyclotronInternalPoolConfig, CyclotronPoolConfig } from './types'
-
-export function convertToInternalPoolConfig(poolConfig: CyclotronPoolConfig): CyclotronInternalPoolConfig {
-    return {
-        db_url: poolConfig.dbUrl,
-        max_connections: poolConfig.maxConnections,
-        min_connections: poolConfig.minConnections,
-        acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds,
-        max_lifetime_seconds: poolConfig.maxLifetimeSeconds,
-        idle_timeout_seconds: poolConfig.idleTimeoutSeconds,
-    }
-}
-
-export function serializeObject(name: string, obj: Record<string, any> | null): string | null {
-    if (obj === null) {
-        return null
-    } else if (typeof obj === 'object' && obj !== null) {
-        return JSON.stringify(obj)
-    }
-    throw new Error(`${name} must be either an object or null`)
-}
-
-export function deserializeObject(name: string, str: any): Record<string, any> | null {
-    if (str === null) {
-        return null
-    } else if (typeof str === 'string') {
-        return JSON.parse(str)
-    }
-    throw new Error(`${name} must be either a string or null`)
-}
diff --git a/rust/cyclotron-node/src/index.ts b/rust/cyclotron-node/src/index.ts
index e905c5f6cd4ad..fb8dd659d80c3 100644
--- a/rust/cyclotron-node/src/index.ts
+++ b/rust/cyclotron-node/src/index.ts
@@ -1,3 +1,222 @@
-export * from './manager'
-export * from './types'
-export * from './worker'
+// eslint-disable-next-line @typescript-eslint/no-var-requires
+const cyclotron = require('../index.node')
+
+export interface PoolConfig {
+    dbUrl: string
+    maxConnections?: number
+    minConnections?: number
+    acquireTimeoutSeconds?: number
+    maxLifetimeSeconds?: number
+    idleTimeoutSeconds?: number
+}
+
+// Type as expected by Cyclotron.
+interface InternalPoolConfig {
+    db_url: string
+    max_connections?: number
+    min_connections?: number
+    acquire_timeout_seconds?: number
+    max_lifetime_seconds?: number
+    idle_timeout_seconds?: number
+}
+
+export interface ManagerConfig {
+    shards: PoolConfig[]
+}
+
+// Type as expected by Cyclotron.
+interface InternalManagerConfig {
+    shards: InternalPoolConfig[]
+}
+
+export interface JobInit {
+    teamId: number
+    functionId: string
+    queueName: string
+    priority?: number
+    scheduled?: Date
+    vmState?: string
+    parameters?: string
+    blob?: Uint8Array
+    metadata?: string
+}
+
+// Type as expected by Cyclotron.
+interface InternalJobInit {
+    team_id: number
+    function_id: string
+    queue_name: string
+    priority?: number
+    scheduled?: Date
+    vm_state?: string
+    parameters?: string
+    metadata?: string
+}
+
+export type JobState = 'available' | 'running' | 'completed' | 'failed' | 'paused'
+
+export interface Job {
+    id: string
+    teamId: number
+    functionId: string | null
+    created: Date
+    lockId: string | null
+    lastHeartbeat: Date | null
+    janitorTouchCount: number
+    transitionCount: number
+    lastTransition: Date
+    queueName: string
+    state: JobState
+    priority: number
+    scheduled: Date
+    vmState: string | null
+    metadata: string | null
+    parameters: string | null
+    blob: Uint8Array | null
+}
+
+export async function initWorker(poolConfig: PoolConfig): Promise<void> {
+    const initWorkerInternal: InternalPoolConfig = {
+        db_url: poolConfig.dbUrl,
+        max_connections: poolConfig.maxConnections,
+        min_connections: poolConfig.minConnections,
+        acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds,
+        max_lifetime_seconds: poolConfig.maxLifetimeSeconds,
+        idle_timeout_seconds: poolConfig.idleTimeoutSeconds,
+    }
+    return await cyclotron.initWorker(JSON.stringify(initWorkerInternal))
+}
+
+export async function initManager(managerConfig: ManagerConfig): Promise<void> {
+    const managerConfigInternal: InternalManagerConfig = {
+        shards: managerConfig.shards.map((shard) => ({
+            db_url: shard.dbUrl,
+            max_connections: shard.maxConnections,
+            min_connections: shard.minConnections,
+            acquire_timeout_seconds: shard.acquireTimeoutSeconds,
+            max_lifetime_seconds: shard.maxLifetimeSeconds,
+            idle_timeout_seconds: shard.idleTimeoutSeconds,
+        })),
+    }
+    return await cyclotron.initManager(JSON.stringify(managerConfigInternal))
+}
+
+export async function maybeInitWorker(poolConfig: PoolConfig): Promise<void> {
+    const initWorkerInternal: InternalPoolConfig = {
+        db_url: poolConfig.dbUrl,
+        max_connections: poolConfig.maxConnections,
+        min_connections: poolConfig.minConnections,
+        acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds,
+        max_lifetime_seconds: poolConfig.maxLifetimeSeconds,
+        idle_timeout_seconds: poolConfig.idleTimeoutSeconds,
+    }
+    return await cyclotron.maybeInitWorker(JSON.stringify(initWorkerInternal))
+}
+
+export async function maybeInitManager(managerConfig: ManagerConfig): Promise<void> {
+    const managerConfigInternal: InternalManagerConfig = {
+        shards: managerConfig.shards.map((shard) => ({
+            db_url: shard.dbUrl,
+            max_connections: shard.maxConnections,
+            min_connections: shard.minConnections,
+            acquire_timeout_seconds: shard.acquireTimeoutSeconds,
+            max_lifetime_seconds: shard.maxLifetimeSeconds,
+            idle_timeout_seconds: shard.idleTimeoutSeconds,
+        })),
+    }
+    return await cyclotron.maybeInitManager(JSON.stringify(managerConfigInternal))
+}
+
+export async function createJob(job: JobInit): Promise<void> {
+    job.priority ??= 1
+    job.scheduled ??= new Date()
+
+    const jobInitInternal: InternalJobInit = {
+        team_id: job.teamId,
+        function_id: job.functionId,
+        queue_name: job.queueName,
+        priority: job.priority,
+        scheduled: job.scheduled,
+        vm_state: job.vmState,
+        parameters: job.parameters,
+        metadata: job.metadata,
+    }
+
+    const json = JSON.stringify(jobInitInternal)
+    return await cyclotron.createJob(json, job.blob ? job.blob.buffer : undefined)
+}
+
+export async function dequeueJobs(queueName: string, limit: number): Promise<Job[]> {
+    return await cyclotron.dequeueJobs(queueName, limit)
+}
+
+export async function dequeueJobsWithVmState(queueName: string, limit: number): Promise<Job[]> {
+    return await cyclotron.dequeueJobsWithVmState(queueName, limit)
+}
+
+export async function flushJob(jobId: string): Promise<void> {
+    return await cyclotron.flushJob(jobId)
+}
+
+export function setState(jobId: string, jobState: JobState): Promise<void> {
+    return cyclotron.setState(jobId, jobState)
+}
+
+export function setQueue(jobId: string, queueName: string): Promise<void> {
+    return cyclotron.setQueue(jobId, queueName)
+}
+
+export function setPriority(jobId: string, priority: number): Promise<void> {
+    return cyclotron.setPriority(jobId, priority)
+}
+
+export function setScheduledAt(jobId: string, scheduledAt: Date): Promise<void> {
+    return cyclotron.setScheduledAt(jobId, scheduledAt.toISOString())
+}
+
+export function serializeObject(name: string, obj: Record<string, any> | null): string | null {
+    if (obj === null) {
+        return null
+    } else if (typeof obj === 'object' && obj !== null) {
+        return JSON.stringify(obj)
+    }
+    throw new Error(`${name} must be either an object or null`)
+}
+
+export function setVmState(jobId: string, vmState: Record<string, any> | null): Promise<void> {
+    const serialized = serializeObject('vmState', vmState)
+    return cyclotron.setVmState(jobId, serialized)
+}
+
+export function setMetadata(jobId: string, metadata: Record<string, any> | null): Promise<void> {
+    const serialized = serializeObject('metadata', metadata)
+    return cyclotron.setMetadata(jobId, serialized)
+}
+
+export function setParameters(jobId: string, parameters: Record<string, any> | null): Promise<void> {
+    const serialized = serializeObject('parameters', parameters)
+    return cyclotron.setParameters(jobId, serialized)
+}
+
+export function setBlob(jobId: string, blob: Uint8Array | null): Promise<void> {
+    return cyclotron.setBlob(jobId, blob)
+}
+
+export default {
+    initWorker,
+    initManager,
+    maybeInitWorker,
+    maybeInitManager,
+    createJob,
+    dequeueJobs,
+    dequeueJobsWithVmState,
+    flushJob,
+    setState,
+    setQueue,
+    setPriority,
+    setScheduledAt,
+    setVmState,
+    setMetadata,
+    setParameters,
+    setBlob,
+}
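For orientation (editorial, not part of the patch): a minimal manager-side sketch of the consolidated flat API above. The package name and function id are assumptions; the dev database URL matches the one now used in bin/start-cyclotron.

```ts
import { createJob, initManager } from '@posthog/cyclotron' // package name assumed

async function enqueueExample(): Promise<void> {
    const dbUrl = 'postgres://posthog:posthog@localhost:5432/posthog'
    await initManager({ shards: [{ dbUrl }] })

    // priority and scheduled are defaulted by createJob (1 and "now")
    await createJob({
        teamId: 1,
        functionId: '00000000-0000-0000-0000-000000000000', // hypothetical id
        queueName: 'hog',
        parameters: JSON.stringify({ url: 'https://example.com' }),
    })
}
```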
diff --git a/rust/cyclotron-node/src/manager.ts b/rust/cyclotron-node/src/manager.ts
deleted file mode 100644
index bba6488828ba2..0000000000000
--- a/rust/cyclotron-node/src/manager.ts
+++ /dev/null
@@ -1,39 +0,0 @@
-// eslint-disable-next-line @typescript-eslint/no-var-requires
-const cyclotron = require('../index.node')
-
-import { convertToInternalPoolConfig, serializeObject } from './helpers'
-import { CyclotronJobInit, CyclotronPoolConfig } from './types'
-
-export class CyclotronManager {
-    constructor(private config: { shards: CyclotronPoolConfig[] }) {
-        this.config = config
-    }
-
-    async connect(): Promise<void> {
-        return await cyclotron.maybeInitManager(
-            JSON.stringify({
-                shards: this.config.shards.map((shard) => convertToInternalPoolConfig(shard)),
-            })
-        )
-    }
-
-    async createJob(job: CyclotronJobInit): Promise<void> {
-        job.priority ??= 1
-        job.scheduled ??= new Date()
-
-        // TODO: Why is this type of job snake case whereas the dequeue return type is camel case?
-        const jobInitInternal = {
-            team_id: job.teamId,
-            function_id: job.functionId,
-            queue_name: job.queueName,
-            priority: job.priority,
-            scheduled: job.scheduled,
-            vm_state: job.vmState ? serializeObject('vmState', job.vmState) : null,
-            parameters: job.parameters ? serializeObject('parameters', job.parameters) : null,
-            metadata: job.metadata ? serializeObject('metadata', job.metadata) : null,
-        }
-
-        const json = JSON.stringify(jobInitInternal)
-        return await cyclotron.createJob(json, job.blob ? job.blob.buffer : undefined)
-    }
-}
diff --git a/rust/cyclotron-node/src/types.ts b/rust/cyclotron-node/src/types.ts
deleted file mode 100644
index 88c8a26099083..0000000000000
--- a/rust/cyclotron-node/src/types.ts
+++ /dev/null
@@ -1,48 +0,0 @@
-export type CyclotronPoolConfig = {
-    dbUrl: string
-    maxConnections?: number
-    minConnections?: number
-    acquireTimeoutSeconds?: number
-    maxLifetimeSeconds?: number
-    idleTimeoutSeconds?: number
-}
-
-// Type as expected by Cyclotron.
-export type CyclotronInternalPoolConfig = {
-    db_url: string
-    max_connections?: number
-    min_connections?: number
-    acquire_timeout_seconds?: number
-    max_lifetime_seconds?: number
-    idle_timeout_seconds?: number
-}
-
-export type CyclotronJobState = 'available' | 'running' | 'completed' | 'failed' | 'paused'
-
-export type CyclotronJob = {
-    id: string
-    teamId: number
-    functionId: string | null
-    created: Date
-    lockId: string | null
-    lastHeartbeat: Date | null
-    janitorTouchCount: number
-    transitionCount: number
-    lastTransition: Date
-    queueName: string
-    state: CyclotronJobState
-    priority: number
-    scheduled: Date
-    vmState: object | null
-    metadata: object | null
-    parameters: object | null
-    blob: Uint8Array | null
-}
-
-export type CyclotronJobInit = Pick<CyclotronJob, 'teamId' | 'functionId' | 'queueName' | 'priority'> &
-    Pick<Partial<CyclotronJob>, 'scheduled' | 'vmState' | 'parameters' | 'metadata' | 'blob'>
-
-export type CyclotronJobUpdate = Pick<
-    Partial<CyclotronJob>,
-    'queueName' | 'priority' | 'vmState' | 'parameters' | 'metadata' | 'blob'
->
diff --git a/rust/cyclotron-node/src/worker.ts b/rust/cyclotron-node/src/worker.ts
deleted file mode 100644
index 7b3411863af7d..0000000000000
--- a/rust/cyclotron-node/src/worker.ts
+++ /dev/null
@@ -1,120 +0,0 @@
-// eslint-disable-next-line @typescript-eslint/no-var-requires
-const cyclotron = require('../index.node')
-
-import { convertToInternalPoolConfig, deserializeObject, serializeObject } from './helpers'
-import { CyclotronJob, CyclotronJobState, CyclotronJobUpdate, CyclotronPoolConfig } from './types'
-
-const parseJob = (job: CyclotronJob): CyclotronJob => {
-    return {
-        ...job,
-        vmState: deserializeObject('vmState', job.vmState),
-        metadata: deserializeObject('metadata', job.metadata),
-        parameters: deserializeObject('parameters', job.parameters),
-    }
-}
-
-export type CyclotronWorkerConfig = {
-    pool: CyclotronPoolConfig
-    /** The queue to be consumed from */
-    queueName: string
-    /** Max number of jobs to consume in a batch. Default: 100 */
-    batchMaxSize?: number
-    /** Whether the vmState will be included or not */
-    includeVmState?: boolean
-    /** Amount of delay between dequeue polls. Default: 50ms */
-    pollDelayMs?: number
-    /** Heartbeat timeout. After this time without response from the worker loop the worker will be considered unhealthy. Default 30000 */
-    heartbeatTimeoutMs?: number
-}
-
-export class CyclotronWorker {
-    isConsuming: boolean = false
-    lastHeartbeat: Date = new Date()
-
-    private consumerLoopPromise: Promise<void> | null = null
-
-    constructor(private config: CyclotronWorkerConfig) {
-        this.config = config
-    }
-
-    public isHealthy(): boolean {
-        return (
-            this.isConsuming &&
-            new Date().getTime() - this.lastHeartbeat.getTime() < (this.config.heartbeatTimeoutMs ?? 30000)
-        )
-    }
-
-    async connect(processBatch: (jobs: CyclotronJob[]) => Promise<void>): Promise<void> {
-        if (this.isConsuming) {
-            throw new Error('Already consuming')
-        }
-
-        await cyclotron.maybeInitWorker(JSON.stringify(convertToInternalPoolConfig(this.config.pool)))
-
-        this.isConsuming = true
-        this.consumerLoopPromise = this.startConsumerLoop(processBatch).finally(() => {
-            this.isConsuming = false
-            this.consumerLoopPromise = null
-        })
-    }
-
-    private async startConsumerLoop(processBatch: (jobs: CyclotronJob[]) => Promise<void>): Promise<void> {
-        try {
-            this.isConsuming = true
-
-            const batchMaxSize = this.config.batchMaxSize ?? 100
-            const pollDelayMs = this.config.pollDelayMs ?? 50
-
-            while (this.isConsuming) {
-                this.lastHeartbeat = new Date()
-
-                const jobs = (
-                    this.config.includeVmState
-                        ? await cyclotron.dequeueJobsWithVmState(this.config.queueName, batchMaxSize)
-                        : await cyclotron.dequeueJobs(this.config.queueName, batchMaxSize)
-                ).map(parseJob)
-
-                if (!jobs.length) {
-                    // Wait a bit before polling again
-                    await new Promise((resolve) => setTimeout(resolve, pollDelayMs))
-                    continue
-                }
-
-                await processBatch(jobs)
-            }
-        } catch (e) {
-            // We only log here so as not to crash the parent process
-            console.error('[Cyclotron] Error in worker loop', e)
-        }
-    }
-
-    async disconnect(): Promise<void> {
-        this.isConsuming = false
-        await (this.consumerLoopPromise ?? Promise.resolve())
-    }
-
-    async flushJob(jobId: string): Promise<void> {
-        return await cyclotron.flushJob(jobId)
-    }
-
-    updateJob(id: CyclotronJob['id'], state: CyclotronJobState, updates?: CyclotronJobUpdate): void {
-        cyclotron.setState(id, state)
-        if (updates?.queueName) {
-            cyclotron.setQueue(id, updates.queueName)
-        }
-        if (updates?.priority) {
-            cyclotron.setPriority(id, updates.priority)
-        }
-        if (updates?.parameters) {
-            cyclotron.setParameters(id, serializeObject('parameters', updates.parameters))
-        }
-        if (updates?.metadata) {
-            cyclotron.setMetadata(id, serializeObject('metadata', updates.metadata))
-        }
-        if (updates?.vmState) {
-            cyclotron.setVmState(id, serializeObject('vmState', updates.vmState))
-        }
-        if (updates?.blob) {
-            cyclotron.setBlob(id, updates.blob)
-        }
-    }
-}
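Closing note (editorial, not part of the patch): with the CyclotronWorker class deleted, consumers would drive the flat functions from index.ts directly. A minimal polling-loop sketch under that assumption, reusing the deleted worker's default batch size (100) and poll delay (50 ms):

```ts
import { dequeueJobs, flushJob, maybeInitWorker, setState } from '@posthog/cyclotron' // package name assumed

async function consume(queueName: string, signal: AbortSignal): Promise<void> {
    await maybeInitWorker({ dbUrl: 'postgres://posthog:posthog@localhost:5432/posthog' })

    while (!signal.aborted) {
        const jobs = await dequeueJobs(queueName, 100) // batch size mirrors the deleted default
        if (!jobs.length) {
            await new Promise((resolve) => setTimeout(resolve, 50)) // idle poll delay
            continue
        }
        for (const job of jobs) {
            // ... process job.parameters / job.blob here ...
            await setState(job.id, 'completed')
            await flushJob(job.id)
        }
    }
}
```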