Skip to content

Commit

Permalink
feat: Changes to node cyclotron package (#24481)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Aug 21, 2024
1 parent af6dd6c commit d88b46a
Show file tree
Hide file tree
Showing 17 changed files with 157 additions and 230 deletions.
5 changes: 4 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@
rust/.env
rust/.github
rust/docker
rust/target
rust/target
rust/cyclotron-node/dist
rust/cyclotron-node/node_modules
rust/cyclotron-node/index.node
18 changes: 0 additions & 18 deletions bin/install-cyclotron-node

This file was deleted.

4 changes: 0 additions & 4 deletions bin/plugin-server
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ fi

./bin/migrate-check

if [[ -n $DEBUG ]]; then
./bin/install-cyclotron-node
fi

cd plugin-server

if [[ -n $DEBUG ]]; then
Expand Down
7 changes: 5 additions & 2 deletions plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
"services:start": "cd .. && docker compose -f docker-compose.dev.yml up",
"services:stop": "cd .. && docker compose -f docker-compose.dev.yml down",
"services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v",
"services": "pnpm services:stop && pnpm services:clean && pnpm services:start"
"services": "pnpm services:stop && pnpm services:clean && pnpm services:start",
"build:cyclotron": "cd ../rust/cyclotron-node && pnpm run package",
"pnpm:devPreinstall": "pnpm run build:cyclotron"
},
"graphile-worker": {
"maxContiguousErrors": 300
Expand Down Expand Up @@ -86,7 +88,8 @@
"uuid": "^9.0.1",
"v8-profiler-next": "^1.9.0",
"vm2": "3.9.18",
"detect-browser": "^5.3.0"
"detect-browser": "^5.3.0",
"@posthog/cyclotron": "file:../rust/cyclotron-node"
},
"devDependencies": {
"0x": "^5.5.0",
Expand Down
8 changes: 8 additions & 0 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugin-server/src/cdp/async-function-executor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import cyclotron from '@posthog/cyclotron'
import { Histogram } from 'prom-client'

import { buildIntegerMatcher } from '../config/config'
import { PluginsServerConfig, ValueMatcher } from '../types'
import { trackedFetch } from '../utils/fetch'
import { status } from '../utils/status'
import * as cyclotron from '../worker/cyclotron'
import { RustyHook } from '../worker/rusty-hook'
import { HogFunctionInvocationAsyncRequest, HogFunctionInvocationAsyncResponse } from './types'

Expand Down
27 changes: 23 additions & 4 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import cyclotron from '@posthog/cyclotron'
import { captureException } from '@sentry/node'
import { features, librdkafkaVersion, Message } from 'node-rdkafka'
import { Counter, Histogram } from 'prom-client'
Expand All @@ -20,7 +21,6 @@ import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
import { captureTeamEvent } from '../utils/posthog'
import { status } from '../utils/status'
import { castTimestampOrNow } from '../utils/utils'
import * as cyclotron from '../worker/cyclotron'
import { RustyHook } from '../worker/rusty-hook'
import { AsyncFunctionExecutor } from './async-function-executor'
import { GroupsManager } from './groups-manager'
Expand Down Expand Up @@ -444,7 +444,12 @@ abstract class CdpConsumerBase {
const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.hub)
const globalProducerConfig = createRdProducerConfigFromEnvVars(this.hub)

await Promise.all([this.hogFunctionManager.start()])
await Promise.all([
this.hogFunctionManager.start(),
this.hub.CYCLOTRON_DATABASE_URL
? cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] })
: Promise.resolve(),
])

this.kafkaProducer = new KafkaProducerWrapper(
await createKafkaProducer(globalConnectionConfig, globalProducerConfig)
Expand Down Expand Up @@ -703,14 +708,14 @@ export class CdpCyclotronWorker extends CdpConsumerBase {
protected name = 'CdpCyclotronWorker'
protected topic = 'UNUSED-CdpCyclotronWorker'
protected consumerGroupId = 'UNUSED-CdpCyclotronWorker'

private runningWorker: Promise<void> | undefined
private isUnhealthy = false

public async _handleEachBatch(_: Message[]): Promise<void> {
// Not called, we override `start` below to use Cyclotron instead.
}

public async innerStart() {
private async innerStart() {
try {
const limit = 100 // TODO: Make configurable.
while (!this.isStopping) {
Expand All @@ -722,15 +727,29 @@ export class CdpCyclotronWorker extends CdpConsumerBase {
}
}
} catch (err) {
this.isUnhealthy = true
console.error('Error in Cyclotron worker', err)
throw err
}
}

public async start() {
await cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] })
await cyclotron.initWorker({ dbUrl: this.hub.CYCLOTRON_DATABASE_URL })

// Consumer `start` expects an async task is started, and not that `start` itself blocks
// indefinitely.
this.runningWorker = this.innerStart()

return Promise.resolve()
}

public async stop() {
await super.stop()
await this.runningWorker
}

public isHealthy() {
return this.isUnhealthy
}
}
2 changes: 1 addition & 1 deletion plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export function getDefaultConfig(): PluginsServerConfig {
CDP_REDIS_PORT: 6479,

// Cyclotron
CYCLOTRON_DATABASE_URL: isTestEnv() ? 'postgres://posthog:posthog@localhost:5432/cyclotron' : '',
CYCLOTRON_DATABASE_URL: '',
}
}

Expand Down
13 changes: 0 additions & 13 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import { posthog } from '../utils/posthog'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { createRedisClient, delay } from '../utils/utils'
import * as cyclotron from '../worker/cyclotron'
import { ActionManager } from '../worker/ingestion/action-manager'
import { ActionMatcher } from '../worker/ingestion/action-matcher'
import { AppMetrics } from '../worker/ingestion/app-metrics'
Expand Down Expand Up @@ -542,9 +541,6 @@ export async function startPluginsServer(

if (capabilities.cdpProcessedEvents) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
if (hub.CYCLOTRON_DATABASE_URL) {
await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] })
}
const consumer = new CdpProcessedEventsConsumer(hub)
await consumer.start()

Expand All @@ -555,9 +551,6 @@ export async function startPluginsServer(

if (capabilities.cdpFunctionCallbacks) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
if (hub.CYCLOTRON_DATABASE_URL) {
await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] })
}
const consumer = new CdpFunctionCallbackConsumer(hub)
await consumer.start()

Expand All @@ -575,9 +568,6 @@ export async function startPluginsServer(

if (capabilities.cdpFunctionOverflow) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
if (hub.CYCLOTRON_DATABASE_URL) {
await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] })
}
const consumer = new CdpOverflowConsumer(hub)
await consumer.start()

Expand All @@ -589,9 +579,6 @@ export async function startPluginsServer(
if (capabilities.cdpCyclotronWorker) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
if (hub.CYCLOTRON_DATABASE_URL) {
await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] })
await cyclotron.initWorker({ dbUrl: hub.CYCLOTRON_DATABASE_URL })

const worker = new CdpCyclotronWorker(hub)
await worker.start()
} else {
Expand Down
14 changes: 1 addition & 13 deletions production.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,12 @@ COPY ./bin/ ./bin/
COPY babel.config.js tsconfig.json webpack.config.js tailwind.config.js ./
RUN pnpm build


#
# ---------------------------------------------------------
#
FROM ghcr.io/posthog/rust-node-container:bullseye_rust_1.80.1-node_18.19.1 AS cyclotron-node-build
FROM ghcr.io/posthog/rust-node-container:bullseye_rust_1.80.1-node_18.19.1 AS plugin-server-build
WORKDIR /code
COPY ./rust ./rust
RUN cd rust/cyclotron-node && \
cargo build -r && \
mv ../target/release/libcyclotron_node.so /code/index.node


#
# ---------------------------------------------------------
#
FROM node:18.19.1-bullseye-slim AS plugin-server-build
WORKDIR /code/plugin-server
SHELL ["/bin/bash", "-e", "-o", "pipefail", "-c"]

Expand Down Expand Up @@ -193,8 +183,6 @@ COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/dist
COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/node_modules /code/plugin-server/node_modules
COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/package.json /code/plugin-server/package.json

# Copy the Rust cyclotron-node module.
COPY --from=cyclotron-node-build --chown=posthog:posthog /code/index.node /code/plugin-server/node_modules/cyclotron/index.node

# Copy the Python dependencies and Django staticfiles from the posthog-build stage.
COPY --from=posthog-build --chown=posthog:posthog /code/staticfiles /code/staticfiles
Expand Down
1 change: 1 addition & 0 deletions rust/cyclotron-node/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ index.node
**/.DS_Store
npm-debug.log*cargo.log
cross.log
dist/
8 changes: 0 additions & 8 deletions rust/cyclotron-node/bin/build.sh

This file was deleted.

125 changes: 0 additions & 125 deletions rust/cyclotron-node/package-lock.json

This file was deleted.

Loading

0 comments on commit d88b46a

Please sign in to comment.