Skip to content

Commit

Permalink
feat: make rusty-hook Hog-aware, sending responses back via kafka (#2…
Browse files Browse the repository at this point in the history
…3619)

Co-authored-by: Ben White <[email protected]>
  • Loading branch information
bretthoerner and benjackwhite authored Jul 16, 2024
1 parent 0adb5d5 commit 8f4df45
Show file tree
Hide file tree
Showing 26 changed files with 1,622 additions and 177 deletions.
4 changes: 3 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@
"DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog",
"KAFKA_HOSTS": "localhost:9092",
"WORKER_CONCURRENCY": "2",
"OBJECT_STORAGE_ENABLED": "True"
"OBJECT_STORAGE_ENABLED": "True",
"HOG_HOOK_URL": "http://localhost:3300/hoghook",
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "*"
},
"presentation": {
"group": "main"
Expand Down
1 change: 1 addition & 0 deletions bin/start
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
export DEBUG=${DEBUG:-1}
export SKIP_SERVICE_VERSION_REQUIREMENTS=1
export BILLING_SERVICE_URL=${BILLING_SERVICE_URL:-https://billing.dev.posthog.dev}
export HOG_HOOK_URL=${HOG_HOOK_URL:-http://localhost:3300/hoghook}

service_warning() {
echo -e "\033[0;31m$1 isn't ready. You can run the stack with:\ndocker compose -f docker-compose.dev.yml up\nIf you have already ran that, just make sure that services are starting properly, and sit back.\nWaiting for $1 to start...\033[0m"
Expand Down
15 changes: 11 additions & 4 deletions plugin-server/src/cdp/async-function-executor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { PluginsServerConfig } from '../types'
import { buildIntegerMatcher } from '../config/config'
import { PluginsServerConfig, ValueMatcher } from '../types'
import { trackedFetch } from '../utils/fetch'
import { status } from '../utils/status'
import { RustyHook } from '../worker/rusty-hook'
Expand All @@ -9,7 +10,11 @@ export type AsyncFunctionExecutorOptions = {
}

export class AsyncFunctionExecutor {
constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) {}
hogHookEnabledForTeams: ValueMatcher<number>

constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) {
this.hogHookEnabledForTeams = buildIntegerMatcher(serverConfig.CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS, true)
}

async execute(
request: HogFunctionInvocationResult,
Expand Down Expand Up @@ -74,8 +79,10 @@ export class AsyncFunctionExecutor {
// Finally overwrite the args with the sanitized ones
request.asyncFunctionRequest.args = [url, { method, headers, body }]

if (!options?.sync === false) {
// TODO: Add rusty hook support
// If the caller hasn't forced it to be synchronous and the team has the rustyhook enabled, enqueue it
if (!options?.sync && this.hogHookEnabledForTeams(request.teamId)) {
await this.rustyHook.enqueueForHog(request)
return
}

status.info('🦔', `[HogExecutor] Webhook not sent via rustyhook, sending directly instead`)
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ export function getDefaultConfig(): PluginsServerConfig {
RUSTY_HOOK_FOR_TEAMS: '',
RUSTY_HOOK_ROLLOUT_PERCENTAGE: 0,
RUSTY_HOOK_URL: '',
HOG_HOOK_URL: '',
CAPTURE_CONFIG_REDIS_HOST: null,

STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
Expand Down Expand Up @@ -180,6 +181,7 @@ export function getDefaultConfig(): PluginsServerConfig {
CDP_WATCHER_MIN_OBSERVATIONS: 3,
CDP_WATCHER_OVERFLOW_RATING_THRESHOLD: 0.8,
CDP_WATCHER_DISABLED_RATING_THRESHOLD: 0.5,
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '',
}
}

Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export type CdpConfig = {
CDP_WATCHER_MIN_OBSERVATIONS: number
CDP_WATCHER_OVERFLOW_RATING_THRESHOLD: number
CDP_WATCHER_DISABLED_RATING_THRESHOLD: number
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string
}

export interface PluginsServerConfig extends CdpConfig {
Expand Down Expand Up @@ -219,6 +220,7 @@ export interface PluginsServerConfig extends CdpConfig {
RUSTY_HOOK_FOR_TEAMS: string
RUSTY_HOOK_ROLLOUT_PERCENTAGE: number
RUSTY_HOOK_URL: string
HOG_HOOK_URL: string
SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP: boolean
PIPELINE_STEP_STALLED_LOG_TIMEOUT: number
CAPTURE_CONFIG_REDIS_HOST: string | null // Redis cluster to use to coordinate with capture (overflow, routing)
Expand Down
52 changes: 51 additions & 1 deletion plugin-server/src/worker/rusty-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Webhook } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import fetch from 'node-fetch'

import { HogFunctionInvocationResult } from '../cdp/types'
import { buildIntegerMatcher } from '../config/config'
import { PluginsServerConfig, ValueMatcher } from '../types'
import { isProdEnv } from '../utils/env-utils'
Expand Down Expand Up @@ -29,7 +30,11 @@ export class RustyHook {
constructor(
private serverConfig: Pick<
PluginsServerConfig,
'RUSTY_HOOK_URL' | 'RUSTY_HOOK_FOR_TEAMS' | 'RUSTY_HOOK_ROLLOUT_PERCENTAGE' | 'EXTERNAL_REQUEST_TIMEOUT_MS'
| 'RUSTY_HOOK_URL'
| 'HOG_HOOK_URL'
| 'RUSTY_HOOK_FOR_TEAMS'
| 'RUSTY_HOOK_ROLLOUT_PERCENTAGE'
| 'EXTERNAL_REQUEST_TIMEOUT_MS'
>
) {
this.enabledForTeams = buildIntegerMatcher(serverConfig.RUSTY_HOOK_FOR_TEAMS, true)
Expand Down Expand Up @@ -122,4 +127,49 @@ export class RustyHook {

return true
}

public async enqueueForHog(payload: HogFunctionInvocationResult): Promise<boolean> {
// This is a temporary copy of `enqueueIfEnabledForTeam` above for Hog fetches because the
// API differs. It will likely be replaced with a Kafka topic soon.

const body = JSON.stringify(payload)

// We attempt to enqueue into the rusty-hook service until we succeed. This is deliberatly
// designed to block up the consumer if rusty-hook is down or if we deploy code that
// sends malformed requests. The entire purpose of rusty-hook is to reliably deliver webhooks,
// so we'd rather leave items in the Kafka topic until we manage to get them into rusty-hook.
let attempt = 0
while (true) {
try {
attempt += 1
const response = await fetch(this.serverConfig.HOG_HOOK_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,

// Sure, it's not an external request, but we should have a timeout and this is as
// good as any.
timeout: this.serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS,
})

if (response.ok) {
// Success, exit the loop.
break
}

// Throw to unify error handling below.
throw new Error(
`rusty-hook for Hog returned ${response.status} ${response.statusText}: ${await response.text()}`
)
} catch (error) {
status.error('🔴', 'Webhook enqueue to rusty-hook for Hog failed', { error, attempt })
Sentry.captureException(error)
}

const delayMs = Math.min(2 ** (attempt - 1) * RUSTY_HOOK_BASE_DELAY_MS, MAX_RUSTY_HOOK_DELAY_MS)
await sleep(delayMs)
}

return true
}
}
9 changes: 6 additions & 3 deletions plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ describe('CDP Processed Events Consuner', () => {
},
})

expect(decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])).toEqual({
const msg = decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])
// Parse body so it can match by object equality rather than exact string equality
msg.value.asyncFunctionRequest.args[1].body = JSON.parse(msg.value.asyncFunctionRequest.args[1].body)
expect(msg).toEqual({
key: expect.any(String),
topic: 'cdp_function_callbacks_test',
value: {
Expand All @@ -225,7 +228,7 @@ describe('CDP Processed Events Consuner', () => {
'https://example.com/posthog-webhook',
{
headers: { version: 'v=1.0.0' },
body: JSON.stringify({
body: {
event: {
uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0',
name: '$pageview',
Expand All @@ -241,7 +244,7 @@ describe('CDP Processed Events Consuner', () => {
person: null,
event_url:
'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test',
}),
},
method: 'POST',
},
],
Expand Down
Loading

0 comments on commit 8f4df45

Please sign in to comment.