Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Changes to node cyclotron package #24481

Merged
merged 25 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@
rust/.env
rust/.github
rust/docker
rust/target
rust/target
rust/cyclotron-node/dist
rust/cyclotron-node/node_modules
rust/cyclotron-node/index.node
18 changes: 0 additions & 18 deletions bin/install-cyclotron-node

This file was deleted.

4 changes: 0 additions & 4 deletions bin/plugin-server
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ fi

./bin/migrate-check

if [[ -n $DEBUG ]]; then
./bin/install-cyclotron-node
fi

cd plugin-server

if [[ -n $DEBUG ]]; then
Expand Down
7 changes: 5 additions & 2 deletions plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
"services:start": "cd .. && docker compose -f docker-compose.dev.yml up",
"services:stop": "cd .. && docker compose -f docker-compose.dev.yml down",
"services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v",
"services": "pnpm services:stop && pnpm services:clean && pnpm services:start"
"services": "pnpm services:stop && pnpm services:clean && pnpm services:start",
"build:cyclotron": "cd ../rust/cyclotron-node && pnpm run package",
"pnpm:devPreinstall": "pnpm run build:cyclotron"
},
"graphile-worker": {
"maxContiguousErrors": 300
Expand Down Expand Up @@ -86,7 +88,8 @@
"uuid": "^9.0.1",
"v8-profiler-next": "^1.9.0",
"vm2": "3.9.18",
"detect-browser": "^5.3.0"
"detect-browser": "^5.3.0",
"@posthog/cyclotron": "file:../rust/cyclotron-node"
},
"devDependencies": {
"0x": "^5.5.0",
Expand Down
8 changes: 8 additions & 0 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugin-server/src/cdp/async-function-executor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import cyclotron from '@posthog/cyclotron'
import { Histogram } from 'prom-client'

import { buildIntegerMatcher } from '../config/config'
import { PluginsServerConfig, ValueMatcher } from '../types'
import { trackedFetch } from '../utils/fetch'
import { status } from '../utils/status'
import * as cyclotron from '../worker/cyclotron'
import { RustyHook } from '../worker/rusty-hook'
import { HogFunctionInvocationAsyncRequest, HogFunctionInvocationAsyncResponse } from './types'

Expand Down
27 changes: 23 additions & 4 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import cyclotron from '@posthog/cyclotron'
import { captureException } from '@sentry/node'
import { features, librdkafkaVersion, Message } from 'node-rdkafka'
import { Counter, Histogram } from 'prom-client'
Expand All @@ -20,7 +21,6 @@ import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
import { captureTeamEvent } from '../utils/posthog'
import { status } from '../utils/status'
import { castTimestampOrNow } from '../utils/utils'
import * as cyclotron from '../worker/cyclotron'
import { RustyHook } from '../worker/rusty-hook'
import { AsyncFunctionExecutor } from './async-function-executor'
import { GroupsManager } from './groups-manager'
Expand Down Expand Up @@ -444,7 +444,12 @@ abstract class CdpConsumerBase {
const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.hub)
const globalProducerConfig = createRdProducerConfigFromEnvVars(this.hub)

await Promise.all([this.hogFunctionManager.start()])
await Promise.all([
this.hogFunctionManager.start(),
this.hub.CYCLOTRON_DATABASE_URL
? cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] })
: Promise.resolve(),
])

this.kafkaProducer = new KafkaProducerWrapper(
await createKafkaProducer(globalConnectionConfig, globalProducerConfig)
Expand Down Expand Up @@ -703,14 +708,14 @@ export class CdpCyclotronWorker extends CdpConsumerBase {
protected name = 'CdpCyclotronWorker'
protected topic = 'UNUSED-CdpCyclotronWorker'
protected consumerGroupId = 'UNUSED-CdpCyclotronWorker'

private runningWorker: Promise<void> | undefined
private isUnhealthy = false

public async _handleEachBatch(_: Message[]): Promise<void> {
// Not called, we override `start` below to use Cyclotron instead.
}

public async innerStart() {
private async innerStart() {
try {
const limit = 100 // TODO: Make configurable.
while (!this.isStopping) {
Expand All @@ -722,15 +727,29 @@ export class CdpCyclotronWorker extends CdpConsumerBase {
}
}
} catch (err) {
this.isUnhealthy = true
console.error('Error in Cyclotron worker', err)
throw err
}
}

public async start() {
await cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] })
await cyclotron.initWorker({ dbUrl: this.hub.CYCLOTRON_DATABASE_URL })

// Consumer `start` expects an async task is started, and not that `start` itself blocks
// indefinitely.
this.runningWorker = this.innerStart()

return Promise.resolve()
}

public async stop() {
await super.stop()
await this.runningWorker
}

public isHealthy() {
return this.isUnhealthy
}
}
2 changes: 1 addition & 1 deletion plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export function getDefaultConfig(): PluginsServerConfig {
CDP_REDIS_PORT: 6479,

// Cyclotron
CYCLOTRON_DATABASE_URL: isTestEnv() ? 'postgres://posthog:posthog@localhost:5432/cyclotron' : '',
CYCLOTRON_DATABASE_URL: '',
}
}

Expand Down
13 changes: 0 additions & 13 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import { posthog } from '../utils/posthog'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { createRedisClient, delay } from '../utils/utils'
import * as cyclotron from '../worker/cyclotron'
import { ActionManager } from '../worker/ingestion/action-manager'
import { ActionMatcher } from '../worker/ingestion/action-matcher'
import { AppMetrics } from '../worker/ingestion/app-metrics'
Expand Down Expand Up @@ -542,9 +541,6 @@ export async function startPluginsServer(

if (capabilities.cdpProcessedEvents) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
if (hub.CYCLOTRON_DATABASE_URL) {
await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] })
}
const consumer = new CdpProcessedEventsConsumer(hub)
await consumer.start()

Expand All @@ -555,9 +551,6 @@ export async function startPluginsServer(

if (capabilities.cdpFunctionCallbacks) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
if (hub.CYCLOTRON_DATABASE_URL) {
await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] })
}
const consumer = new CdpFunctionCallbackConsumer(hub)
await consumer.start()

Expand All @@ -575,9 +568,6 @@ export async function startPluginsServer(

if (capabilities.cdpFunctionOverflow) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
if (hub.CYCLOTRON_DATABASE_URL) {
await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] })
}
const consumer = new CdpOverflowConsumer(hub)
await consumer.start()

Expand All @@ -589,9 +579,6 @@ export async function startPluginsServer(
if (capabilities.cdpCyclotronWorker) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
if (hub.CYCLOTRON_DATABASE_URL) {
await cyclotron.initManager({ shards: [{ dbUrl: hub.CYCLOTRON_DATABASE_URL }] })
await cyclotron.initWorker({ dbUrl: hub.CYCLOTRON_DATABASE_URL })

const worker = new CdpCyclotronWorker(hub)
await worker.start()
} else {
Expand Down
14 changes: 1 addition & 13 deletions production.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,19 @@
COPY babel.config.js tsconfig.json webpack.config.js tailwind.config.js ./
RUN pnpm build


#
# ---------------------------------------------------------
#
FROM ghcr.io/posthog/rust-node-container:bullseye_rust_1.80.1-node_18.19.1 AS cyclotron-node-build
FROM ghcr.io/posthog/rust-node-container:bullseye_rust_1.80.1-node_18.19.1 AS plugin-server-build
WORKDIR /code
COPY ./rust ./rust
RUN cd rust/cyclotron-node && \
cargo build -r && \
mv ../target/release/libcyclotron_node.so /code/index.node


#
# ---------------------------------------------------------
#
FROM node:18.19.1-bullseye-slim AS plugin-server-build
WORKDIR /code/plugin-server
SHELL ["/bin/bash", "-e", "-o", "pipefail", "-c"]

# Compile and install Node.js dependencies.
COPY ./plugin-server/package.json ./plugin-server/pnpm-lock.yaml ./plugin-server/tsconfig.json ./
COPY ./plugin-server/patches/ ./patches/
RUN apt-get update && \

Check warning on line 53 in production.Dockerfile

View workflow job for this annotation

GitHub Actions / Lint changed Dockerfiles

Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`
apt-get install -y --no-install-recommends \
"make" \
"g++" \
Expand Down Expand Up @@ -102,7 +92,7 @@
# We install those dependencies on a custom folder that we will
# then copy to the last image.
COPY requirements.txt ./
RUN apt-get update && \

Check warning on line 95 in production.Dockerfile

View workflow job for this annotation

GitHub Actions / Lint changed Dockerfiles

Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`
apt-get install -y --no-install-recommends \
"build-essential" \
"git" \
Expand Down Expand Up @@ -136,7 +126,7 @@
SHELL ["/bin/bash", "-e", "-o", "pipefail", "-c"]

# Fetch the GeoLite2-City database that will be used for IP geolocation within Django.
RUN apt-get update && \

Check warning on line 129 in production.Dockerfile

View workflow job for this annotation

GitHub Actions / Lint changed Dockerfiles

Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`
apt-get install -y --no-install-recommends \
"ca-certificates" \
"curl" \
Expand All @@ -158,7 +148,7 @@

# Install OS runtime dependencies.
# Note: please add in this stage runtime dependences only!
RUN apt-get update && \

Check warning on line 151 in production.Dockerfile

View workflow job for this annotation

GitHub Actions / Lint changed Dockerfiles

Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`

Check notice on line 151 in production.Dockerfile

View workflow job for this annotation

GitHub Actions / Lint changed Dockerfiles

Delete the apt-get lists after installing something
apt-get install -y --no-install-recommends \
"chromium" \
"chromium-driver" \
Expand All @@ -169,7 +159,7 @@
"gettext-base"

# Install NodeJS 18.
RUN apt-get install -y --no-install-recommends \

Check warning on line 162 in production.Dockerfile

View workflow job for this annotation

GitHub Actions / Lint changed Dockerfiles

Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`
"curl" \
&& \
curl -fsSL https://deb.nodesource.com/setup_18.x | bash - && \
Expand All @@ -193,8 +183,6 @@
COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/node_modules /code/plugin-server/node_modules
COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/package.json /code/plugin-server/package.json

# Copy the Rust cyclotron-node module.
COPY --from=cyclotron-node-build --chown=posthog:posthog /code/index.node /code/plugin-server/node_modules/cyclotron/index.node

# Copy the Python dependencies and Django staticfiles from the posthog-build stage.
COPY --from=posthog-build --chown=posthog:posthog /code/staticfiles /code/staticfiles
Expand Down Expand Up @@ -232,4 +220,4 @@
# Expose the port from which we serve OpenMetrics data.
EXPOSE 8001
COPY unit.json.tpl /docker-entrypoint.d/unit.json.tpl
USER root

Check warning on line 223 in production.Dockerfile

View workflow job for this annotation

GitHub Actions / Lint changed Dockerfiles

Last USER should not be root
Expand Down
1 change: 1 addition & 0 deletions rust/cyclotron-node/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ index.node
**/.DS_Store
npm-debug.log*cargo.log
cross.log
dist/
8 changes: 0 additions & 8 deletions rust/cyclotron-node/bin/build.sh

This file was deleted.

125 changes: 0 additions & 125 deletions rust/cyclotron-node/package-lock.json

This file was deleted.

Loading
Loading