Skip to content

Commit

Permalink
feat: Remove exportEvents
Browse files Browse the repository at this point in the history
  • Loading branch information
tiina303 committed Dec 5, 2023
1 parent fc0bd60 commit 1ed7987
Show file tree
Hide file tree
Showing 25 changed files with 40 additions and 3,712 deletions.
122 changes: 10 additions & 112 deletions plugin-server/functional_tests/exports-v1.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { createServer, Server } from 'http'
import { UUIDT } from '../src/utils/utils'
import { capture, createAndReloadPluginConfig, createOrganization, createPlugin, createTeam } from './api'
import { waitForExpect } from './expectations'
import { produce } from './kafka'

let organizationId: string
let server: Server
Expand Down Expand Up @@ -43,10 +42,10 @@ test.concurrent(`exports: exporting events on ingestion`, async () => {
plugin_type: 'source',
is_global: false,
source__index_ts: `
export const exportEvents = async (events, { global, config }) => {
export const onEvent = async (event, { global, config }) => {
await fetch(
"http://localhost:${server.address()?.port}/${teamId}",
{method: "POST", body: JSON.stringify(events)}
{method: "POST", body: JSON.stringify(event)}
)
}
`,
Expand All @@ -67,14 +66,11 @@ test.concurrent(`exports: exporting events on ingestion`, async () => {
},
})

// Then check that the exportEvents function was called
// Then check that the onEvent function was called
await waitForExpect(
() => {
const exportEvents = webHookCalledWith[`/${teamId}`]
expect(exportEvents.length).toBeGreaterThan(0)
const exportedEvents = exportEvents[0]

expect(exportedEvents).toEqual([
const onEvents = webHookCalledWith[`/${teamId}`]
expect(onEvents).toEqual([
expect.objectContaining({
distinct_id: distinctId,
team_id: teamId,
Expand Down Expand Up @@ -102,10 +98,10 @@ test.concurrent(`exports: exporting $autocapture events on ingestion`, async ()
plugin_type: 'source',
is_global: false,
source__index_ts: `
export const exportEvents = async (events, { global, config }) => {
export const onEvent = async (event, { global, config }) => {
await fetch(
"http://localhost:${server.address()?.port}/${teamId}",
{method: "POST", body: JSON.stringify(events)}
{method: "POST", body: JSON.stringify(event)}
)
}
`,
Expand All @@ -128,13 +124,11 @@ test.concurrent(`exports: exporting $autocapture events on ingestion`, async ()
},
})

// Then check that the exportEvents function was called
// Then check that the onEvent function was called
await waitForExpect(
() => {
const exportEvents = webHookCalledWith[`/${teamId}`]
expect(exportEvents.length).toBeGreaterThan(0)
const exportedEvents = exportEvents[0]
expect(exportedEvents).toEqual([
const onEvents = webHookCalledWith[`/${teamId}`]
expect(onEvents).toEqual([
expect.objectContaining({
distinct_id: distinctId,
team_id: teamId,
Expand Down Expand Up @@ -163,99 +157,3 @@ test.concurrent(`exports: exporting $autocapture events on ingestion`, async ()
1_000
)
})

test.concurrent(`exports: historical exports`, async () => {
const teamId = await createTeam(organizationId)
const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()

const plugin = await createPlugin({
organization_id: organizationId,
name: 'export plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
export const exportEvents = async (events, { global, config }) => {
await fetch(
"http://localhost:${server.address()?.port}/${teamId}",
{method: "POST", body: JSON.stringify(events)}
)
}
`,
})
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)

// First let's capture an event and wait for it to be ingested so
// so we can check that the historical event is the same as the one
// passed to processEvent on initial ingestion.
await capture({
teamId,
distinctId,
uuid,
event: '$autocapture',
properties: {
name: 'hehe',
uuid: new UUIDT().toString(),
$elements: [{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: '💻' }],
},
})

// Then check that the exportEvents function was called
const [exportedEvent] = await waitForExpect(
() => {
const exportEvents = webHookCalledWith[`/${teamId}`]
expect(exportEvents.length).toBeGreaterThan(0)
return exportEvents[0]
},
60_000,
1_000
)

// NOTE: the frontend doesn't actually push to this queue but rather
// adds directly to PostgreSQL using the graphile-worker stored
// procedure `add_job`. I'd rather keep these tests graphile
// unaware.
await produce({
topic: 'jobs',
message: Buffer.from(
JSON.stringify({
type: 'Export historical events',
pluginConfigId: pluginConfig.id,
pluginConfigTeam: teamId,
payload: {
dateFrom: new Date(Date.now() - 60000).toISOString(),
dateTo: new Date(Date.now()).toISOString(),
},
})
),
key: teamId.toString(),
})

// Then check that the exportEvents function was called with the
// same data that was used with the non-historical export, with the
// additions of details related to the historical export.
await waitForExpect(
() => {
const historicallyExportedEvents = webHookCalledWith[`/${teamId}`].filter((events) =>
events.some((event) => event.properties['$$is_historical_export_event'])
)
expect(historicallyExportedEvents.length).toBeGreaterThan(0)

const historicallyExportedEvent = historicallyExportedEvents[0]
expect(historicallyExportedEvent).toEqual([
expect.objectContaining({
...exportedEvent,
ip: '', // NOTE: for some reason this is "" when exported historically, but null otherwise.
properties: {
...exportedEvent.properties,
$$is_historical_export_event: true,
$$historical_export_timestamp: expect.any(String),
$$historical_export_source_db: 'clickhouse',
},
}),
])
},
60_000,
1_000
)
})
4 changes: 0 additions & 4 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,6 @@ export function getDefaultConfig(): PluginsServerConfig {
PLUGIN_SERVER_MODE: null,
PLUGIN_LOAD_SEQUENTIALLY: false,
KAFKAJS_LOG_LEVEL: 'WARN',
HISTORICAL_EXPORTS_ENABLED: true,
HISTORICAL_EXPORTS_MAX_RETRY_COUNT: 15,
HISTORICAL_EXPORTS_INITIAL_FETCH_TIME_WINDOW: 10 * 60 * 1000,
HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER: 1.5,
APP_METRICS_GATHERED_FOR_ALL: isDevEnv() ? true : false,
MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: 0,
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ export const startAsyncOnEventHandlerConsumer = async ({
}) => {
/*
Consumes analytics events from the Kafka topic `clickhouse_events_json`
and processes any onEvent plugin handlers configured for the team. This
also includes `exportEvents` handlers defined in plugins as these are
also handled via modifying `onEvent` to call `exportEvents`.
and processes any onEvent plugin handlers configured for the team.
At the moment this is just a wrapper around `IngestionConsumer`. We may
want to further remove that abstraction in the future.
Expand Down Expand Up @@ -61,9 +59,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
}) => {
/*
Consumes analytics events from the Kafka topic `clickhouse_events_json`
and processes any onEvent plugin handlers configured for the team. This
also includes `exportEvents` handlers defined in plugins as these are
also handled via modifying `onEvent` to call `exportEvents`.
and processes any onEvent plugin handlers configured for the team.
At the moment this is just a wrapper around `IngestionConsumer`. We may
want to further remove that abstraction in the future.
Expand Down
12 changes: 0 additions & 12 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { ReaderModel } from '@maxmind/geoip2-node'
import ClickHouse from '@posthog/clickhouse'
import {
Element,
Meta,
PluginAttachment,
PluginConfigSchema,
PluginEvent,
Expand Down Expand Up @@ -193,10 +192,6 @@ export interface PluginsServerConfig {
PLUGIN_SERVER_MODE: PluginServerMode | null
PLUGIN_LOAD_SEQUENTIALLY: boolean // could help with reducing memory usage spikes on startup
KAFKAJS_LOG_LEVEL: 'NOTHING' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR'
HISTORICAL_EXPORTS_ENABLED: boolean // enables historical exports for export apps
HISTORICAL_EXPORTS_MAX_RETRY_COUNT: number
HISTORICAL_EXPORTS_INITIAL_FETCH_TIME_WINDOW: number
HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER: number
APP_METRICS_GATHERED_FOR_ALL: boolean // whether to gather app metrics for all teams
MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: number
USE_KAFKA_FOR_SCHEDULED_TASKS: boolean // distribute scheduled tasks across the scheduler workers
Expand Down Expand Up @@ -490,7 +485,6 @@ export type VMMethods = {
teardownPlugin?: () => Promise<void>
getSettings?: () => PluginSettings
onEvent?: (event: ProcessedPluginEvent) => Promise<void>
exportEvents?: (events: PluginEvent[]) => Promise<void>
composeWebhook?: (event: PostHogEvent) => Webhook | null
processEvent?: (event: PluginEvent) => Promise<PluginEvent>
}
Expand Down Expand Up @@ -526,12 +520,6 @@ export interface PluginConfigVMResponse {
usedImports: Set<string>
}

export interface PluginConfigVMInternalResponse<M extends Meta = Meta> {
methods: VMMethods
tasks: Record<PluginTaskType, Record<string, PluginTask>>
meta: M
}

export interface EventUsage {
event: string
usage_count: number | null
Expand Down
19 changes: 17 additions & 2 deletions plugin-server/src/utils/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ import { PluginEvent, PostHogEvent, ProcessedPluginEvent } from '@posthog/plugin
import { DateTime } from 'luxon'
import { Message } from 'node-rdkafka'

import { ClickHouseEvent, PipelineEvent, PostIngestionEvent, RawClickHouseEvent } from '../types'
import { convertDatabaseElementsToRawElements } from '../worker/vm/upgrades/utils/fetchEventsForInterval'
import { ClickHouseEvent, Element, PipelineEvent, PostIngestionEvent, RawClickHouseEvent } from '../types'
import { chainToElements } from './db/elements-chain'
import { personInitialAndUTMProperties } from './db/utils'
import {
Expand All @@ -12,6 +11,22 @@ import {
clickHouseTimestampToISO,
} from './utils'

interface RawElement extends Element {
$el_text?: string
}

const convertDatabaseElementsToRawElements = (elements: RawElement[]): RawElement[] => {
for (const element of elements) {
if (element.attributes && element.attributes.attr__class) {
element.attr_class = element.attributes.attr__class
}
if (element.text) {
element.$el_text = element.text
}
}
return elements
}

export function convertToProcessedPluginEvent(event: PostIngestionEvent): ProcessedPluginEvent {
return {
distinct_id: event.distinctId,
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/app-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export interface AppMetricIdentifier {
pluginConfigId: number
jobId?: string
// Keep in sync with posthog/queries/app_metrics/serializers.py
category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask' | 'webhook' | 'composeWebhook'
category: 'processEvent' | 'onEvent' | 'scheduledTask' | 'webhook' | 'composeWebhook'
}

export interface AppMetric extends AppMetricIdentifier {
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/plugins/loadPluginsFromDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export async function loadPluginsFromDB(
let method = undefined
if (plugin.capabilities?.methods) {
const methods = plugin.capabilities.methods
if (methods?.some((method) => [PluginMethod.onEvent.toString(), 'exportEvents'].includes(method))) {
if (methods?.some((method) => [PluginMethod.onEvent.toString()].includes(method))) {
method = PluginMethod.onEvent
} else if (methods?.some((method) => [PluginMethod.composeWebhook.toString()].includes(method))) {
method = PluginMethod.composeWebhook
Expand Down
4 changes: 1 addition & 3 deletions plugin-server/src/worker/vm/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ function shouldSetupPlugin(serverCapability: keyof PluginServerCapabilities, plu
return (pluginCapabilities.jobs || []).length > 0
}
if (serverCapability === 'processAsyncOnEventHandlers') {
return pluginCapabilities.methods?.some((method) =>
['onEvent', 'exportEvents', 'composeWebhook'].includes(method)
)
return pluginCapabilities.methods?.some((method) => ['onEvent', 'composeWebhook'].includes(method))
}

return false
Expand Down
4 changes: 0 additions & 4 deletions plugin-server/src/worker/vm/lazy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ export class LazyPluginVM {
this.initVm()
}

public async getExportEvents(): Promise<PluginConfigVMResponse['methods']['exportEvents'] | null> {
return await this.getVmMethod('exportEvents')
}

public async getOnEvent(): Promise<PluginConfigVMResponse['methods']['onEvent'] | null> {
return await this.getVmMethod('onEvent')
}
Expand Down
Loading

0 comments on commit 1ed7987

Please sign in to comment.