Skip to content

Commit

Permalink
feat: p-s to support composeWebhook (#18465)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiina303 authored Nov 9, 2023
1 parent 1c63966 commit e3298f8
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 47 deletions.
34 changes: 21 additions & 13 deletions plugin-server/functional_tests/plugins.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,23 +389,31 @@ test.concurrent(
properties: properties,
}

await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })

await waitForExpect(async () => {
// We might have not completed the setup properly in time, so to avoid flaky tests, we'll
// try sending messages and checking the last log message until we get the expected result.
await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })
const logEntries = await fetchPluginConsoleLogEntries(pluginConfig.id)
const onEvent = logEntries.filter(({ message: [method] }) => method === 'onEvent')
expect(onEvent.length).toBeGreaterThan(0)

const onEventEvent = onEvent[0].message[1]
expect(onEventEvent.elements).toEqual([
const lastLogEntry = onEvent.length > 0 ? onEvent[onEvent.length - 1] : null
expect(lastLogEntry).toEqual(
expect.objectContaining({
attributes: {},
nth_child: 1,
nth_of_type: 2,
tag_name: 'div',
text: '💻',
}),
])
message: [
'onEvent',
expect.objectContaining({
elements: [
expect.objectContaining({
attributes: {},
nth_child: 1,
nth_of_type: 2,
tag_name: 'div',
text: '💻',
}),
],
}),
],
})
)
})
},
20000
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import * as Sentry from '@sentry/node'
import { EachBatchPayload, KafkaMessage } from 'kafkajs'

import { RawClickHouseEvent } from '../../../types'
import { convertToIngestionEvent } from '../../../utils/event'
import { PluginConfig, PluginMethod, RawClickHouseEvent } from '../../../types'
import { convertToIngestionEvent, convertToPostHogEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { processOnEventStep } from '../../../worker/ingestion/event-pipeline/runAsyncHandlersStep'
import {
processComposeWebhookStep,
processOnEventStep,
} from '../../../worker/ingestion/event-pipeline/runAsyncHandlersStep'
import { runInstrumentedFunction } from '../../utils'
import { KafkaJSIngestionConsumer } from '../kafka-queue'
import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
Expand All @@ -13,28 +16,57 @@ import { eachBatchHandlerHelper } from './each-batch-webhooks'
// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')

export async function handleOnEventPlugins(
pluginConfigs: PluginConfig[],
clickHouseEvent: RawClickHouseEvent,
queue: KafkaJSIngestionConsumer
): Promise<void> {
// Elements parsing can be extremely slow, so we skip it for some plugins
// # SKIP_ELEMENTS_PARSING_PLUGINS
const skipElementsChain = pluginConfigs.every((pluginConfig) =>
queue.pluginsServer.pluginConfigsToSkipElementsParsing?.(pluginConfig.plugin_id)
)

const event = convertToIngestionEvent(clickHouseEvent, skipElementsChain)
await runInstrumentedFunction({
func: () => processOnEventStep(queue.pluginsServer, event),
statsKey: `kafka_queue.process_async_handlers_on_event`,
timeoutMessage: 'After 30 seconds still running runAppsOnEventPipeline',
timeoutContext: () => ({
event: JSON.stringify(event),
}),
teamId: event.teamId,
})
}

export async function handleComposeWebhookPlugins(
clickHouseEvent: RawClickHouseEvent,
queue: KafkaJSIngestionConsumer
): Promise<void> {
const event = convertToPostHogEvent(clickHouseEvent)
await runInstrumentedFunction({
func: () => processComposeWebhookStep(queue.pluginsServer, event),
statsKey: `kafka_queue.process_async_handlers_on_event`,
timeoutMessage: 'After 30 seconds still running runAppsOnEventPipeline',
timeoutContext: () => ({
event: JSON.stringify(event),
}),
teamId: event.team_id,
})
}

export async function eachMessageAppsOnEventHandlers(
clickHouseEvent: RawClickHouseEvent,
queue: KafkaJSIngestionConsumer
): Promise<void> {
const pluginConfigs = queue.pluginsServer.pluginConfigsPerTeam.get(clickHouseEvent.team_id)
if (pluginConfigs) {
// Elements parsing can be extremely slow, so we skip it for some plugins
// # SKIP_ELEMENTS_PARSING_PLUGINS
const skipElementsChain = pluginConfigs.every((pluginConfig) =>
queue.pluginsServer.pluginConfigsToSkipElementsParsing?.(pluginConfig.plugin_id)
)

const event = convertToIngestionEvent(clickHouseEvent, skipElementsChain)
await runInstrumentedFunction({
func: () => processOnEventStep(queue.pluginsServer, event),
statsKey: `kafka_queue.process_async_handlers_on_event`,
timeoutMessage: 'After 30 seconds still running runAppsOnEventPipeline',
timeoutContext: () => ({
event: JSON.stringify(event),
}),
teamId: event.teamId,
})
// Split between onEvent and composeWebhook plugins
const onEventPlugins = pluginConfigs.filter((pluginConfig) => pluginConfig.method === PluginMethod.onEvent)
await Promise.all([
handleOnEventPlugins(onEventPlugins, clickHouseEvent, queue),
handleComposeWebhookPlugins(clickHouseEvent, queue),
])
} else {
eventDroppedCounter
.labels({
Expand Down
14 changes: 13 additions & 1 deletion plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import {
PluginConfigSchema,
PluginEvent,
PluginSettings,
PostHogEvent,
ProcessedPluginEvent,
Properties,
Webhook,
} from '@posthog/plugin-scaffold'
import { Pool as GenericPool } from 'generic-pool'
import { StatsD } from 'hot-shots'
Expand Down Expand Up @@ -381,6 +383,11 @@ export interface PluginCapabilities {
methods?: string[]
}

export enum PluginMethod {
onEvent = 'onEvent',
composeWebhook = 'composeWebhook',
}

export interface PluginConfig {
id: number
team_id: TeamId
Expand All @@ -394,6 +401,10 @@ export interface PluginConfig {
vm?: LazyPluginVM | null
created_at: string
updated_at?: string
// We're migrating to a new functions that take PostHogEvent instead of PluginEvent
// we'll need to know which method this plugin is using to call it the right way
// undefined for old plugins with multiple or deprecated methods
method?: PluginMethod
}

export interface PluginJsonConfig {
Expand All @@ -410,7 +421,7 @@ export interface PluginError {
time: string
name?: string
stack?: string
event?: PluginEvent | ProcessedPluginEvent | null
event?: PluginEvent | ProcessedPluginEvent | PostHogEvent | null
}

export interface PluginAttachmentDB {
Expand Down Expand Up @@ -476,6 +487,7 @@ export type VMMethods = {
getSettings?: () => PluginSettings
onEvent?: (event: ProcessedPluginEvent) => Promise<void>
exportEvents?: (events: PluginEvent[]) => Promise<void>
composeWebhook?: (event: PostHogEvent) => Webhook | null
processEvent?: (event: PluginEvent) => Promise<PluginEvent>
}

Expand Down
4 changes: 2 additions & 2 deletions plugin-server/src/utils/db/error.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PluginEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold'
import { PluginEvent, PostHogEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold'
import { captureException } from '@sentry/node'

import { Hub, PluginConfig, PluginError } from '../../types'
Expand Down Expand Up @@ -30,7 +30,7 @@ export async function processError(
server: Hub,
pluginConfig: PluginConfig | null,
error: Error | string,
event?: PluginEvent | ProcessedPluginEvent | null
event?: PluginEvent | ProcessedPluginEvent | PostHogEvent | null
): Promise<void> {
if (!pluginConfig) {
captureException(new Error('Tried to process error for nonexistent plugin config!'), {
Expand Down
14 changes: 13 additions & 1 deletion plugin-server/src/utils/event.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PluginEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold'
import { PluginEvent, PostHogEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'
import { Message } from 'node-rdkafka'

Expand Down Expand Up @@ -61,6 +61,18 @@ export function parseRawClickHouseEvent(rawEvent: RawClickHouseEvent): ClickHous
: null,
}
}
export function convertToPostHogEvent(event: RawClickHouseEvent): PostHogEvent {
const properties = event.properties ? JSON.parse(event.properties) : {}
properties['$elements_chain'] = event.elements_chain // TODO: tests
return {
uuid: event.uuid,
event: event.event!,
team_id: event.team_id,
distinct_id: event.distinct_id,
properties,
timestamp: new Date(clickHouseTimestampToISO(event.timestamp)),
}
}

export function convertToIngestionEvent(event: RawClickHouseEvent, skipElementsChain = false): PostIngestionEvent {
const properties = event.properties ? JSON.parse(event.properties) : {}
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/app-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export interface AppMetricIdentifier {
pluginConfigId: number
jobId?: string
// Keep in sync with posthog/queries/app_metrics/serializers.py
category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask' | 'webhook'
category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask' | 'webhook' | 'composeWebhook'
}

export interface AppMetric extends AppMetricIdentifier {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { PostHogEvent } from '@posthog/plugin-scaffold'

import { runInstrumentedFunction } from '../../../main/utils'
import { Hub, PostIngestionEvent } from '../../../types'
import { convertToProcessedPluginEvent } from '../../../utils/event'
import { runOnEvent } from '../../plugins/run'
import { runComposeWebhook, runOnEvent } from '../../plugins/run'
import { ActionMatcher } from '../action-matcher'
import { HookCommander, instrumentWebhookStep } from '../hooks'

Expand All @@ -21,6 +23,20 @@ export async function processOnEventStep(hub: Hub, event: PostIngestionEvent) {
return null
}

export async function processComposeWebhookStep(hub: Hub, event: PostHogEvent) {
await runInstrumentedFunction({
timeoutContext: () => ({
team_id: event.team_id,
event_uuid: event.uuid,
}),
func: () => runComposeWebhook(hub, event),
statsKey: `kafka_queue.single_compose_webhook`,
timeoutMessage: `After 30 seconds still running composeWebhook`,
teamId: event.team_id,
})
return null
}

export async function processWebhooksStep(
event: PostIngestionEvent,
actionMatcher: ActionMatcher,
Expand Down
12 changes: 11 additions & 1 deletion plugin-server/src/worker/plugins/loadPluginsFromDB.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PluginAttachment } from '@posthog/plugin-scaffold'

import { Hub, Plugin, PluginConfig, PluginConfigId, PluginId, TeamId } from '../../types'
import { Hub, Plugin, PluginConfig, PluginConfigId, PluginId, PluginMethod, TeamId } from '../../types'
import { getPluginAttachmentRows, getPluginConfigRows, getPluginRows } from '../../utils/db/sql'

export async function loadPluginsFromDB(
Expand Down Expand Up @@ -43,11 +43,21 @@ export async function loadPluginsFromDB(
if (!plugin) {
continue
}
let method = undefined
if (plugin.capabilities?.methods) {
const methods = plugin.capabilities.methods
if (methods?.some((method) => [PluginMethod.onEvent.toString(), 'exportEvents'].includes(method))) {
method = PluginMethod.onEvent
} else if (methods?.some((method) => [PluginMethod.composeWebhook.toString()].includes(method))) {
method = PluginMethod.composeWebhook
}
}
const pluginConfig: PluginConfig = {
...row,
plugin: plugin,
attachments: attachmentsPerConfig.get(row.id) || {},
vm: null,
method,
}
pluginConfigs.set(row.id, pluginConfig)

Expand Down
Loading

0 comments on commit e3298f8

Please sign in to comment.