diff --git a/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts b/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts index 38653f698eaa6..b4d7e07e7da75 100644 --- a/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts @@ -1,6 +1,7 @@ import { Consumer, Kafka } from 'kafkajs' import * as schedule from 'node-schedule' import { AppMetrics } from 'worker/ingestion/app-metrics' +import { RustyHook } from 'worker/rusty-hook' import { KAFKA_EVENTS_JSON, prefix as KAFKA_PREFIX } from '../../config/kafka-topics' import { Hub, PluginsServerConfig } from '../../types' @@ -23,7 +24,7 @@ export const startAsyncOnEventHandlerConsumer = async ({ }) => { /* Consumes analytics events from the Kafka topic `clickhouse_events_json` - and processes any onEvent plugin handlers configured for the team. + and processes any onEvent plugin handlers configured for the team. At the moment this is just a wrapper around `IngestionConsumer`. We may want to further remove that abstraction in the future. @@ -45,6 +46,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({ teamManager, organizationManager, serverConfig, + rustyHook, appMetrics, }: { kafka: Kafka @@ -52,11 +54,12 @@ export const startAsyncWebhooksHandlerConsumer = async ({ teamManager: TeamManager organizationManager: OrganizationManager serverConfig: PluginsServerConfig + rustyHook: RustyHook appMetrics: AppMetrics }) => { /* Consumes analytics events from the Kafka topic `clickhouse_events_json` - and processes any onEvent plugin handlers configured for the team. + and processes any onEvent plugin handlers configured for the team. At the moment this is just a wrapper around `IngestionConsumer`. We may want to further remove that abstraction in the future. 
@@ -78,6 +81,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({ postgres, teamManager, organizationManager, + rustyHook, appMetrics, serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS ) diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index f44567183e144..42cdee24b3bab 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -10,7 +10,7 @@ import { Counter } from 'prom-client' import v8Profiler from 'v8-profiler-next' import { getPluginServerCapabilities } from '../capabilities' -import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config' +import { buildIntegerMatcher, defaultConfig, sessionRecordingConsumerConfig } from '../config/config' import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types' import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' import { PostgresRouter } from '../utils/db/postgres' @@ -24,6 +24,7 @@ import { OrganizationManager } from '../worker/ingestion/organization-manager' import { DeferredPersonOverrideWorker, FlatPersonOverrideWriter } from '../worker/ingestion/person-state' import { TeamManager } from '../worker/ingestion/team-manager' import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina' +import { RustyHook } from '../worker/rusty-hook' import { GraphileWorker } from './graphile-worker/graphile-worker' import { loadPluginSchedule } from './graphile-worker/schedule' import { startGraphileWorker } from './graphile-worker/worker-setup' @@ -356,6 +357,13 @@ export async function startPluginsServer( const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig) const organizationManager = hub?.organizationManager ?? new OrganizationManager(postgres, teamManager) const KafkaProducerWrapper = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig)) + const rustyHook = + hub?.rustyHook ?? 
+ new RustyHook( + buildIntegerMatcher(serverConfig.RUSTY_HOOK_FOR_TEAMS, true), + serverConfig.RUSTY_HOOK_URL, + serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS + ) const appMetrics = hub?.appMetrics ?? new AppMetrics( @@ -371,6 +379,7 @@ export async function startPluginsServer( teamManager: teamManager, organizationManager: organizationManager, serverConfig: serverConfig, + rustyHook: rustyHook, appMetrics: appMetrics, }) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 208e8cb48b43f..0031ec514f39a 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -17,6 +17,7 @@ import { Kafka } from 'kafkajs' import { DateTime } from 'luxon' import { Job } from 'node-schedule' import { VM } from 'vm2' +import { RustyHook } from 'worker/rusty-hook' import { ObjectStorage } from './main/services/object_storage' import { DB } from './utils/db/db' @@ -267,6 +268,7 @@ export interface Hub extends PluginsServerConfig { rootAccessManager: RootAccessManager eventsProcessor: EventsProcessor appMetrics: AppMetrics + rustyHook: RustyHook // geoip database, setup in workers mmdb?: ReaderModel // diagnostics @@ -280,7 +282,6 @@ export interface Hub extends PluginsServerConfig { pluginConfigsToSkipElementsParsing: ValueMatcher poeEmbraceJoinForTeams: ValueMatcher poeWritesExcludeTeams: ValueMatcher - rustyHookForTeams: ValueMatcher // lookups eventsToDropByToken: Map } diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index 9a7af116308a3..0e14d29bf5643 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -26,6 +26,7 @@ import { AppMetrics } from '../../worker/ingestion/app-metrics' import { OrganizationManager } from '../../worker/ingestion/organization-manager' import { EventsProcessor } from '../../worker/ingestion/process-event' import { TeamManager } from '../../worker/ingestion/team-manager' +import { RustyHook } from '../../worker/rusty-hook' import { isTestEnv } from 
'../env-utils' import { status } from '../status' import { createRedisPool, UUIDT } from '../utils' @@ -141,6 +142,11 @@ export async function createHub( const organizationManager = new OrganizationManager(postgres, teamManager) const pluginsApiKeyManager = new PluginsApiKeyManager(db) const rootAccessManager = new RootAccessManager(db) + const rustyHook = new RustyHook( + buildIntegerMatcher(serverConfig.RUSTY_HOOK_FOR_TEAMS, true), + serverConfig.RUSTY_HOOK_URL, + serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS + ) const enqueuePluginJob = async (job: EnqueuedPluginJob) => { // NOTE: we use the producer directly here rather than using the wrapper @@ -185,11 +191,11 @@ export async function createHub( organizationManager, pluginsApiKeyManager, rootAccessManager, + rustyHook, conversionBufferEnabledTeams, pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true), poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true), poeWritesExcludeTeams: buildIntegerMatcher(process.env.POE_WRITES_EXCLUDE_TEAMS, false), - rustyHookForTeams: buildIntegerMatcher(process.env.RUSTY_HOOK_FOR_TEAMS, true), eventsToDropByToken: createEventsToDropByToken(process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID), } diff --git a/plugin-server/src/worker/ingestion/hooks.ts b/plugin-server/src/worker/ingestion/hooks.ts index c2a6c0536df46..3cc811bc795db 100644 --- a/plugin-server/src/worker/ingestion/hooks.ts +++ b/plugin-server/src/worker/ingestion/hooks.ts @@ -1,6 +1,7 @@ import { captureException } from '@sentry/node' import { Histogram } from 'prom-client' import { format } from 'util' +import { RustyHook } from 'worker/rusty-hook' import { Action, Hook, PostIngestionEvent, Team } from '../../types' import { PostgresRouter, PostgresUse } from '../../utils/db/postgres' @@ -254,6 +255,7 @@ export class HookCommander { postgres: PostgresRouter teamManager: TeamManager organizationManager: OrganizationManager + rustyHook: 
RustyHook appMetrics: AppMetrics siteUrl: string /** Hook request timeout in ms. */ @@ -263,6 +265,7 @@ export class HookCommander { postgres: PostgresRouter, teamManager: TeamManager, organizationManager: OrganizationManager, + rustyHook: RustyHook, appMetrics: AppMetrics, timeout: number ) { @@ -275,6 +278,7 @@ export class HookCommander { status.warn('⚠️', 'SITE_URL env is not set for webhooks') this.siteUrl = '' } + this.rustyHook = rustyHook this.appMetrics = appMetrics this.EXTERNAL_REQUEST_TIMEOUT = timeout } @@ -350,6 +354,24 @@ export class HookCommander { const message = this.formatMessage(webhookUrl, action, event, team) end() + const body = JSON.stringify(message, undefined, 4) + const enqueuedInRustyHook = await this.rustyHook.enqueueIfEnabledForTeam({ + webhook: { + url: webhookUrl, + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body, + }, + teamId: event.teamId, + pluginId: -2, // -2 is hardcoded to mean webhooks + pluginConfigId: -2, // -2 is hardcoded to mean webhooks + }) + + if (enqueuedInRustyHook) { + // Rusty-Hook handles it from here, so we're done. 
+ return + } + const slowWarningTimeout = this.EXTERNAL_REQUEST_TIMEOUT * 0.7 const timeout = setTimeout(() => { status.warn( @@ -363,7 +385,7 @@ export class HookCommander { await instrumentWebhookStep('fetch', async () => { const request = await trackedFetch(webhookUrl, { method: 'POST', - body: JSON.stringify(message, undefined, 4), + body, headers: { 'Content-Type': 'application/json' }, timeout: this.EXTERNAL_REQUEST_TIMEOUT, }) @@ -425,6 +447,24 @@ export class HookCommander { data: { ...data, person: sendablePerson }, } + const body = JSON.stringify(payload, undefined, 4) + const enqueuedInRustyHook = await this.rustyHook.enqueueIfEnabledForTeam({ + webhook: { + url: hook.target, + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body, + }, + teamId: event.teamId, + pluginId: -1, // -1 is hardcoded to mean resthooks + pluginConfigId: -1, // -1 is hardcoded to mean resthooks + }) + + if (enqueuedInRustyHook) { + // Rusty-Hook handles it from here, so we're done. 
+ return + } + const slowWarningTimeout = this.EXTERNAL_REQUEST_TIMEOUT * 0.7 const timeout = setTimeout(() => { status.warn( @@ -437,7 +477,7 @@ export class HookCommander { try { const request = await trackedFetch(hook.target, { method: 'POST', - body: JSON.stringify(payload, undefined, 4), + body, headers: { 'Content-Type': 'application/json' }, timeout: this.EXTERNAL_REQUEST_TIMEOUT, }) diff --git a/plugin-server/src/worker/metrics.ts b/plugin-server/src/worker/metrics.ts new file mode 100644 index 0000000000000..3cd18b3521948 --- /dev/null +++ b/plugin-server/src/worker/metrics.ts @@ -0,0 +1,8 @@ +import { Summary } from 'prom-client' + +export const pluginActionMsSummary = new Summary({ + name: 'plugin_action_ms', + help: 'Time to run plugin action', + labelNames: ['plugin_id', 'action', 'status'], + percentiles: [0.5, 0.9, 0.95, 0.99], +}) diff --git a/plugin-server/src/worker/plugins/run.ts b/plugin-server/src/worker/plugins/run.ts index 97341742e2711..4f0be132e0e06 100644 --- a/plugin-server/src/worker/plugins/run.ts +++ b/plugin-server/src/worker/plugins/run.ts @@ -1,20 +1,11 @@ import { PluginEvent, PostHogEvent, ProcessedPluginEvent, Webhook } from '@posthog/plugin-scaffold' -import * as Sentry from '@sentry/node' -import fetch from 'node-fetch' -import { Summary } from 'prom-client' import { Hub, PluginConfig, PluginTaskType, VMMethods } from '../../types' import { processError } from '../../utils/db/error' import { trackedFetch } from '../../utils/fetch' import { status } from '../../utils/status' -import { IllegalOperationError, sleep } from '../../utils/utils' - -const pluginActionMsSummary = new Summary({ - name: 'plugin_action_ms', - help: 'Time to run plugin action', - labelNames: ['plugin_id', 'action', 'status'], - percentiles: [0.5, 0.9, 0.95, 0.99], -}) +import { IllegalOperationError } from '../../utils/utils' +import { pluginActionMsSummary } from '../metrics' async function runSingleTeamPluginOnEvent( hub: Hub, @@ -73,80 +64,6 @@ export 
async function runOnEvent(hub: Hub, event: ProcessedPluginEvent): Promise ) } -const RUSTY_HOOK_BASE_DELAY_MS = 100 -const MAX_RUSTY_HOOK_DELAY_MS = 30_000 - -interface RustyWebhookPayload { - parameters: Webhook - metadata: { - team_id: number - plugin_id: number - plugin_config_id: number - } -} - -async function enqueueInRustyHook(hub: Hub, webhook: Webhook, pluginConfig: PluginConfig) { - webhook.method ??= 'POST' - webhook.headers ??= {} - - const rustyWebhookPayload: RustyWebhookPayload = { - parameters: webhook, - metadata: { - team_id: pluginConfig.team_id, - plugin_id: pluginConfig.plugin_id, - plugin_config_id: pluginConfig.id, - }, - } - const body = JSON.stringify(rustyWebhookPayload, undefined, 4) - - // We attempt to enqueue into the rusty-hook service until we succeed. This is deliberatly - // designed to block up the consumer if rusty-hook is down or if we deploy code that - // sends malformed requests. The entire purpose of rusty-hook is to reliably deliver webhooks, - // so we'd rather leave items in the Kafka topic until we manage to get them into rusty-hook. - let attempt = 0 - while (true) { - const timer = new Date() - try { - attempt += 1 - const response = await fetch(hub.RUSTY_HOOK_URL, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body, - - // Sure, it's not an external request, but we should have a timeout and this is as - // good as any. - timeout: hub.EXTERNAL_REQUEST_TIMEOUT_MS, - }) - - if (response.ok) { - // Success, exit the loop. - pluginActionMsSummary - .labels(pluginConfig.plugin_id.toString(), 'enqueueRustyHook', 'success') - .observe(new Date().getTime() - timer.getTime()) - - break - } - - // Throw to unify error handling below. 
- throw new Error(`rusty-hook returned ${response.status} ${response.statusText}: ${await response.text()}`) - } catch (error) { - pluginActionMsSummary - .labels(pluginConfig.plugin_id.toString(), 'enqueueRustyHook', 'error') - .observe(new Date().getTime() - timer.getTime()) - - const redactedWebhook = { - parameters: { ...rustyWebhookPayload.parameters, body: '' }, - metadata: rustyWebhookPayload.metadata, - } - status.error('🔴', 'Webhook enqueue to rusty-hook failed', { error, redactedWebhook, attempt }) - Sentry.captureException(error, { extra: { redactedWebhook } }) - } - - const delayMs = Math.min(2 ** (attempt - 1) * RUSTY_HOOK_BASE_DELAY_MS, MAX_RUSTY_HOOK_DELAY_MS) - await sleep(delayMs) - } -} - async function runSingleTeamPluginComposeWebhook( hub: Hub, event: PostHogEvent, @@ -175,8 +92,16 @@ async function runSingleTeamPluginComposeWebhook( return } - if (hub.rustyHookForTeams?.(event.team_id)) { - return await enqueueInRustyHook(hub, webhook, pluginConfig) + const enqueuedInRustyHook = await hub.rustyHook.enqueueIfEnabledForTeam({ + webhook, + teamId: event.team_id, + pluginId: pluginConfig.plugin_id, + pluginConfigId: pluginConfig.id, + }) + + if (enqueuedInRustyHook) { + // Rusty-Hook handles it from here, so we're done. 
+ return } const request = await trackedFetch(webhook.url, { diff --git a/plugin-server/src/worker/rusty-hook.ts b/plugin-server/src/worker/rusty-hook.ts new file mode 100644 index 0000000000000..d71fae955db73 --- /dev/null +++ b/plugin-server/src/worker/rusty-hook.ts @@ -0,0 +1,114 @@ +import { Webhook } from '@posthog/plugin-scaffold' +import * as Sentry from '@sentry/node' +import fetch from 'node-fetch' + +import { ValueMatcher } from '../types' +import { isProdEnv } from '../utils/env-utils' +import { raiseIfUserProvidedUrlUnsafe } from '../utils/fetch' +import { status } from '../utils/status' +import { sleep } from '../utils/utils' +import { pluginActionMsSummary } from './metrics' + +const RUSTY_HOOK_BASE_DELAY_MS = 100 +const MAX_RUSTY_HOOK_DELAY_MS = 30_000 + +interface RustyWebhookPayload { + parameters: Webhook + metadata: { + team_id: number + plugin_id: number + plugin_config_id: number + } +} + +export class RustyHook { + constructor( + private enabledForTeams: ValueMatcher, + private serviceUrl: string, + private requestTimeoutMs: number + ) {} + + public async enqueueIfEnabledForTeam({ + webhook, + teamId, + pluginId, + pluginConfigId, + }: { + webhook: Webhook + teamId: number + pluginId: number + pluginConfigId: number + }): Promise<boolean> { + if (!this.enabledForTeams(teamId)) { + return false + } + + webhook.method ??= 'POST' + webhook.headers ??= {} + + if (isProdEnv() && !process.env.NODE_ENV?.includes('functional-tests')) { + await raiseIfUserProvidedUrlUnsafe(webhook.url) + } + + const rustyWebhookPayload: RustyWebhookPayload = { + parameters: webhook, + metadata: { + team_id: teamId, + plugin_id: pluginId, + plugin_config_id: pluginConfigId, + }, + } + const body = JSON.stringify(rustyWebhookPayload, undefined, 4) + + // We attempt to enqueue into the rusty-hook service until we succeed. This is deliberately + // designed to block up the consumer if rusty-hook is down or if we deploy code that + // sends malformed requests. 
The entire purpose of rusty-hook is to reliably deliver webhooks, + // so we'd rather leave items in the Kafka topic until we manage to get them into rusty-hook. + let attempt = 0 + while (true) { + const timer = new Date() + try { + attempt += 1 + const response = await fetch(this.serviceUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body, + + // Sure, it's not an external request, but we should have a timeout and this is as + // good as any. + timeout: this.requestTimeoutMs, + }) + + if (response.ok) { + // Success, exit the loop. + pluginActionMsSummary + .labels(pluginId.toString(), 'enqueueRustyHook', 'success') + .observe(new Date().getTime() - timer.getTime()) + + break + } + + // Throw to unify error handling below. + throw new Error( + `rusty-hook returned ${response.status} ${response.statusText}: ${await response.text()}` + ) + } catch (error) { + pluginActionMsSummary + .labels(pluginId.toString(), 'enqueueRustyHook', 'error') + .observe(new Date().getTime() - timer.getTime()) + + const redactedWebhook = { + parameters: { ...rustyWebhookPayload.parameters, body: '' }, + metadata: rustyWebhookPayload.metadata, + } + status.error('🔴', 'Webhook enqueue to rusty-hook failed', { error, redactedWebhook, attempt }) + Sentry.captureException(error, { extra: { redactedWebhook } }) + } + + const delayMs = Math.min(2 ** (attempt - 1) * RUSTY_HOOK_BASE_DELAY_MS, MAX_RUSTY_HOOK_DELAY_MS) + await sleep(delayMs) + } + + return true + } +} diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index 709262ed79aa6..a98490d00f4e9 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -209,7 +209,7 @@ describe('eachBatchX', () => { queue.pluginsServer.postgres, queue.pluginsServer.teamManager, queue.pluginsServer.organizationManager, - new Set(), + 
queue.pluginsServer.rustyHook, queue.pluginsServer.appMetrics, queue.pluginsServer.EXTERNAL_REQUEST_TIMEOUT_MS ) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts index f2603cee6fac3..10aba7b024bf6 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts @@ -52,6 +52,7 @@ describe('Event Pipeline integration test', () => { hub.db.postgres, hub.teamManager, hub.organizationManager, + hub.rustyHook, hub.appMetrics, hub.EXTERNAL_REQUEST_TIMEOUT_MS ) diff --git a/plugin-server/tests/worker/ingestion/hooks.test.ts b/plugin-server/tests/worker/ingestion/hooks.test.ts index 36886c297920d..e4161e0e81fe0 100644 --- a/plugin-server/tests/worker/ingestion/hooks.test.ts +++ b/plugin-server/tests/worker/ingestion/hooks.test.ts @@ -490,6 +490,7 @@ describe('hooks', () => { {} as any, {} as any, {} as any, + { enqueueIfEnabledForTeam: async () => Promise.resolve(false) }, { queueError: () => Promise.resolve(), queueMetric: () => Promise.resolve() } as unknown as AppMetrics, 20000 )