Skip to content

Commit

Permalink
feat(plugin-server): Use rusty-hook for non-plugin webhooks/resthooks (
Browse files Browse the repository at this point in the history
…#19749)

* Extract enqueueInRustyHook and metric to their own files

* Use raiseIfUserProvidedUrlUnsafe in enqueueInRustyHook, like trackedFetch does

* Extract rusty-hook bits to a single object, so it's easier to pass around

* Weave RustyHook object into HookCommander

* Use rusty-hook for non-plugin webhooks/resthooks

* Fix imports and tests
  • Loading branch information
bretthoerner authored Jan 16, 2024
1 parent 9700a2b commit 49d6ebe
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 95 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Consumer, Kafka } from 'kafkajs'
import * as schedule from 'node-schedule'
import { AppMetrics } from 'worker/ingestion/app-metrics'
import { RustyHook } from 'worker/rusty-hook'

import { KAFKA_EVENTS_JSON, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub, PluginsServerConfig } from '../../types'
Expand All @@ -23,7 +24,7 @@ export const startAsyncOnEventHandlerConsumer = async ({
}) => {
/*
Consumes analytics events from the Kafka topic `clickhouse_events_json`
and processes any onEvent plugin handlers configured for the team.
and processes any onEvent plugin handlers configured for the team.
At the moment this is just a wrapper around `IngestionConsumer`. We may
want to further remove that abstraction in the future.
Expand All @@ -45,18 +46,20 @@ export const startAsyncWebhooksHandlerConsumer = async ({
teamManager,
organizationManager,
serverConfig,
rustyHook,
appMetrics,
}: {
kafka: Kafka
postgres: PostgresRouter
teamManager: TeamManager
organizationManager: OrganizationManager
serverConfig: PluginsServerConfig
rustyHook: RustyHook
appMetrics: AppMetrics
}) => {
/*
Consumes analytics events from the Kafka topic `clickhouse_events_json`
and processes any onEvent plugin handlers configured for the team.
and processes any onEvent plugin handlers configured for the team.
At the moment this is just a wrapper around `IngestionConsumer`. We may
want to further remove that abstraction in the future.
Expand All @@ -78,6 +81,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
postgres,
teamManager,
organizationManager,
rustyHook,
appMetrics,
serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS
)
Expand Down
11 changes: 10 additions & 1 deletion plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Counter } from 'prom-client'
import v8Profiler from 'v8-profiler-next'

import { getPluginServerCapabilities } from '../capabilities'
import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
import { buildIntegerMatcher, defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
Expand All @@ -24,6 +24,7 @@ import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { DeferredPersonOverrideWorker, FlatPersonOverrideWriter } from '../worker/ingestion/person-state'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { RustyHook } from '../worker/rusty-hook'
import { GraphileWorker } from './graphile-worker/graphile-worker'
import { loadPluginSchedule } from './graphile-worker/schedule'
import { startGraphileWorker } from './graphile-worker/worker-setup'
Expand Down Expand Up @@ -356,6 +357,13 @@ export async function startPluginsServer(
const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig)
const organizationManager = hub?.organizationManager ?? new OrganizationManager(postgres, teamManager)
const KafkaProducerWrapper = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))
const rustyHook =
hub?.rustyHook ??
new RustyHook(
buildIntegerMatcher(serverConfig.RUSTY_HOOK_FOR_TEAMS, true),
serverConfig.RUSTY_HOOK_URL,
serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS
)
const appMetrics =
hub?.appMetrics ??
new AppMetrics(
Expand All @@ -371,6 +379,7 @@ export async function startPluginsServer(
teamManager: teamManager,
organizationManager: organizationManager,
serverConfig: serverConfig,
rustyHook: rustyHook,
appMetrics: appMetrics,
})

Expand Down
3 changes: 2 additions & 1 deletion plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { Kafka } from 'kafkajs'
import { DateTime } from 'luxon'
import { Job } from 'node-schedule'
import { VM } from 'vm2'
import { RustyHook } from 'worker/rusty-hook'

import { ObjectStorage } from './main/services/object_storage'
import { DB } from './utils/db/db'
Expand Down Expand Up @@ -267,6 +268,7 @@ export interface Hub extends PluginsServerConfig {
rootAccessManager: RootAccessManager
eventsProcessor: EventsProcessor
appMetrics: AppMetrics
rustyHook: RustyHook
// geoip database, setup in workers
mmdb?: ReaderModel
// diagnostics
Expand All @@ -280,7 +282,6 @@ export interface Hub extends PluginsServerConfig {
pluginConfigsToSkipElementsParsing: ValueMatcher<number>
poeEmbraceJoinForTeams: ValueMatcher<number>
poeWritesExcludeTeams: ValueMatcher<number>
rustyHookForTeams: ValueMatcher<number>
// lookups
eventsToDropByToken: Map<string, string[]>
}
Expand Down
8 changes: 7 additions & 1 deletion plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { AppMetrics } from '../../worker/ingestion/app-metrics'
import { OrganizationManager } from '../../worker/ingestion/organization-manager'
import { EventsProcessor } from '../../worker/ingestion/process-event'
import { TeamManager } from '../../worker/ingestion/team-manager'
import { RustyHook } from '../../worker/rusty-hook'
import { isTestEnv } from '../env-utils'
import { status } from '../status'
import { createRedisPool, UUIDT } from '../utils'
Expand Down Expand Up @@ -141,6 +142,11 @@ export async function createHub(
const organizationManager = new OrganizationManager(postgres, teamManager)
const pluginsApiKeyManager = new PluginsApiKeyManager(db)
const rootAccessManager = new RootAccessManager(db)
const rustyHook = new RustyHook(
buildIntegerMatcher(serverConfig.RUSTY_HOOK_FOR_TEAMS, true),
serverConfig.RUSTY_HOOK_URL,
serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS
)

const enqueuePluginJob = async (job: EnqueuedPluginJob) => {
// NOTE: we use the producer directly here rather than using the wrapper
Expand Down Expand Up @@ -185,11 +191,11 @@ export async function createHub(
organizationManager,
pluginsApiKeyManager,
rootAccessManager,
rustyHook,
conversionBufferEnabledTeams,
pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true),
poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true),
poeWritesExcludeTeams: buildIntegerMatcher(process.env.POE_WRITES_EXCLUDE_TEAMS, false),
rustyHookForTeams: buildIntegerMatcher(process.env.RUSTY_HOOK_FOR_TEAMS, true),
eventsToDropByToken: createEventsToDropByToken(process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID),
}

Expand Down
44 changes: 42 additions & 2 deletions plugin-server/src/worker/ingestion/hooks.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { captureException } from '@sentry/node'
import { Histogram } from 'prom-client'
import { format } from 'util'
import { RustyHook } from 'worker/rusty-hook'

import { Action, Hook, PostIngestionEvent, Team } from '../../types'
import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'
Expand Down Expand Up @@ -254,6 +255,7 @@ export class HookCommander {
postgres: PostgresRouter
teamManager: TeamManager
organizationManager: OrganizationManager
rustyHook: RustyHook
appMetrics: AppMetrics
siteUrl: string
/** Hook request timeout in ms. */
Expand All @@ -263,6 +265,7 @@ export class HookCommander {
postgres: PostgresRouter,
teamManager: TeamManager,
organizationManager: OrganizationManager,
rustyHook: RustyHook,
appMetrics: AppMetrics,
timeout: number
) {
Expand All @@ -275,6 +278,7 @@ export class HookCommander {
status.warn('⚠️', 'SITE_URL env is not set for webhooks')
this.siteUrl = ''
}
this.rustyHook = rustyHook
this.appMetrics = appMetrics
this.EXTERNAL_REQUEST_TIMEOUT = timeout
}
Expand Down Expand Up @@ -350,6 +354,24 @@ export class HookCommander {
const message = this.formatMessage(webhookUrl, action, event, team)
end()

const body = JSON.stringify(message, undefined, 4)
const enqueuedInRustyHook = await this.rustyHook.enqueueIfEnabledForTeam({
webhook: {
url: webhookUrl,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,
},
teamId: event.teamId,
pluginId: -2, // -2 is hardcoded to mean webhooks
pluginConfigId: -2, // -2 is hardcoded to mean webhooks
})

if (enqueuedInRustyHook) {
// Rusty-Hook handles it from here, so we're done.
return
}

const slowWarningTimeout = this.EXTERNAL_REQUEST_TIMEOUT * 0.7
const timeout = setTimeout(() => {
status.warn(
Expand All @@ -363,7 +385,7 @@ export class HookCommander {
await instrumentWebhookStep('fetch', async () => {
const request = await trackedFetch(webhookUrl, {
method: 'POST',
body: JSON.stringify(message, undefined, 4),
body,
headers: { 'Content-Type': 'application/json' },
timeout: this.EXTERNAL_REQUEST_TIMEOUT,
})
Expand Down Expand Up @@ -425,6 +447,24 @@ export class HookCommander {
data: { ...data, person: sendablePerson },
}

const body = JSON.stringify(payload, undefined, 4)
const enqueuedInRustyHook = await this.rustyHook.enqueueIfEnabledForTeam({
webhook: {
url: hook.target,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,
},
teamId: event.teamId,
pluginId: -1, // -1 is hardcoded to mean resthooks
pluginConfigId: -1, // -1 is hardcoded to mean resthooks
})

if (enqueuedInRustyHook) {
// Rusty-Hook handles it from here, so we're done.
return
}

const slowWarningTimeout = this.EXTERNAL_REQUEST_TIMEOUT * 0.7
const timeout = setTimeout(() => {
status.warn(
Expand All @@ -437,7 +477,7 @@ export class HookCommander {
try {
const request = await trackedFetch(hook.target, {
method: 'POST',
body: JSON.stringify(payload, undefined, 4),
body,
headers: { 'Content-Type': 'application/json' },
timeout: this.EXTERNAL_REQUEST_TIMEOUT,
})
Expand Down
8 changes: 8 additions & 0 deletions plugin-server/src/worker/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { Summary } from 'prom-client'

/**
 * Prometheus summary named `plugin_action_ms` ("Time to run plugin action").
 *
 * Observations are labelled with `plugin_id`, `action` and `status`, and the
 * summary reports the 50th, 90th, 95th and 99th percentiles.
 */
export const pluginActionMsSummary = new Summary({
name: 'plugin_action_ms',
help: 'Time to run plugin action',
labelNames: ['plugin_id', 'action', 'status'],
percentiles: [0.5, 0.9, 0.95, 0.99],
})
99 changes: 12 additions & 87 deletions plugin-server/src/worker/plugins/run.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
import { PluginEvent, PostHogEvent, ProcessedPluginEvent, Webhook } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import fetch from 'node-fetch'
import { Summary } from 'prom-client'

import { Hub, PluginConfig, PluginTaskType, VMMethods } from '../../types'
import { processError } from '../../utils/db/error'
import { trackedFetch } from '../../utils/fetch'
import { status } from '../../utils/status'
import { IllegalOperationError, sleep } from '../../utils/utils'

const pluginActionMsSummary = new Summary({
name: 'plugin_action_ms',
help: 'Time to run plugin action',
labelNames: ['plugin_id', 'action', 'status'],
percentiles: [0.5, 0.9, 0.95, 0.99],
})
import { IllegalOperationError } from '../../utils/utils'
import { pluginActionMsSummary } from '../metrics'

async function runSingleTeamPluginOnEvent(
hub: Hub,
Expand Down Expand Up @@ -73,80 +64,6 @@ export async function runOnEvent(hub: Hub, event: ProcessedPluginEvent): Promise
)
}

// Backoff parameters for retrying the enqueue request to rusty-hook: the delay before the
// next attempt is `2 ** (attempt - 1) * RUSTY_HOOK_BASE_DELAY_MS`, capped at
// `MAX_RUSTY_HOOK_DELAY_MS` (i.e. 100ms, 200ms, 400ms, ... up to 30s).
const RUSTY_HOOK_BASE_DELAY_MS = 100
const MAX_RUSTY_HOOK_DELAY_MS = 30_000

/**
 * JSON body sent to the rusty-hook service when enqueueing a webhook.
 *
 * `parameters` is the webhook to deliver; `metadata` identifies the team, plugin and
 * plugin config the webhook was produced for.
 */
interface RustyWebhookPayload {
parameters: Webhook
metadata: {
team_id: number
plugin_id: number
plugin_config_id: number
}
}

/**
 * Enqueues a webhook into the rusty-hook service, retrying until the service accepts it.
 *
 * The webhook is wrapped in a `RustyWebhookPayload` together with the team/plugin/plugin-config
 * ids taken from `pluginConfig`, serialized to JSON and POSTed to `hub.RUSTY_HOOK_URL`.
 * Each attempt is timed and recorded in `pluginActionMsSummary` under the `enqueueRustyHook`
 * action with a `success` or `error` status. Failures (non-OK responses or request errors)
 * are logged and reported to Sentry with the webhook body redacted, then retried after an
 * exponential backoff capped at `MAX_RUSTY_HOOK_DELAY_MS`.
 *
 * Note: mutates `webhook` in place by defaulting `method` to `'POST'` and `headers` to `{}`.
 *
 * @param hub - Supplies `RUSTY_HOOK_URL` and `EXTERNAL_REQUEST_TIMEOUT_MS`.
 * @param webhook - The webhook to deliver; missing `method`/`headers` are filled in.
 * @param pluginConfig - Source of the `team_id`, `plugin_id` and `id` metadata.
 * @returns Resolves only once rusty-hook has responded with an OK status.
 */
async function enqueueInRustyHook(hub: Hub, webhook: Webhook, pluginConfig: PluginConfig) {
// Fill in defaults only when the caller left them unset (null/undefined).
webhook.method ??= 'POST'
webhook.headers ??= {}

const rustyWebhookPayload: RustyWebhookPayload = {
parameters: webhook,
metadata: {
team_id: pluginConfig.team_id,
plugin_id: pluginConfig.plugin_id,
plugin_config_id: pluginConfig.id,
},
}
// Serialized once up front; the same body is reused for every retry.
const body = JSON.stringify(rustyWebhookPayload, undefined, 4)

// We attempt to enqueue into the rusty-hook service until we succeed. This is deliberately
// designed to block up the consumer if rusty-hook is down or if we deploy code that
// sends malformed requests. The entire purpose of rusty-hook is to reliably deliver webhooks,
// so we'd rather leave items in the Kafka topic until we manage to get them into rusty-hook.
let attempt = 0
while (true) {
// Start time of this attempt, used for the duration metric on both success and error.
const timer = new Date()
try {
attempt += 1
const response = await fetch(hub.RUSTY_HOOK_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,

// Sure, it's not an external request, but we should have a timeout and this is as
// good as any.
timeout: hub.EXTERNAL_REQUEST_TIMEOUT_MS,
})

if (response.ok) {
// Success, exit the loop.
pluginActionMsSummary
.labels(pluginConfig.plugin_id.toString(), 'enqueueRustyHook', 'success')
.observe(new Date().getTime() - timer.getTime())

break
}

// Throw to unify error handling below.
throw new Error(`rusty-hook returned ${response.status} ${response.statusText}: ${await response.text()}`)
} catch (error) {
pluginActionMsSummary
.labels(pluginConfig.plugin_id.toString(), 'enqueueRustyHook', 'error')
.observe(new Date().getTime() - timer.getTime())

// Keep the webhook body out of logs and Sentry; everything else is reported as-is.
const redactedWebhook = {
parameters: { ...rustyWebhookPayload.parameters, body: '<redacted>' },
metadata: rustyWebhookPayload.metadata,
}
status.error('🔴', 'Webhook enqueue to rusty-hook failed', { error, redactedWebhook, attempt })
Sentry.captureException(error, { extra: { redactedWebhook } })
}

// Only reached after a failed attempt: exponential backoff, capped at the maximum delay.
const delayMs = Math.min(2 ** (attempt - 1) * RUSTY_HOOK_BASE_DELAY_MS, MAX_RUSTY_HOOK_DELAY_MS)
await sleep(delayMs)
}
}

async function runSingleTeamPluginComposeWebhook(
hub: Hub,
event: PostHogEvent,
Expand Down Expand Up @@ -175,8 +92,16 @@ async function runSingleTeamPluginComposeWebhook(
return
}

if (hub.rustyHookForTeams?.(event.team_id)) {
return await enqueueInRustyHook(hub, webhook, pluginConfig)
const enqueuedInRustyHook = await hub.rustyHook.enqueueIfEnabledForTeam({
webhook,
teamId: event.team_id,
pluginId: pluginConfig.plugin_id,
pluginConfigId: pluginConfig.id,
})

if (enqueuedInRustyHook) {
// Rusty-Hook handles it from here, so we're done.
return
}

const request = await trackedFetch(webhook.url, {
Expand Down
Loading

0 comments on commit 49d6ebe

Please sign in to comment.