From efaf22d28d06def710833c0ca09b444b2af1eb3e Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 12:24:22 +0100 Subject: [PATCH 01/24] Moved cyclotron stuff into the repo --- plugin-server/package.json | 6 +- plugin-server/pnpm-lock.yaml | 8 + plugin-server/src/cdp/cdp-consumers.ts | 14 +- plugin-server/src/worker/cyclotron.ts | 8 +- production.Dockerfile | 8 +- rust/cyclotron-node/.gitignore | 1 + rust/cyclotron-node/bin/build.sh | 8 - rust/cyclotron-node/package-lock.json | 16 +- rust/cyclotron-node/package.json | 24 +-- rust/cyclotron-node/src/index.ts | 239 +++++++++++++++++++++++++ rust/cyclotron-node/tsconfig.json | 24 +++ 11 files changed, 322 insertions(+), 34 deletions(-) delete mode 100755 rust/cyclotron-node/bin/build.sh create mode 100644 rust/cyclotron-node/src/index.ts create mode 100644 rust/cyclotron-node/tsconfig.json diff --git a/plugin-server/package.json b/plugin-server/package.json index 25df603b90e05..0ff780ff4310f 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -27,7 +27,8 @@ "services:start": "cd .. && docker compose -f docker-compose.dev.yml up", "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down", "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", - "services": "pnpm services:stop && pnpm services:clean && pnpm services:start" + "services": "pnpm services:stop && pnpm services:clean && pnpm services:start", + "preinstall": "cd ../rust/cyclotron-node && npm i && npm run build" }, "graphile-worker": { "maxContiguousErrors": 300 @@ -134,7 +135,8 @@ "prettier": "^2.8.8", "supertest": "^7.0.0", "ts-node": "^10.9.1", - "typescript": "^4.7.4" + "typescript": "^4.7.4", + "cyclotron-node": "file:../rust/cyclotron-node" }, "pnpm": { "patchedDependencies": { diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index c2c4bc52093ac..180dc1bee4ea4 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -243,6 +243,9 @@ devDependencies: c8: specifier: ^7.12.0 version: 7.13.0 + cyclotron-node: + specifier: file:../rust/cyclotron-node + version: file:../rust/cyclotron-node deepmerge: specifier: ^4.2.2 version: 4.3.1 @@ -10731,3 +10734,8 @@ packages: /yocto-queue@0.1.0: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} + + file:../rust/cyclotron-node: + resolution: {directory: ../rust/cyclotron-node, type: directory} + name: cyclotron-node + dev: true diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index f81016520146e..73b36c1a81f2c 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -703,14 +703,14 @@ export class CdpCyclotronWorker extends CdpConsumerBase { protected name = 'CdpCyclotronWorker' protected topic = 'UNUSED-CdpCyclotronWorker' protected consumerGroupId = 'UNUSED-CdpCyclotronWorker' - private runningWorker: Promise | undefined + private isUnhealthy = false public async _handleEachBatch(_: Message[]): Promise { // Not called, we override `start` below to use Cyclotron instead. } - public async innerStart() { + private async innerStart() { try { const limit = 100 // TODO: Make configurable. while (!this.isStopping) { @@ -722,6 +722,7 @@ export class CdpCyclotronWorker extends CdpConsumerBase { } } } catch (err) { + this.isUnhealthy = true console.error('Error in Cyclotron worker', err) throw err } @@ -733,4 +734,13 @@ export class CdpCyclotronWorker extends CdpConsumerBase { this.runningWorker = this.innerStart() return Promise.resolve() } + + public async stop() { + await super.stop() + await this.runningWorker + } + + public isHealthy() { + return this.isUnhealthy + } } diff --git a/plugin-server/src/worker/cyclotron.ts b/plugin-server/src/worker/cyclotron.ts index a9fbb719ca54c..64c7c08cb014d 100644 --- a/plugin-server/src/worker/cyclotron.ts +++ b/plugin-server/src/worker/cyclotron.ts @@ -1,10 +1,4 @@ -// We use a dynamic `require` here because `import` (at least in VS Code) doesn't seem to understand -// or find the Node C module. -const cyclotron = require('cyclotron') - -export function hello() { - return cyclotron.hello('hello, world') -} +import cyclotron from 'cyclotron-node' interface PoolConfig { dbUrl: string diff --git a/production.Dockerfile b/production.Dockerfile index 9e9ae81260d23..248cbaa2a7ae0 100644 --- a/production.Dockerfile +++ b/production.Dockerfile @@ -46,8 +46,7 @@ FROM ghcr.io/posthog/rust-node-container:bullseye_rust_1.80.1-node_18.19.1 AS cy WORKDIR /code COPY ./rust ./rust RUN cd rust/cyclotron-node && \ - cargo build -r && \ - mv ../target/release/libcyclotron_node.so /code/index.node + npm install && npm run build # @@ -57,6 +56,9 @@ FROM node:18.19.1-bullseye-slim AS plugin-server-build WORKDIR /code/plugin-server SHELL ["/bin/bash", "-e", "-o", "pipefail", "-c"] +# Copy the Rust cyclotron-node module. +COPY --from=cyclotron-node-build --chown=posthog:posthog /code/rust/cyclotron-node /code/rust/cyclotron-node + # Compile and install Node.js dependencies. COPY ./plugin-server/package.json ./plugin-server/pnpm-lock.yaml ./plugin-server/tsconfig.json ./ COPY ./plugin-server/patches/ ./patches/ @@ -193,8 +195,6 @@ COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/dist COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/node_modules /code/plugin-server/node_modules COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/package.json /code/plugin-server/package.json -# Copy the Rust cyclotron-node module. -COPY --from=cyclotron-node-build --chown=posthog:posthog /code/index.node /code/plugin-server/node_modules/cyclotron/index.node # Copy the Python dependencies and Django staticfiles from the posthog-build stage. COPY --from=posthog-build --chown=posthog:posthog /code/staticfiles /code/staticfiles diff --git a/rust/cyclotron-node/.gitignore b/rust/cyclotron-node/.gitignore index 911a72884db9e..01f3230c629f3 100644 --- a/rust/cyclotron-node/.gitignore +++ b/rust/cyclotron-node/.gitignore @@ -4,3 +4,4 @@ index.node **/.DS_Store npm-debug.log*cargo.log cross.log +dist/ diff --git a/rust/cyclotron-node/bin/build.sh b/rust/cyclotron-node/bin/build.sh deleted file mode 100755 index 8ec9586e166c6..0000000000000 --- a/rust/cyclotron-node/bin/build.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/bash -set -e - -npm install - -# This generates an "index.node", which -# you then `require()` the directory of to load. -npm run build diff --git a/rust/cyclotron-node/package-lock.json b/rust/cyclotron-node/package-lock.json index 4999d5683065d..65b37a4302757 100644 --- a/rust/cyclotron-node/package-lock.json +++ b/rust/cyclotron-node/package-lock.json @@ -9,7 +9,8 @@ "version": "0.1.0", "license": "MIT", "devDependencies": { - "@neon-rs/cli": "0.1.73" + "@neon-rs/cli": "0.1.73", + "typescript": "^4.7.4" } }, "node_modules/@cargo-messages/android-arm-eabi": { @@ -120,6 +121,19 @@ "@cargo-messages/win32-arm64-msvc": "0.1.72", "@cargo-messages/win32-x64-msvc": "0.1.72" } + }, + "node_modules/typescript": { + "version": "4.9.5", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", + "integrity": "sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==", + "dev": true, + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=4.2.0" + } } } } diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 267a34e803cb4..4918fe910544e 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -2,20 +2,24 @@ "name": "cyclotron-node", "version": "0.1.0", "description": "Node bindings for cyclotron", - "main": "index.node", + "main": "dist/index.js", + "types": "dist/index.d.ts", "scripts": { "test": "cargo test", - "cargo-build": "cargo build --message-format=json > cargo.log", - "cross-build": "cross build --message-format=json > cross.log", - "postcargo-build": "neon dist < cargo.log", - "postcross-build": "neon dist -m /target < cross.log", - "debug": "npm run cargo-build --", - "build": "npm run cargo-build -- --release", - "cross": "npm run cross-build -- --release" + "build": "npm run build:cargo -- --release && npm run build:lib", + "build:cargo": "cargo build --message-format=json > cargo.log", + "build:cargo:debug": "npm run build:cargo --", + "build:cross": "cross build --message-format=json > cross.log", + "build:lib": "tsc" }, "author": "", "license": "MIT", "devDependencies": { - "@neon-rs/cli": "0.1.73" - } + "@neon-rs/cli": "0.1.73", + "typescript": "^4.7.4" + }, + "files": [ + "dist", + "index.node" + ] } diff --git a/rust/cyclotron-node/src/index.ts b/rust/cyclotron-node/src/index.ts new file mode 100644 index 0000000000000..a3cace0199b74 --- /dev/null +++ b/rust/cyclotron-node/src/index.ts @@ -0,0 +1,239 @@ +// eslint-disable-next-line @typescript-eslint/no-var-requires +const cyclotron = require('../index.node') + +export interface PoolConfig { + dbUrl: string + maxConnections?: number + minConnections?: number + acquireTimeoutSeconds?: number + maxLifetimeSeconds?: number + idleTimeoutSeconds?: number +} + +// Type as expected by Cyclotron. +interface InternalPoolConfig { + db_url: string + max_connections?: number + min_connections?: number + acquire_timeout_seconds?: number + max_lifetime_seconds?: number + idle_timeout_seconds?: number +} + +export interface ManagerConfig { + shards: PoolConfig[] +} + +// Type as expected by Cyclotron. +interface InternalManagerConfig { + shards: InternalPoolConfig[] +} + +export interface JobInit { + teamId: number + functionId: string + queueName: string + priority?: number + scheduled?: Date + vmState?: string + parameters?: string + metadata?: string +} + +// Type as expected by Cyclotron. +interface InternalJobInit { + team_id: number + function_id: string + queue_name: string + priority?: number + scheduled?: Date + vm_state?: string + parameters?: string + metadata?: string +} + +export type JobState = 'available' | 'running' | 'completed' | 'failed' | 'paused' + +export interface Job { + id: string + teamId: number + functionId: string | null + created: Date + lockId: string | null + lastHeartbeat: Date | null + janitorTouchCount: number + transitionCount: number + lastTransition: Date + queueName: string + state: JobState + priority: number + scheduled: Date + vmState: string | null + metadata: string | null + parameters: string | null +} + +// Type as returned by Cyclotron. +interface InternalJob { + id: string + team_id: number + function_id: string | null + created: string + lock_id: string | null + last_heartbeat: string | null + janitor_touch_count: number + transition_count: number + last_transition: string + queue_name: string + state: JobState + priority: number + scheduled: string + vm_state: string | null + metadata: string | null + parameters: string | null +} + +export async function initWorker(poolConfig: PoolConfig): Promise { + const initWorkerInternal: InternalPoolConfig = { + db_url: poolConfig.dbUrl, + max_connections: poolConfig.maxConnections, + min_connections: poolConfig.minConnections, + acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds, + max_lifetime_seconds: poolConfig.maxLifetimeSeconds, + idle_timeout_seconds: poolConfig.idleTimeoutSeconds, + } + return await cyclotron.initWorker(JSON.stringify(initWorkerInternal)) +} + +export async function initManager(managerConfig: ManagerConfig): Promise { + const managerConfigInternal: InternalManagerConfig = { + shards: managerConfig.shards.map((shard) => ({ + db_url: shard.dbUrl, + max_connections: shard.maxConnections, + min_connections: shard.minConnections, + acquire_timeout_seconds: shard.acquireTimeoutSeconds, + max_lifetime_seconds: shard.maxLifetimeSeconds, + idle_timeout_seconds: shard.idleTimeoutSeconds, + })), + } + return await cyclotron.initManager(JSON.stringify(managerConfigInternal)) +} + +export async function maybeInitWorker(poolConfig: PoolConfig): Promise { + const initWorkerInternal: InternalPoolConfig = { + db_url: poolConfig.dbUrl, + max_connections: poolConfig.maxConnections, + min_connections: poolConfig.minConnections, + acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds, + max_lifetime_seconds: poolConfig.maxLifetimeSeconds, + idle_timeout_seconds: poolConfig.idleTimeoutSeconds, + } + return await cyclotron.maybeInitWorker(JSON.stringify(initWorkerInternal)) +} + +export async function maybeInitManager(managerConfig: ManagerConfig): Promise { + const managerConfigInternal: InternalManagerConfig = { + shards: managerConfig.shards.map((shard) => ({ + db_url: shard.dbUrl, + max_connections: shard.maxConnections, + min_connections: shard.minConnections, + acquire_timeout_seconds: shard.acquireTimeoutSeconds, + max_lifetime_seconds: shard.maxLifetimeSeconds, + idle_timeout_seconds: shard.idleTimeoutSeconds, + })), + } + return await cyclotron.maybeInitManager(JSON.stringify(managerConfigInternal)) +} + +export async function createJob(job: JobInit): Promise { + job.priority ??= 1 + job.scheduled ??= new Date() + + const jobInitInternal: InternalJobInit = { + team_id: job.teamId, + function_id: job.functionId, + queue_name: job.queueName, + priority: job.priority, + scheduled: job.scheduled, + vm_state: job.vmState, + parameters: job.parameters, + metadata: job.metadata, + } + return await cyclotron.createJob(JSON.stringify(jobInitInternal)) +} + +function convertInternalJobToJob(jobInternal: InternalJob): Job { + return { + id: jobInternal.id, + teamId: jobInternal.team_id, + functionId: jobInternal.function_id, + created: new Date(jobInternal.created), + lockId: jobInternal.lock_id, + lastHeartbeat: jobInternal.last_heartbeat ? new Date(jobInternal.last_heartbeat) : null, + janitorTouchCount: jobInternal.janitor_touch_count, + transitionCount: jobInternal.transition_count, + lastTransition: new Date(jobInternal.last_transition), + queueName: jobInternal.queue_name, + state: jobInternal.state, + priority: jobInternal.priority, + scheduled: new Date(jobInternal.scheduled), + vmState: jobInternal.vm_state, + metadata: jobInternal.metadata, + parameters: jobInternal.parameters, + } +} + +export async function dequeueJobs(queueName: string, limit: number): Promise { + const jobsStr = await cyclotron.dequeueJobs(queueName, limit) + const jobs: InternalJob[] = JSON.parse(jobsStr) + return jobs.map(convertInternalJobToJob) +} +export async function dequeueJobsWithVmState(queueName: string, limit: number): Promise { + const jobsStr = await cyclotron.dequeueJobsWithVmState(queueName, limit) + const jobs: InternalJob[] = JSON.parse(jobsStr) + return jobs.map(convertInternalJobToJob) +} + +export async function flushJob(jobId: string): Promise { + return await cyclotron.flushJob(jobId) +} + +export function setState(jobId: string, jobState: JobState): Promise { + return cyclotron.setState(jobId, jobState) +} + +export function setQueue(jobId: string, queueName: string): Promise { + return cyclotron.setQueue(jobId, queueName) +} + +export function setPriority(jobId: string, priority: number): Promise { + return cyclotron.setPriority(jobId, priority) +} + +export function setScheduledAt(jobId: string, scheduledAt: Date): Promise { + return cyclotron.setScheduledAt(jobId, scheduledAt.toISOString()) +} + +function serializeObject(name: string, obj: Record | null): string | null { + if (obj === null) { + return null + } else if (typeof obj === 'object' && obj !== null) { + return JSON.stringify(obj) + } + throw new Error(`${name} must be either an object or null`) +} + +export function setVmState(jobId: string, vmState: Record | null): Promise { + const serialized = serializeObject('vmState', vmState) + return cyclotron.setVmState(jobId, serialized) +} + +export function setMetadata(jobId: string, metadata: Record | null): Promise { + const serialized = serializeObject('metadata', metadata) + return cyclotron.setMetadata(jobId, serialized) +} + +export function setParameters(jobId: string, parameters: Record | null): Promise { + const serialized = serializeObject('parameters', parameters) + return cyclotron.setParameters(jobId, serialized) +} diff --git a/rust/cyclotron-node/tsconfig.json b/rust/cyclotron-node/tsconfig.json new file mode 100644 index 0000000000000..4fa58397f068a --- /dev/null +++ b/rust/cyclotron-node/tsconfig.json @@ -0,0 +1,24 @@ +{ + "compilerOptions": { + "module": "CommonJS", + "target": "ESNext", + "declaration": true, + "removeComments": true, + "emitDecoratorMetadata": true, + "experimentalDecorators": true, + "moduleResolution": "node", + "esModuleInterop": true, + "allowJs": true, + "sourceMap": true, + "baseUrl": "src/", + "rootDir": "src/", + "outDir": "dist/", + "types": ["node"], + "resolveJsonModule": true, + "strict": true, + "noImplicitAny": true, + "useUnknownInCatchVariables": false + }, + "include": ["src"], + "exclude": ["node_modules", "dist", "bin"] +} From 10a49c28652e7f8dc36da08f4ecafa72af7f6311 Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 12:31:43 +0100 Subject: [PATCH 02/24] Fix up imports --- plugin-server/src/cdp/cdp-consumers.ts | 2 +- plugin-server/src/worker/cyclotron.ts | 239 ------------------------- 2 files changed, 1 insertion(+), 240 deletions(-) delete mode 100644 plugin-server/src/worker/cyclotron.ts diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 73b36c1a81f2c..537ba137e68b3 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -1,4 +1,5 @@ import { captureException } from '@sentry/node' +import cyclotron from 'cyclotron-node' import { features, librdkafkaVersion, Message } from 'node-rdkafka' import { Counter, Histogram } from 'prom-client' @@ -20,7 +21,6 @@ import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { captureTeamEvent } from '../utils/posthog' import { status } from '../utils/status' import { castTimestampOrNow } from '../utils/utils' -import * as cyclotron from '../worker/cyclotron' import { RustyHook } from '../worker/rusty-hook' import { AsyncFunctionExecutor } from './async-function-executor' import { GroupsManager } from './groups-manager' diff --git a/plugin-server/src/worker/cyclotron.ts b/plugin-server/src/worker/cyclotron.ts deleted file mode 100644 index 64c7c08cb014d..0000000000000 --- a/plugin-server/src/worker/cyclotron.ts +++ /dev/null @@ -1,239 +0,0 @@ -import cyclotron from 'cyclotron-node' - -interface PoolConfig { - dbUrl: string - maxConnections?: number - minConnections?: number - acquireTimeoutSeconds?: number - maxLifetimeSeconds?: number - idleTimeoutSeconds?: number -} - -// Type as expected by Cyclotron. -interface InternalPoolConfig { - db_url: string - max_connections?: number - min_connections?: number - acquire_timeout_seconds?: number - max_lifetime_seconds?: number - idle_timeout_seconds?: number -} - -interface ManagerConfig { - shards: PoolConfig[] -} - -// Type as expected by Cyclotron. -interface InternalManagerConfig { - shards: InternalPoolConfig[] -} - -interface JobInit { - teamId: number - functionId: string - queueName: string - priority?: number - scheduled?: Date - vmState?: string - parameters?: string - metadata?: string -} - -// Type as expected by Cyclotron. -interface InternalJobInit { - team_id: number - function_id: string - queue_name: string - priority?: number - scheduled?: Date - vm_state?: string - parameters?: string - metadata?: string -} - -type JobState = 'available' | 'running' | 'completed' | 'failed' | 'paused' - -interface Job { - id: string - teamId: number - functionId: string | null - created: Date - lockId: string | null - lastHeartbeat: Date | null - janitorTouchCount: number - transitionCount: number - lastTransition: Date - queueName: string - state: JobState - priority: number - scheduled: Date - vmState: string | null - metadata: string | null - parameters: string | null -} - -// Type as returned by Cyclotron. -interface InternalJob { - id: string - team_id: number - function_id: string | null - created: string - lock_id: string | null - last_heartbeat: string | null - janitor_touch_count: number - transition_count: number - last_transition: string - queue_name: string - state: JobState - priority: number - scheduled: string - vm_state: string | null - metadata: string | null - parameters: string | null -} - -export async function initWorker(poolConfig: PoolConfig) { - const initWorkerInternal: InternalPoolConfig = { - db_url: poolConfig.dbUrl, - max_connections: poolConfig.maxConnections, - min_connections: poolConfig.minConnections, - acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds, - max_lifetime_seconds: poolConfig.maxLifetimeSeconds, - idle_timeout_seconds: poolConfig.idleTimeoutSeconds, - } - return await cyclotron.initWorker(JSON.stringify(initWorkerInternal)) -} - -export async function initManager(managerConfig: ManagerConfig) { - const managerConfigInternal: InternalManagerConfig = { - shards: managerConfig.shards.map((shard) => ({ - db_url: shard.dbUrl, - max_connections: shard.maxConnections, - min_connections: shard.minConnections, - acquire_timeout_seconds: shard.acquireTimeoutSeconds, - max_lifetime_seconds: shard.maxLifetimeSeconds, - idle_timeout_seconds: shard.idleTimeoutSeconds, - })), - } - return await cyclotron.initManager(JSON.stringify(managerConfigInternal)) -} - -export async function maybeInitWorker(poolConfig: PoolConfig) { - const initWorkerInternal: InternalPoolConfig = { - db_url: poolConfig.dbUrl, - max_connections: poolConfig.maxConnections, - min_connections: poolConfig.minConnections, - acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds, - max_lifetime_seconds: poolConfig.maxLifetimeSeconds, - idle_timeout_seconds: poolConfig.idleTimeoutSeconds, - } - return await cyclotron.maybeInitWorker(JSON.stringify(initWorkerInternal)) -} - -export async function maybeInitManager(managerConfig: ManagerConfig) { - const managerConfigInternal: InternalManagerConfig = { - shards: managerConfig.shards.map((shard) => ({ - db_url: shard.dbUrl, - max_connections: shard.maxConnections, - min_connections: shard.minConnections, - acquire_timeout_seconds: shard.acquireTimeoutSeconds, - max_lifetime_seconds: shard.maxLifetimeSeconds, - idle_timeout_seconds: shard.idleTimeoutSeconds, - })), - } - return await cyclotron.maybeInitManager(JSON.stringify(managerConfigInternal)) -} - -export async function createJob(job: JobInit) { - job.priority ??= 1 - job.scheduled ??= new Date() - - const jobInitInternal: InternalJobInit = { - team_id: job.teamId, - function_id: job.functionId, - queue_name: job.queueName, - priority: job.priority, - scheduled: job.scheduled, - vm_state: job.vmState, - parameters: job.parameters, - metadata: job.metadata, - } - return await cyclotron.createJob(JSON.stringify(jobInitInternal)) -} - -function convertInternalJobToJob(jobInternal: InternalJob): Job { - return { - id: jobInternal.id, - teamId: jobInternal.team_id, - functionId: jobInternal.function_id, - created: new Date(jobInternal.created), - lockId: jobInternal.lock_id, - lastHeartbeat: jobInternal.last_heartbeat ? new Date(jobInternal.last_heartbeat) : null, - janitorTouchCount: jobInternal.janitor_touch_count, - transitionCount: jobInternal.transition_count, - lastTransition: new Date(jobInternal.last_transition), - queueName: jobInternal.queue_name, - state: jobInternal.state, - priority: jobInternal.priority, - scheduled: new Date(jobInternal.scheduled), - vmState: jobInternal.vm_state, - metadata: jobInternal.metadata, - parameters: jobInternal.parameters, - } -} - -export async function dequeueJobs(queueName: string, limit: number): Promise { - const jobsStr = await cyclotron.dequeueJobs(queueName, limit) - const jobs: InternalJob[] = JSON.parse(jobsStr) - return jobs.map(convertInternalJobToJob) -} -export async function dequeueJobsWithVmState(queueName: string, limit: number): Promise { - const jobsStr = await cyclotron.dequeueJobsWithVmState(queueName, limit) - const jobs: InternalJob[] = JSON.parse(jobsStr) - return jobs.map(convertInternalJobToJob) -} - -export async function flushJob(jobId: string) { - return await cyclotron.flushJob(jobId) -} - -export function setState(jobId: string, jobState: JobState) { - return cyclotron.setState(jobId, jobState) -} - -export function setQueue(jobId: string, queueName: string) { - return cyclotron.setQueue(jobId, queueName) -} - -export function setPriority(jobId: string, priority: number) { - return cyclotron.setPriority(jobId, priority) -} - -export function setScheduledAt(jobId: string, scheduledAt: Date) { - return cyclotron.setScheduledAt(jobId, scheduledAt.toISOString()) -} - -function serializeObject(name: string, obj: Record | null): string | null { - if (obj === null) { - return null - } else if (typeof obj === 'object' && obj !== null) { - return JSON.stringify(obj) - } else { - throw new Error(`${name} must be either an object or null`) - } -} - -export function setVmState(jobId: string, vmState: Record | null) { - const serialized = serializeObject('vmState', vmState) - return cyclotron.setVmState(jobId, serialized) -} - -export function setMetadata(jobId: string, metadata: Record | null) { - const serialized = serializeObject('metadata', metadata) - return cyclotron.setMetadata(jobId, serialized) -} - -export function setParameters(jobId: string, parameters: Record | null) { - const serialized = serializeObject('parameters', parameters) - return cyclotron.setParameters(jobId, serialized) -} From f6460ca045f956236982b547739cbe4c944bc702 Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 12:38:57 +0100 Subject: [PATCH 03/24] Fixes --- rust/cyclotron-node/package-lock.json | 16 ++++++++++++++++ rust/cyclotron-node/package.json | 1 + 2 files changed, 17 insertions(+) diff --git a/rust/cyclotron-node/package-lock.json b/rust/cyclotron-node/package-lock.json index 65b37a4302757..50dbfb1716650 100644 --- a/rust/cyclotron-node/package-lock.json +++ b/rust/cyclotron-node/package-lock.json @@ -10,6 +10,7 @@ "license": "MIT", "devDependencies": { "@neon-rs/cli": "0.1.73", + "@types/node": "^22.4.1", "typescript": "^4.7.4" } }, @@ -122,6 +123,15 @@ "@cargo-messages/win32-x64-msvc": "0.1.72" } }, + "node_modules/@types/node": { + "version": "22.4.1", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.4.1.tgz", + "integrity": "sha512-1tbpb9325+gPnKK0dMm+/LMriX0vKxf6RnB0SZUqfyVkQ4fMgUSySqhxE/y8Jvs4NyF1yHzTfG9KlnkIODxPKg==", + "dev": true, + "dependencies": { + "undici-types": "~6.19.2" + } + }, "node_modules/typescript": { "version": "4.9.5", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", @@ -134,6 +144,12 @@ "engines": { "node": ">=4.2.0" } + }, + "node_modules/undici-types": { + "version": "6.19.8", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz", + "integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==", + "dev": true } } } diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 4918fe910544e..3c5e69c972b2a 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -16,6 +16,7 @@ "license": "MIT", "devDependencies": { "@neon-rs/cli": "0.1.73", + "@types/node": "^22.4.1", "typescript": "^4.7.4" }, "files": [ From 48313cdf9693266bac1432cee8ae4bd2330cff5a Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 14:07:40 +0100 Subject: [PATCH 04/24] Moved inits around --- bin/plugin-server | 7 +++---- plugin-server/package.json | 2 +- plugin-server/src/cdp/async-function-executor.ts | 2 +- plugin-server/src/cdp/cdp-consumers.ts | 11 ++++++++++- plugin-server/src/main/pluginsServer.ts | 13 ------------- 5 files changed, 15 insertions(+), 20 deletions(-) diff --git a/bin/plugin-server b/bin/plugin-server index 8107c6c756cdb..9206584539ffa 100755 --- a/bin/plugin-server +++ b/bin/plugin-server @@ -34,10 +34,6 @@ fi ./bin/migrate-check -if [[ -n $DEBUG ]]; then - ./bin/install-cyclotron-node -fi - cd plugin-server if [[ -n $DEBUG ]]; then @@ -51,6 +47,9 @@ if [ $? -ne 0 ]; then fi if [[ -n $DEBUG ]]; then + + pnpm build:cyclotron + if [[ -n $NO_WATCH ]]; then cmd="pnpm start:devNoWatch" else diff --git a/plugin-server/package.json b/plugin-server/package.json index 0ff780ff4310f..62c9feea8e0f9 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -28,7 +28,7 @@ "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down", "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", "services": "pnpm services:stop && pnpm services:clean && pnpm services:start", - "preinstall": "cd ../rust/cyclotron-node && npm i && npm run build" + "build:cyclotron": "cd ../rust/cyclotron-node && npm i && npm run build" }, "graphile-worker": { "maxContiguousErrors": 300 diff --git a/plugin-server/src/cdp/async-function-executor.ts b/plugin-server/src/cdp/async-function-executor.ts index bde37adf09787..58d750627b2b0 100644 --- a/plugin-server/src/cdp/async-function-executor.ts +++ b/plugin-server/src/cdp/async-function-executor.ts @@ -1,10 +1,10 @@ +import cyclotron from 'cyclotron-node' import { Histogram } from 'prom-client' import { buildIntegerMatcher } from '../config/config' import { PluginsServerConfig, ValueMatcher } from '../types' import { trackedFetch } from '../utils/fetch' import { status } from '../utils/status' -import * as cyclotron from '../worker/cyclotron' import { RustyHook } from '../worker/rusty-hook' import { HogFunctionInvocationAsyncRequest, HogFunctionInvocationAsyncResponse } from './types' diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 537ba137e68b3..6f5c6e572eba8 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -444,7 +444,12 @@ abstract class CdpConsumerBase { const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.hub) const globalProducerConfig = createRdProducerConfigFromEnvVars(this.hub) - await Promise.all([this.hogFunctionManager.start()]) + await Promise.all([ + this.hogFunctionManager.start(), + this.hub.CYCLOTRON_DATABASE_URL + ? cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) + : Promise.resolve(), + ]) this.kafkaProducer = new KafkaProducerWrapper( await createKafkaProducer(globalConnectionConfig, globalProducerConfig) @@ -729,9 +734,13 @@ export class CdpCyclotronWorker extends CdpConsumerBase { } public async start() { + await cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) + await cyclotron.initWorker({ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }) + // Consumer `start` expects an async task is started, and not that `start` itself blocks // indefinitely. this.runningWorker = this.innerStart() + return Promise.resolve() } diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 1f062cf64aa9a..3a7e8851774a4 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -26,7 +26,6 @@ import { posthog } from '../utils/posthog' import { PubSub } from '../utils/pubsub' import { status } from '../utils/status' import { createRedisClient, delay } from '../utils/utils' -import * as cyclotron from '../worker/cyclotron' import { ActionManager } from '../worker/ingestion/action-manager' import { ActionMatcher } from '../worker/ingestion/action-matcher' import { AppMetrics } from '../worker/ingestion/app-metrics' @@ -542,9 +541,6 @@ export async function startPluginsServer( if (capabilities.cdpProcessedEvents) { ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - if (hub.CYCLOTRON_DATABASE_URL) { - await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] }) - } const consumer = new CdpProcessedEventsConsumer(hub) await consumer.start() @@ -555,9 +551,6 @@ export async function startPluginsServer( if (capabilities.cdpFunctionCallbacks) { ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - if (hub.CYCLOTRON_DATABASE_URL) { - await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] }) - } const consumer = new CdpFunctionCallbackConsumer(hub) await consumer.start() @@ -575,9 +568,6 @@ export async function startPluginsServer( if (capabilities.cdpFunctionOverflow) { ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - if (hub.CYCLOTRON_DATABASE_URL) { - await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] }) - } const consumer = new CdpOverflowConsumer(hub) await consumer.start() @@ -589,9 +579,6 @@ export async function startPluginsServer( if (capabilities.cdpCyclotronWorker) { ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) if (hub.CYCLOTRON_DATABASE_URL) { - await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] }) - await cyclotron.initWorker({ dbUrl: hub.CYCLOTRON_DATABASE_URL }) - const worker = new CdpCyclotronWorker(hub) await worker.start() } else { From 4005cb94f6667404e6ff54e5300ed81d5a3bf537 Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 16:11:08 +0100 Subject: [PATCH 05/24] Fix --- plugin-server/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/package.json b/plugin-server/package.json index 62c9feea8e0f9..0158926e5849b 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -23,7 +23,7 @@ "prettier:check": "prettier --check .", "prepublishOnly": "pnpm build", "setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse", - "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment", + "setup:test": "npm run build:cyclotron && cd .. && TEST=1 python manage.py setup_test_environment", "services:start": "cd .. && docker compose -f docker-compose.dev.yml up", "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down", "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", From de2a14f98d5deaf27c24906fb60851abad414df7 Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 16:25:33 +0100 Subject: [PATCH 06/24] Fixes --- rust/cyclotron-node/package.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 3c5e69c972b2a..6425c6fe485bd 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -6,11 +6,12 @@ "types": "dist/index.d.ts", "scripts": { "test": "cargo test", - "build": "npm run build:cargo -- --release && npm run build:lib", + "build": "npm run build:cargo -- --release && npm run build:move-lib && npm run build:typescript", + "build:move-lib": "cp ../target/release/libcyclotron_node.so index.node || cp ../target/release/libcyclotron_node.dylib index.node", "build:cargo": "cargo build --message-format=json > cargo.log", "build:cargo:debug": "npm run build:cargo --", "build:cross": "cross build --message-format=json > cross.log", - "build:lib": "tsc" + "build:typescript": "tsc" }, "author": "", "license": "MIT", From 3d98db7b529dfe84e989104121ee42fbd614b37f Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 16:39:01 +0100 Subject: [PATCH 07/24] Fixes --- plugin-server/package.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugin-server/package.json b/plugin-server/package.json index 0158926e5849b..6bff9fca62470 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -87,7 +87,8 @@ "uuid": "^9.0.1", "v8-profiler-next": "^1.9.0", "vm2": "3.9.18", - "detect-browser": "^5.3.0" + "detect-browser": "^5.3.0", + "cyclotron-node": "file:../rust/cyclotron-node" }, "devDependencies": { "0x": "^5.5.0", @@ -135,8 +136,7 @@ "prettier": "^2.8.8", "supertest": "^7.0.0", "ts-node": "^10.9.1", - "typescript": "^4.7.4", - "cyclotron-node": "file:../rust/cyclotron-node" + "typescript": "^4.7.4" }, "pnpm": { "patchedDependencies": { From d38baffbdb737a594824a47454d95fde9d9a69b8 Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 16:39:31 +0100 Subject: [PATCH 08/24] Fix --- plugin-server/pnpm-lock.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index 180dc1bee4ea4..6982a913eea99 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -73,6 +73,9 @@ dependencies: aws-sdk: specifier: ^2.927.0 version: 2.1366.0 + cyclotron-node: + specifier: file:../rust/cyclotron-node + version: file:../rust/cyclotron-node detect-browser: specifier: ^5.3.0 version: 5.3.0 @@ -243,9 +246,6 @@ devDependencies: c8: specifier: ^7.12.0 version: 7.13.0 - cyclotron-node: - specifier: file:../rust/cyclotron-node - version: file:../rust/cyclotron-node deepmerge: specifier: ^4.2.2 version: 4.3.1 @@ -10738,4 +10738,4 @@ packages: file:../rust/cyclotron-node: resolution: {directory: ../rust/cyclotron-node, type: directory} name: cyclotron-node - dev: true + dev: false From 281c6a80d37b02c8ed970f5707d4fa6d4fa4ccad Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 16:57:02 +0100 Subject: [PATCH 09/24] Fix up install logic --- plugin-server/package.json | 4 +- production.Dockerfile | 14 +-- rust/cyclotron-node/package-lock.json | 155 -------------------------- rust/cyclotron-node/package.json | 4 +- rust/cyclotron-node/pnpm-lock.yaml | 103 +++++++++++++++++ 5 files changed, 108 insertions(+), 172 deletions(-) delete mode 100644 rust/cyclotron-node/package-lock.json create mode 100644 rust/cyclotron-node/pnpm-lock.yaml diff --git a/plugin-server/package.json b/plugin-server/package.json index 6bff9fca62470..3ec7cbdb058a9 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -23,12 +23,12 @@ "prettier:check": "prettier --check .", "prepublishOnly": "pnpm build", "setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse", - "setup:test": "npm run build:cyclotron && cd .. && TEST=1 python manage.py setup_test_environment", "services:start": "cd .. && docker compose -f docker-compose.dev.yml up", "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down", "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", "services": "pnpm services:stop && pnpm services:clean && pnpm services:start", - "build:cyclotron": "cd ../rust/cyclotron-node && npm i && npm run build" + "build:cyclotron": "cd ../rust/cyclotron-node && pnpm i && pnpm run build", + "preinstall": "pnpm run build:cyclotron" }, "graphile-worker": { "maxContiguousErrors": 300 diff --git a/production.Dockerfile b/production.Dockerfile index 248cbaa2a7ae0..07906afd9bc4c 100644 --- a/production.Dockerfile +++ b/production.Dockerfile @@ -38,27 +38,15 @@ COPY ./bin/ ./bin/ COPY babel.config.js tsconfig.json webpack.config.js tailwind.config.js ./ RUN pnpm build - # # --------------------------------------------------------- # -FROM ghcr.io/posthog/rust-node-container:bullseye_rust_1.80.1-node_18.19.1 AS cyclotron-node-build +FROM ghcr.io/posthog/rust-node-container:bullseye_rust_1.80.1-node_18.19.1 AS plugin-server-build WORKDIR /code COPY ./rust ./rust -RUN cd rust/cyclotron-node && \ - npm install && npm run build - - -# -# --------------------------------------------------------- -# -FROM node:18.19.1-bullseye-slim AS plugin-server-build WORKDIR /code/plugin-server SHELL ["/bin/bash", "-e", "-o", "pipefail", "-c"] -# Copy the Rust cyclotron-node module. -COPY --from=cyclotron-node-build --chown=posthog:posthog /code/rust/cyclotron-node /code/rust/cyclotron-node - # Compile and install Node.js dependencies. COPY ./plugin-server/package.json ./plugin-server/pnpm-lock.yaml ./plugin-server/tsconfig.json ./ COPY ./plugin-server/patches/ ./patches/ diff --git a/rust/cyclotron-node/package-lock.json b/rust/cyclotron-node/package-lock.json deleted file mode 100644 index 50dbfb1716650..0000000000000 --- a/rust/cyclotron-node/package-lock.json +++ /dev/null @@ -1,155 +0,0 @@ -{ - "name": "cyclotron-node", - "version": "0.1.0", - "lockfileVersion": 3, - "requires": true, - "packages": { - "": { - "name": "cyclotron-node", - "version": "0.1.0", - "license": "MIT", - "devDependencies": { - "@neon-rs/cli": "0.1.73", - "@types/node": "^22.4.1", - "typescript": "^4.7.4" - } - }, - "node_modules/@cargo-messages/android-arm-eabi": { - "version": "0.1.72", - "resolved": "https://registry.npmjs.org/@cargo-messages/android-arm-eabi/-/android-arm-eabi-0.1.72.tgz", - "integrity": "sha512-gGZxIM1mj+Y5x+ULND6ZCNr7f70OJi9wDlycSK8hGONy9wrChN6JAIHryddC5cqcwlYAoQ6IDcDFElnhAYbybA==", - "cpu": [ - "arm" - ], - "dev": true, - "optional": true, - "os": [ - "android" - ] - }, - "node_modules/@cargo-messages/darwin-arm64": { - "version": "0.1.72", - "resolved": "https://registry.npmjs.org/@cargo-messages/darwin-arm64/-/darwin-arm64-0.1.72.tgz", - "integrity": "sha512-EAzN5MLaXPljZKZDO5qR+aBs44eSq2ZbEnS7AI/FziE3MzeXbrGOS3fLba5+7yWPFXJyZolXzePm8N1EBv8ovg==", - "cpu": [ - "arm64" - ], - "dev": true, - "optional": true, - "os": [ - "darwin" - ] - }, - "node_modules/@cargo-messages/darwin-x64": { - "version": "0.1.72", - "resolved": "https://registry.npmjs.org/@cargo-messages/darwin-x64/-/darwin-x64-0.1.72.tgz", - "integrity": "sha512-RLo6j8s3nYbjdd1LDct4wamfChyRit7zokUuxtIYCu9XOlltkN5vnj1vwnrPvoqCMZ/7CbbuHFwSTn9A71de/w==", - "cpu": [ - "x64" - ], - "dev": true, - "optional": true, - "os": [ - "darwin" - ] - }, - "node_modules/@cargo-messages/linux-arm-gnueabihf": { - "version": "0.1.72", - "resolved": "https://registry.npmjs.org/@cargo-messages/linux-arm-gnueabihf/-/linux-arm-gnueabihf-0.1.72.tgz", - "integrity": "sha512-tHsRshuzfjrX6SDW3jg6al8vMNLTMgczGnVYl5RuBZf/yrAUuwe30KxA9ge6w6mW6Ox797DyBchzAc9OLgTgmQ==", - "cpu": [ - "arm" - ], - "dev": true, - "optional": true, - "os": [ - "linux" - ] - }, - "node_modules/@cargo-messages/linux-x64-gnu": { - "version": "0.1.72", - "resolved": "https://registry.npmjs.org/@cargo-messages/linux-x64-gnu/-/linux-x64-gnu-0.1.72.tgz", - "integrity": "sha512-VGtL6CCnUbhsP4aYuBNT5kfrAL7o0qjrxw97a+ax13t+nJd26tVEEIKHMu5drvvS/Nm/hn7sLT8zMnnCv0pvHg==", - "cpu": [ - "x64" - ], - "dev": true, - "optional": true, - "os": [ - "linux" - ] - }, - "node_modules/@cargo-messages/win32-arm64-msvc": { - "version": "0.1.72", - "resolved": "https://registry.npmjs.org/@cargo-messages/win32-arm64-msvc/-/win32-arm64-msvc-0.1.72.tgz", - "integrity": "sha512-V93Cgz39K+yqa3MveNbhh29pYCp8izK5uEavjPoxlNxAbsMCWH+s0verGDdUcfGxjR1H2V7oZ4FszPqR2SqMRQ==", - "cpu": [ - "arm64" - ], - "dev": true, - "optional": true, - "os": [ - "win32" - ] - }, - "node_modules/@cargo-messages/win32-x64-msvc": { - "version": "0.1.72", - "resolved": "https://registry.npmjs.org/@cargo-messages/win32-x64-msvc/-/win32-x64-msvc-0.1.72.tgz", - "integrity": "sha512-knz3uSrO0OSbq3U5VWfCY8FB4NsM43BOWLZ7x4sfaMOC1XWv+IyvDdkLe6DhJx8KUw46KIAimYs9YROrp6l46Q==", - "cpu": [ - "x64" - ], - "dev": true, - "optional": true, - "os": [ - "win32" - ] - }, - "node_modules/@neon-rs/cli": { - "version": "0.1.73", - "resolved": "https://registry.npmjs.org/@neon-rs/cli/-/cli-0.1.73.tgz", - "integrity": "sha512-1kv8S/feB6UQWQQwsnGfkSkEBOtlFDNExnioL81E2BwvUWgjQPaseHgpi2EpWVgsPUgur5eBm4QowmlpWkD4/w==", - "dev": true, - "bin": { - "neon": "index.js" - }, - "optionalDependencies": { - "@cargo-messages/android-arm-eabi": "0.1.72", - "@cargo-messages/darwin-arm64": "0.1.72", - "@cargo-messages/darwin-x64": "0.1.72", - "@cargo-messages/linux-arm-gnueabihf": "0.1.72", - "@cargo-messages/linux-x64-gnu": "0.1.72", - "@cargo-messages/win32-arm64-msvc": "0.1.72", - "@cargo-messages/win32-x64-msvc": "0.1.72" - } - }, - "node_modules/@types/node": { - "version": "22.4.1", - "resolved": "https://registry.npmjs.org/@types/node/-/node-22.4.1.tgz", - "integrity": "sha512-1tbpb9325+gPnKK0dMm+/LMriX0vKxf6RnB0SZUqfyVkQ4fMgUSySqhxE/y8Jvs4NyF1yHzTfG9KlnkIODxPKg==", - "dev": true, - "dependencies": { - "undici-types": "~6.19.2" - } - }, - "node_modules/typescript": { - "version": "4.9.5", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", - "integrity": "sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==", - "dev": true, - "bin": { - "tsc": "bin/tsc", - "tsserver": "bin/tsserver" - }, - "engines": { - "node": ">=4.2.0" - } - }, - "node_modules/undici-types": { - "version": "6.19.8", - "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz", - "integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==", - "dev": true - } - } -} diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 6425c6fe485bd..0bfcfdd9d9bc1 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -6,10 +6,10 @@ "types": "dist/index.d.ts", "scripts": { "test": "cargo test", - "build": "npm run build:cargo -- --release && npm run build:move-lib && npm run build:typescript", + "build": "pnpm run build:cargo --release && pnpm run build:move-lib && pnpm run build:typescript", "build:move-lib": "cp ../target/release/libcyclotron_node.so index.node || cp ../target/release/libcyclotron_node.dylib index.node", "build:cargo": "cargo build --message-format=json > cargo.log", - "build:cargo:debug": "npm run build:cargo --", + "build:cargo:debug": "pnpm run build:cargo", "build:cross": "cross build --message-format=json > cross.log", "build:typescript": "tsc" }, diff --git a/rust/cyclotron-node/pnpm-lock.yaml b/rust/cyclotron-node/pnpm-lock.yaml new file mode 100644 index 0000000000000..7e73d417ac098 --- /dev/null +++ b/rust/cyclotron-node/pnpm-lock.yaml @@ -0,0 +1,103 @@ +lockfileVersion: '6.0' + +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false + +devDependencies: + '@neon-rs/cli': + specifier: 0.1.73 + version: 0.1.73 + '@types/node': + specifier: ^22.4.1 + version: 22.4.1 + typescript: + specifier: ^4.7.4 + version: 4.9.5 + +packages: + + /@cargo-messages/android-arm-eabi@0.1.72: + resolution: {integrity: sha512-gGZxIM1mj+Y5x+ULND6ZCNr7f70OJi9wDlycSK8hGONy9wrChN6JAIHryddC5cqcwlYAoQ6IDcDFElnhAYbybA==} + cpu: [arm] + os: [android] + requiresBuild: true + dev: true + optional: true + + /@cargo-messages/darwin-arm64@0.1.72: + resolution: {integrity: sha512-EAzN5MLaXPljZKZDO5qR+aBs44eSq2ZbEnS7AI/FziE3MzeXbrGOS3fLba5+7yWPFXJyZolXzePm8N1EBv8ovg==} + cpu: [arm64] + os: [darwin] + requiresBuild: true + dev: true + optional: true + + /@cargo-messages/darwin-x64@0.1.72: + resolution: {integrity: sha512-RLo6j8s3nYbjdd1LDct4wamfChyRit7zokUuxtIYCu9XOlltkN5vnj1vwnrPvoqCMZ/7CbbuHFwSTn9A71de/w==} + cpu: [x64] + os: [darwin] + requiresBuild: true + dev: true + optional: true + + /@cargo-messages/linux-arm-gnueabihf@0.1.72: + resolution: {integrity: sha512-tHsRshuzfjrX6SDW3jg6al8vMNLTMgczGnVYl5RuBZf/yrAUuwe30KxA9ge6w6mW6Ox797DyBchzAc9OLgTgmQ==} + cpu: [arm] + os: [linux] + requiresBuild: true + dev: true + optional: true + + /@cargo-messages/linux-x64-gnu@0.1.72: + resolution: {integrity: sha512-VGtL6CCnUbhsP4aYuBNT5kfrAL7o0qjrxw97a+ax13t+nJd26tVEEIKHMu5drvvS/Nm/hn7sLT8zMnnCv0pvHg==} + cpu: [x64] + os: [linux] + requiresBuild: true + dev: true + optional: true + + /@cargo-messages/win32-arm64-msvc@0.1.72: + resolution: {integrity: sha512-V93Cgz39K+yqa3MveNbhh29pYCp8izK5uEavjPoxlNxAbsMCWH+s0verGDdUcfGxjR1H2V7oZ4FszPqR2SqMRQ==} + cpu: [arm64] + os: [win32] + requiresBuild: true + dev: true + optional: true + + /@cargo-messages/win32-x64-msvc@0.1.72: + resolution: {integrity: sha512-knz3uSrO0OSbq3U5VWfCY8FB4NsM43BOWLZ7x4sfaMOC1XWv+IyvDdkLe6DhJx8KUw46KIAimYs9YROrp6l46Q==} + cpu: [x64] + os: [win32] + requiresBuild: true + dev: true + optional: true + + /@neon-rs/cli@0.1.73: + resolution: {integrity: sha512-1kv8S/feB6UQWQQwsnGfkSkEBOtlFDNExnioL81E2BwvUWgjQPaseHgpi2EpWVgsPUgur5eBm4QowmlpWkD4/w==} + hasBin: true + optionalDependencies: + '@cargo-messages/android-arm-eabi': 0.1.72 + '@cargo-messages/darwin-arm64': 0.1.72 + '@cargo-messages/darwin-x64': 0.1.72 + '@cargo-messages/linux-arm-gnueabihf': 0.1.72 + '@cargo-messages/linux-x64-gnu': 0.1.72 + '@cargo-messages/win32-arm64-msvc': 0.1.72 + '@cargo-messages/win32-x64-msvc': 0.1.72 + dev: true + + /@types/node@22.4.1: + resolution: {integrity: sha512-1tbpb9325+gPnKK0dMm+/LMriX0vKxf6RnB0SZUqfyVkQ4fMgUSySqhxE/y8Jvs4NyF1yHzTfG9KlnkIODxPKg==} + dependencies: + undici-types: 6.19.8 + dev: true + + /typescript@4.9.5: + resolution: {integrity: sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==} + engines: {node: '>=4.2.0'} + hasBin: true + dev: true + + /undici-types@6.19.8: + resolution: {integrity: sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==} + dev: true From ae9cbba7084c128765ad1431afa420db0c4e41f4 Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 17:08:35 +0100 Subject: [PATCH 10/24] Fix --- plugin-server/package.json | 1 + rust/cyclotron-node/package.json | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin-server/package.json b/plugin-server/package.json index 3ec7cbdb058a9..559de97edb9c0 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -23,6 +23,7 @@ "prettier:check": "prettier --check .", "prepublishOnly": "pnpm build", "setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse", + "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment", "services:start": "cd .. && docker compose -f docker-compose.dev.yml up", "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down", "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 0bfcfdd9d9bc1..8061300b0660e 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -7,7 +7,7 @@ "scripts": { "test": "cargo test", "build": "pnpm run build:cargo --release && pnpm run build:move-lib && pnpm run build:typescript", - "build:move-lib": "cp ../target/release/libcyclotron_node.so index.node || cp ../target/release/libcyclotron_node.dylib index.node", + "build:move-lib": "cp ../target/release/libcyclotron_node.dylib index.node || cp ../target/release/libcyclotron_node.so index.node", "build:cargo": "cargo build --message-format=json > cargo.log", "build:cargo:debug": "pnpm run build:cargo", "build:cross": "cross build --message-format=json > cross.log", From 03899ad236ebdaac3a0835428a9cae65ff2fab1d Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 17:09:12 +0100 Subject: [PATCH 11/24] Fix --- bin/install-cyclotron-node | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100755 bin/install-cyclotron-node diff --git a/bin/install-cyclotron-node b/bin/install-cyclotron-node deleted file mode 100755 index 51bee921d499c..0000000000000 --- a/bin/install-cyclotron-node +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash -ex - -# Builds the Cyclotron node module and installs it into plugin-server/node-modules/cyclotron - -if ! command -v rustc &> /dev/null || ! command -v cargo &> /dev/null; then - echo "Rust and Cargo need to be installed, please 'brew install rust' or equivalent" -fi - -cd rust/cyclotron-node - -npm install -npm run build - -cd ../../plugin-server - -mkdir -p node_modules/cyclotron - -cp ../rust/cyclotron-node/index.node node_modules/cyclotron/ From 12ab08051beda0af99ea2661c7024069d559693e Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 17:17:45 +0100 Subject: [PATCH 12/24] Fix remove the build step --- bin/plugin-server | 3 --- 1 file changed, 3 deletions(-) diff --git a/bin/plugin-server b/bin/plugin-server index 9206584539ffa..157e7896fc90b 100755 --- a/bin/plugin-server +++ b/bin/plugin-server @@ -47,9 +47,6 @@ if [ $? -ne 0 ]; then fi if [[ -n $DEBUG ]]; then - - pnpm build:cyclotron - if [[ -n $NO_WATCH ]]; then cmd="pnpm start:devNoWatch" else From 37d6a55c0f4fa01c6468a9c4fff941b6db0f84b7 Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 17:19:42 +0100 Subject: [PATCH 13/24] Fixes --- .github/workflows/ci-plugin-server.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci-plugin-server.yml b/.github/workflows/ci-plugin-server.yml index a24eaf53d4e69..520f60f17cc74 100644 --- a/.github/workflows/ci-plugin-server.yml +++ b/.github/workflows/ci-plugin-server.yml @@ -234,6 +234,8 @@ jobs: run: | cd plugin-server pnpm install --frozen-lockfile + ls -al node_modules/ + la -al node_modules/cyclotron-node/ pnpm build - name: Wait for Clickhouse, Redis & Kafka From 81af2ebad5f1e9c4b60ebe259b94430ce3631764 Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 17:26:34 +0100 Subject: [PATCH 14/24] Fix --- .github/workflows/ci-plugin-server.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-plugin-server.yml b/.github/workflows/ci-plugin-server.yml index 520f60f17cc74..7978d81de9c61 100644 --- a/.github/workflows/ci-plugin-server.yml +++ b/.github/workflows/ci-plugin-server.yml @@ -235,7 +235,7 @@ jobs: cd plugin-server pnpm install --frozen-lockfile ls -al node_modules/ - la -al node_modules/cyclotron-node/ + ls -al node_modules/cyclotron-node/ pnpm build - name: Wait for Clickhouse, Redis & Kafka From d6c6c5f2758a2e4a8a90367401d4ba14e6b496ea Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 20 Aug 2024 18:36:51 +0100 Subject: [PATCH 15/24] fuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu --- plugin-server/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/package.json b/plugin-server/package.json index 559de97edb9c0..f38a64b03a689 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -29,7 +29,7 @@ "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", "services": "pnpm services:stop && pnpm services:clean && pnpm services:start", "build:cyclotron": "cd ../rust/cyclotron-node && pnpm i && pnpm run build", - "preinstall": "pnpm run build:cyclotron" + "pnpm:devPreinstall": "pnpm run build:cyclotron" }, "graphile-worker": { "maxContiguousErrors": 300 From a1ea74439e948ef296237b0e88a94f84a647d9ba Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Tue, 20 Aug 2024 14:14:42 -0600 Subject: [PATCH 16/24] remove neon-cli, since we aren't using it --- rust/cyclotron-node/package.json | 1 - rust/cyclotron-node/pnpm-lock.yaml | 74 +----------------------------- 2 files changed, 1 insertion(+), 74 deletions(-) diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 8061300b0660e..abb730b4c91a8 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -16,7 +16,6 @@ "author": "", "license": "MIT", "devDependencies": { - "@neon-rs/cli": "0.1.73", "@types/node": "^22.4.1", "typescript": "^4.7.4" }, diff --git a/rust/cyclotron-node/pnpm-lock.yaml b/rust/cyclotron-node/pnpm-lock.yaml index 7e73d417ac098..f21814fcc1099 100644 --- a/rust/cyclotron-node/pnpm-lock.yaml +++ b/rust/cyclotron-node/pnpm-lock.yaml @@ -1,13 +1,10 @@ -lockfileVersion: '6.0' +lockfileVersion: '6.1' settings: autoInstallPeers: true excludeLinksFromLockfile: false devDependencies: - '@neon-rs/cli': - specifier: 0.1.73 - version: 0.1.73 '@types/node': specifier: ^22.4.1 version: 22.4.1 @@ -17,75 +14,6 @@ devDependencies: packages: - /@cargo-messages/android-arm-eabi@0.1.72: - resolution: {integrity: sha512-gGZxIM1mj+Y5x+ULND6ZCNr7f70OJi9wDlycSK8hGONy9wrChN6JAIHryddC5cqcwlYAoQ6IDcDFElnhAYbybA==} - cpu: [arm] - os: [android] - requiresBuild: true - dev: true - optional: true - - /@cargo-messages/darwin-arm64@0.1.72: - resolution: {integrity: sha512-EAzN5MLaXPljZKZDO5qR+aBs44eSq2ZbEnS7AI/FziE3MzeXbrGOS3fLba5+7yWPFXJyZolXzePm8N1EBv8ovg==} - cpu: [arm64] - os: [darwin] - requiresBuild: true - dev: true - optional: true - - /@cargo-messages/darwin-x64@0.1.72: - resolution: {integrity: sha512-RLo6j8s3nYbjdd1LDct4wamfChyRit7zokUuxtIYCu9XOlltkN5vnj1vwnrPvoqCMZ/7CbbuHFwSTn9A71de/w==} - cpu: [x64] - os: [darwin] - requiresBuild: true - dev: true - optional: true - - /@cargo-messages/linux-arm-gnueabihf@0.1.72: - resolution: {integrity: sha512-tHsRshuzfjrX6SDW3jg6al8vMNLTMgczGnVYl5RuBZf/yrAUuwe30KxA9ge6w6mW6Ox797DyBchzAc9OLgTgmQ==} - cpu: [arm] - os: [linux] - requiresBuild: true - dev: true - optional: true - - /@cargo-messages/linux-x64-gnu@0.1.72: - resolution: {integrity: sha512-VGtL6CCnUbhsP4aYuBNT5kfrAL7o0qjrxw97a+ax13t+nJd26tVEEIKHMu5drvvS/Nm/hn7sLT8zMnnCv0pvHg==} - cpu: [x64] - os: [linux] - requiresBuild: true - dev: true - optional: true - - /@cargo-messages/win32-arm64-msvc@0.1.72: - resolution: {integrity: sha512-V93Cgz39K+yqa3MveNbhh29pYCp8izK5uEavjPoxlNxAbsMCWH+s0verGDdUcfGxjR1H2V7oZ4FszPqR2SqMRQ==} - cpu: [arm64] - os: [win32] - requiresBuild: true - dev: true - optional: true - - /@cargo-messages/win32-x64-msvc@0.1.72: - resolution: {integrity: sha512-knz3uSrO0OSbq3U5VWfCY8FB4NsM43BOWLZ7x4sfaMOC1XWv+IyvDdkLe6DhJx8KUw46KIAimYs9YROrp6l46Q==} - cpu: [x64] - os: [win32] - requiresBuild: true - dev: true - optional: true - - /@neon-rs/cli@0.1.73: - resolution: {integrity: sha512-1kv8S/feB6UQWQQwsnGfkSkEBOtlFDNExnioL81E2BwvUWgjQPaseHgpi2EpWVgsPUgur5eBm4QowmlpWkD4/w==} - hasBin: true - optionalDependencies: - '@cargo-messages/android-arm-eabi': 0.1.72 - '@cargo-messages/darwin-arm64': 0.1.72 - '@cargo-messages/darwin-x64': 0.1.72 - '@cargo-messages/linux-arm-gnueabihf': 0.1.72 - '@cargo-messages/linux-x64-gnu': 0.1.72 - '@cargo-messages/win32-arm64-msvc': 0.1.72 - '@cargo-messages/win32-x64-msvc': 0.1.72 - dev: true - /@types/node@22.4.1: resolution: {integrity: sha512-1tbpb9325+gPnKK0dMm+/LMriX0vKxf6RnB0SZUqfyVkQ4fMgUSySqhxE/y8Jvs4NyF1yHzTfG9KlnkIODxPKg==} dependencies: From ae5ff72618a4bc12c320fb71a070bc794d4da038 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Tue, 20 Aug 2024 14:16:52 -0600 Subject: [PATCH 17/24] use preinstall, which appears to actually run before install, even with --frozen-lockfile --- plugin-server/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/package.json b/plugin-server/package.json index f38a64b03a689..559de97edb9c0 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -29,7 +29,7 @@ "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", "services": "pnpm services:stop && pnpm services:clean && pnpm services:start", "build:cyclotron": "cd ../rust/cyclotron-node && pnpm i && pnpm run build", - "pnpm:devPreinstall": "pnpm run build:cyclotron" + "preinstall": "pnpm run build:cyclotron" }, "graphile-worker": { "maxContiguousErrors": 300 From 7df4f1aefa8a256ccbf607533283bbe754117ce8 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Tue, 20 Aug 2024 15:30:53 -0600 Subject: [PATCH 18/24] try dynamic import --- plugin-server/src/cdp/async-function-executor.ts | 2 +- plugin-server/src/cdp/cdp-consumers.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin-server/src/cdp/async-function-executor.ts b/plugin-server/src/cdp/async-function-executor.ts index 58d750627b2b0..cb67695e8b1e0 100644 --- a/plugin-server/src/cdp/async-function-executor.ts +++ b/plugin-server/src/cdp/async-function-executor.ts @@ -1,4 +1,4 @@ -import cyclotron from 'cyclotron-node' +const cyclotron = require('cyclotron-node') import { Histogram } from 'prom-client' import { buildIntegerMatcher } from '../config/config' diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 6f5c6e572eba8..cc1039a65f0df 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -1,5 +1,5 @@ import { captureException } from '@sentry/node' -import cyclotron from 'cyclotron-node' +const cyclotron = require('cyclotron-node') import { features, librdkafkaVersion, Message } from 'node-rdkafka' import { Counter, Histogram } from 'prom-client' From 2b77a5b37ba22d2aefd6e87ae592e00909d81ac9 Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 21 Aug 2024 09:36:33 +0100 Subject: [PATCH 19/24] revert --- plugin-server/package.json | 2 +- plugin-server/src/cdp/async-function-executor.ts | 2 +- plugin-server/src/cdp/cdp-consumers.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugin-server/package.json b/plugin-server/package.json index 559de97edb9c0..f38a64b03a689 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -29,7 +29,7 @@ "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", "services": "pnpm services:stop && pnpm services:clean && pnpm services:start", "build:cyclotron": "cd ../rust/cyclotron-node && pnpm i && pnpm run build", - "preinstall": "pnpm run build:cyclotron" + "pnpm:devPreinstall": "pnpm run build:cyclotron" }, "graphile-worker": { "maxContiguousErrors": 300 diff --git a/plugin-server/src/cdp/async-function-executor.ts b/plugin-server/src/cdp/async-function-executor.ts index cb67695e8b1e0..58d750627b2b0 100644 --- a/plugin-server/src/cdp/async-function-executor.ts +++ b/plugin-server/src/cdp/async-function-executor.ts @@ -1,4 +1,4 @@ -const cyclotron = require('cyclotron-node') +import cyclotron from 'cyclotron-node' import { Histogram } from 'prom-client' import { buildIntegerMatcher } from '../config/config' diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index cc1039a65f0df..6f5c6e572eba8 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -1,5 +1,5 @@ import { captureException } from '@sentry/node' -const cyclotron = require('cyclotron-node') +import cyclotron from 'cyclotron-node' import { features, librdkafkaVersion, Message } from 'node-rdkafka' import { Counter, Histogram } from 'prom-client' From ec673f3f2c9b41adf88c0293ab4f74e2f43d394a Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 21 Aug 2024 09:42:17 +0100 Subject: [PATCH 20/24] Fix up --- rust/cyclotron-node/src/index.ts | 46 ++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/rust/cyclotron-node/src/index.ts b/rust/cyclotron-node/src/index.ts index a3cace0199b74..5f4c38e74545e 100644 --- a/rust/cyclotron-node/src/index.ts +++ b/rust/cyclotron-node/src/index.ts @@ -93,7 +93,7 @@ interface InternalJob { parameters: string | null } -export async function initWorker(poolConfig: PoolConfig): Promise { +async function initWorker(poolConfig: PoolConfig): Promise { const initWorkerInternal: InternalPoolConfig = { db_url: poolConfig.dbUrl, max_connections: poolConfig.maxConnections, @@ -105,7 +105,7 @@ export async function initWorker(poolConfig: PoolConfig): Promise { return await cyclotron.initWorker(JSON.stringify(initWorkerInternal)) } -export async function initManager(managerConfig: ManagerConfig): Promise { +async function initManager(managerConfig: ManagerConfig): Promise { const managerConfigInternal: InternalManagerConfig = { shards: managerConfig.shards.map((shard) => ({ db_url: shard.dbUrl, @@ -119,7 +119,7 @@ export async function initManager(managerConfig: ManagerConfig): Promise { return await cyclotron.initManager(JSON.stringify(managerConfigInternal)) } -export async function maybeInitWorker(poolConfig: PoolConfig): Promise { +async function maybeInitWorker(poolConfig: PoolConfig): Promise { const initWorkerInternal: InternalPoolConfig = { db_url: poolConfig.dbUrl, max_connections: poolConfig.maxConnections, @@ -131,7 +131,7 @@ export async function maybeInitWorker(poolConfig: PoolConfig): Promise { return await cyclotron.maybeInitWorker(JSON.stringify(initWorkerInternal)) } -export async function maybeInitManager(managerConfig: ManagerConfig): Promise { +async function maybeInitManager(managerConfig: ManagerConfig): Promise { const managerConfigInternal: InternalManagerConfig = { shards: managerConfig.shards.map((shard) => ({ db_url: shard.dbUrl, @@ -183,34 +183,34 @@ function convertInternalJobToJob(jobInternal: InternalJob): Job { } } -export async function dequeueJobs(queueName: string, limit: number): Promise { +async function dequeueJobs(queueName: string, limit: number): Promise { const jobsStr = await cyclotron.dequeueJobs(queueName, limit) const jobs: InternalJob[] = JSON.parse(jobsStr) return jobs.map(convertInternalJobToJob) } -export async function dequeueJobsWithVmState(queueName: string, limit: number): Promise { +async function dequeueJobsWithVmState(queueName: string, limit: number): Promise { const jobsStr = await cyclotron.dequeueJobsWithVmState(queueName, limit) const jobs: InternalJob[] = JSON.parse(jobsStr) return jobs.map(convertInternalJobToJob) } -export async function flushJob(jobId: string): Promise { +async function flushJob(jobId: string): Promise { return await cyclotron.flushJob(jobId) } -export function setState(jobId: string, jobState: JobState): Promise { +function setState(jobId: string, jobState: JobState): Promise { return cyclotron.setState(jobId, jobState) } -export function setQueue(jobId: string, queueName: string): Promise { +function setQueue(jobId: string, queueName: string): Promise { return cyclotron.setQueue(jobId, queueName) } -export function setPriority(jobId: string, priority: number): Promise { +function setPriority(jobId: string, priority: number): Promise { return cyclotron.setPriority(jobId, priority) } -export function setScheduledAt(jobId: string, scheduledAt: Date): Promise { +function setScheduledAt(jobId: string, scheduledAt: Date): Promise { return cyclotron.setScheduledAt(jobId, scheduledAt.toISOString()) } @@ -223,17 +223,35 @@ function serializeObject(name: string, obj: Record | null): string throw new Error(`${name} must be either an object or null`) } -export function setVmState(jobId: string, vmState: Record | null): Promise { +function setVmState(jobId: string, vmState: Record | null): Promise { const serialized = serializeObject('vmState', vmState) return cyclotron.setVmState(jobId, serialized) } -export function setMetadata(jobId: string, metadata: Record | null): Promise { +function setMetadata(jobId: string, metadata: Record | null): Promise { const serialized = serializeObject('metadata', metadata) return cyclotron.setMetadata(jobId, serialized) } -export function setParameters(jobId: string, parameters: Record | null): Promise { +function setParameters(jobId: string, parameters: Record | null): Promise { const serialized = serializeObject('parameters', parameters) return cyclotron.setParameters(jobId, serialized) } + +export default { + initWorker, + initManager, + maybeInitWorker, + maybeInitManager, + createJob, + dequeueJobs, + dequeueJobsWithVmState, + flushJob, + setState, + setQueue, + setPriority, + setScheduledAt, + setVmState, + setMetadata, + setParameters, +} From d25cec3b2e9d06fa12544700b85479626c2f5e35 Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 21 Aug 2024 09:49:04 +0100 Subject: [PATCH 21/24] Fixes --- plugin-server/src/config/config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 579930f5fb8f5..7de2856530e14 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -193,7 +193,7 @@ export function getDefaultConfig(): PluginsServerConfig { CDP_REDIS_PORT: 6479, // Cyclotron - CYCLOTRON_DATABASE_URL: isTestEnv() ? 'postgres://posthog:posthog@localhost:5432/cyclotron' : '', + CYCLOTRON_DATABASE_URL: '', } } From 9cdf3b7b9f10e5408a1a08291d10449edf82bcbb Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 21 Aug 2024 09:50:57 +0100 Subject: [PATCH 22/24] Rename package --- .github/workflows/ci-plugin-server.yml | 2 -- plugin-server/package.json | 2 +- plugin-server/pnpm-lock.yaml | 8 ++++---- plugin-server/src/cdp/async-function-executor.ts | 2 +- plugin-server/src/cdp/cdp-consumers.ts | 2 +- rust/cyclotron-node/package.json | 2 +- 6 files changed, 8 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci-plugin-server.yml b/.github/workflows/ci-plugin-server.yml index 7978d81de9c61..a24eaf53d4e69 100644 --- a/.github/workflows/ci-plugin-server.yml +++ b/.github/workflows/ci-plugin-server.yml @@ -234,8 +234,6 @@ jobs: run: | cd plugin-server pnpm install --frozen-lockfile - ls -al node_modules/ - ls -al node_modules/cyclotron-node/ pnpm build - name: Wait for Clickhouse, Redis & Kafka diff --git a/plugin-server/package.json b/plugin-server/package.json index f38a64b03a689..4ce865ed3c3b2 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -89,7 +89,7 @@ "v8-profiler-next": "^1.9.0", "vm2": "3.9.18", "detect-browser": "^5.3.0", - "cyclotron-node": "file:../rust/cyclotron-node" + "@posthog/cyclotron": "file:../rust/cyclotron-node" }, "devDependencies": { "0x": "^5.5.0", diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index 6982a913eea99..f242e25145a74 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -43,6 +43,9 @@ dependencies: '@posthog/clickhouse': specifier: ^1.7.0 version: 1.7.0 + '@posthog/cyclotron': + specifier: file:../rust/cyclotron-node + version: file:../rust/cyclotron-node '@posthog/hogvm': specifier: ^1.0.32 version: 1.0.32(luxon@3.4.4)(re2@1.20.3) @@ -73,9 +76,6 @@ dependencies: aws-sdk: specifier: ^2.927.0 version: 2.1366.0 - cyclotron-node: - specifier: file:../rust/cyclotron-node - version: file:../rust/cyclotron-node detect-browser: specifier: ^5.3.0 version: 5.3.0 @@ -10737,5 +10737,5 @@ packages: file:../rust/cyclotron-node: resolution: {directory: ../rust/cyclotron-node, type: directory} - name: cyclotron-node + name: '@posthog/cyclotron' dev: false diff --git a/plugin-server/src/cdp/async-function-executor.ts b/plugin-server/src/cdp/async-function-executor.ts index 58d750627b2b0..78a9374a78171 100644 --- a/plugin-server/src/cdp/async-function-executor.ts +++ b/plugin-server/src/cdp/async-function-executor.ts @@ -1,4 +1,4 @@ -import cyclotron from 'cyclotron-node' +import cyclotron from '@posthog/cyclotron' import { Histogram } from 'prom-client' import { buildIntegerMatcher } from '../config/config' diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 6f5c6e572eba8..fef401d472927 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -1,5 +1,5 @@ +import cyclotron from '@posthog/cyclotron' import { captureException } from '@sentry/node' -import cyclotron from 'cyclotron-node' import { features, librdkafkaVersion, Message } from 'node-rdkafka' import { Counter, Histogram } from 'prom-client' diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index abb730b4c91a8..ce0965f7cc251 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -1,5 +1,5 @@ { - "name": "cyclotron-node", + "name": "@posthog/cyclotron", "version": "0.1.0", "description": "Node bindings for cyclotron", "main": "dist/index.js", From cb8c8b1a84a719d7efae3ee64eb0a3319d9ef9f2 Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 21 Aug 2024 10:03:56 +0100 Subject: [PATCH 23/24] Fix --- rust/cyclotron-node/pnpm-lock.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/cyclotron-node/pnpm-lock.yaml b/rust/cyclotron-node/pnpm-lock.yaml index f21814fcc1099..9866808970bae 100644 --- a/rust/cyclotron-node/pnpm-lock.yaml +++ b/rust/cyclotron-node/pnpm-lock.yaml @@ -1,4 +1,4 @@ -lockfileVersion: '6.1' +lockfileVersion: '6.0' settings: autoInstallPeers: true From d29b7b8b06181b5301051db6e1c6230fb5c4dee6 Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 21 Aug 2024 10:44:00 +0100 Subject: [PATCH 24/24] Added to docker ignore --- .dockerignore | 5 ++++- plugin-server/package.json | 2 +- rust/cyclotron-node/package.json | 3 ++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.dockerignore b/.dockerignore index 86b09f296acf9..5ff397890187a 100644 --- a/.dockerignore +++ b/.dockerignore @@ -43,4 +43,7 @@ rust/.env rust/.github rust/docker -rust/target \ No newline at end of file +rust/target +rust/cyclotron-node/dist +rust/cyclotron-node/node_modules +rust/cyclotron-node/index.node diff --git a/plugin-server/package.json b/plugin-server/package.json index 4ce865ed3c3b2..e5442bf51b68d 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -28,7 +28,7 @@ "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down", "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", "services": "pnpm services:stop && pnpm services:clean && pnpm services:start", - "build:cyclotron": "cd ../rust/cyclotron-node && pnpm i && pnpm run build", + "build:cyclotron": "cd ../rust/cyclotron-node && pnpm run package", "pnpm:devPreinstall": "pnpm run build:cyclotron" }, "graphile-worker": { diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index ce0965f7cc251..a445cae4e9206 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -11,7 +11,8 @@ "build:cargo": "cargo build --message-format=json > cargo.log", "build:cargo:debug": "pnpm run build:cargo", "build:cross": "cross build --message-format=json > cross.log", - "build:typescript": "tsc" + "build:typescript": "tsc", + "package": "NODE_ENV=development pnpm i --dev && pnpm run build" }, "author": "", "license": "MIT",