Skip to content

Commit

Permalink
chore: Remove runner abstraction from onEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
tiina303 committed Sep 28, 2023
1 parent 51aebe1 commit edc0f71
Show file tree
Hide file tree
Showing 11 changed files with 15 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as Sentry from '@sentry/node'
import { EachBatchPayload, KafkaMessage } from 'kafkajs'
import { processOnEventStep } from 'worker/ingestion/event-pipeline/runAsyncHandlersStep'

import { RawClickHouseEvent } from '../../../types'
import { convertToIngestionEvent } from '../../../utils/event'
Expand Down Expand Up @@ -28,7 +29,7 @@ export async function eachMessageAppsOnEventHandlers(

const event = convertToIngestionEvent(clickHouseEvent, skipElementsChain)
await runInstrumentedFunction({
func: () => queue.workerMethods.runAppsOnEventPipeline(event),
func: () => processOnEventStep(queue.pluginsServer, event),
statsKey: `kafka_queue.process_async_handlers_on_event`,
timeoutMessage: 'After 30 seconds still running runAppsOnEventPipeline',
timeoutContext: () => ({
Expand Down
12 changes: 1 addition & 11 deletions plugin-server/src/main/ingestion-queues/kafka-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Counter } from 'prom-client'

import { BatchConsumer, startBatchConsumer } from '../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../kafka/config'
import { Hub, PipelineEvent, PostIngestionEvent, WorkerMethods } from '../../types'
import { Hub, PipelineEvent, WorkerMethods } from '../../types'
import { KafkaConfig } from '../../utils/db/hub'
import { timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'
Expand Down Expand Up @@ -59,11 +59,6 @@ export class KafkaJSIngestionConsumer {
// references to queue.workerMethods buried deep in the codebase
// #onestepatatime
this.workerMethods = {
runAppsOnEventPipeline: (event: PostIngestionEvent) => {
this.pluginsServer.lastActivity = new Date().valueOf()
this.pluginsServer.lastActivityType = 'runAppsOnEventPipeline'
return piscina.run({ task: 'runAppsOnEventPipeline', args: { event } })
},
runEventPipeline: (event: PipelineEvent) => {
this.pluginsServer.lastActivity = new Date().valueOf()
this.pluginsServer.lastActivityType = 'runEventPipeline'
Expand Down Expand Up @@ -226,11 +221,6 @@ export class IngestionConsumer {
// references to queue.workerMethods buried deep in the codebase
// #onestepatatime
this.workerMethods = {
runAppsOnEventPipeline: (event: PostIngestionEvent) => {
this.pluginsServer.lastActivity = new Date().valueOf()
this.pluginsServer.lastActivityType = 'runAppsOnEventPipeline'
return piscina.run({ task: 'runAppsOnEventPipeline', args: { event } })
},
runEventPipeline: (event: PipelineEvent) => {
this.pluginsServer.lastActivity = new Date().valueOf()
this.pluginsServer.lastActivityType = 'runEventPipeline'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import { runInstrumentedFunction } from '../../../main/utils'
import { PostIngestionEvent } from '../../../types'
import { Hub, PostIngestionEvent } from '../../../types'
import { convertToProcessedPluginEvent } from '../../../utils/event'
import { runOnEvent } from '../../plugins/run'
import { ActionMatcher } from '../action-matcher'
import { HookCommander, instrumentWebhookStep } from '../hooks'
import { EventPipelineRunner } from './runner'

export async function processOnEventStep(runner: EventPipelineRunner, event: PostIngestionEvent) {
export async function processOnEventStep(hub: Hub, event: PostIngestionEvent) {
const processedPluginEvent = convertToProcessedPluginEvent(event)

await runInstrumentedFunction({
timeoutContext: () => ({
team_id: event.teamId,
event_uuid: event.eventUuid,
}),
func: () => runOnEvent(runner.hub, processedPluginEvent),
func: () => runOnEvent(hub, processedPluginEvent),
statsKey: `kafka_queue.single_on_event`,
timeoutMessage: `After 30 seconds still running onEvent`,
teamId: event.teamId,
Expand Down
23 changes: 1 addition & 22 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Counter } from 'prom-client'

import { eventDroppedCounter } from '../../../main/ingestion-queues/metrics'
import { runInSpan } from '../../../sentry'
import { Hub, PipelineEvent, PostIngestionEvent } from '../../../types'
import { Hub, PipelineEvent } from '../../../types'
import { DependencyUnavailableError } from '../../../utils/db/error'
import { timeoutGuard } from '../../../utils/db/utils'
import { stringToBoolean } from '../../../utils/env-utils'
Expand All @@ -15,7 +15,6 @@ import { pluginsProcessEventStep } from './pluginsProcessEventStep'
import { populateTeamDataStep } from './populateTeamDataStep'
import { prepareEventStep } from './prepareEventStep'
import { processPersonsStep } from './processPersonsStep'
import { processOnEventStep } from './runAsyncHandlersStep'

export const silentFailuresAsyncHandlers = new Counter({
name: 'async_handlers_silent_failure',
Expand Down Expand Up @@ -166,26 +165,6 @@ export class EventPipelineRunner {
}
}

async runAppsOnEventPipeline(event: PostIngestionEvent): Promise<EventPipelineResult> {
try {
this.hub.statsd?.increment('kafka_queue.event_pipeline.start', { pipeline: 'onEvent' })
await this.runStep(processOnEventStep, [this, event], event.teamId, false)
this.hub.statsd?.increment('kafka_queue.onevent.processed')
return this.registerLastStep('processOnEventStep', event.teamId, [event])
} catch (error) {
if (error instanceof DependencyUnavailableError) {
// If this is an error with a dependency that we control, we want to
// ensure that the caller knows that the event was not processed,
// for a reason that we control and that is transient.
throw error
}

silentFailuresAsyncHandlers.inc()

return { lastStep: error.step, args: [], error: error.message }
}
}

registerLastStep(
stepName: string,
teamId: number | null,
Expand Down
7 changes: 1 addition & 6 deletions plugin-server/src/worker/tasks.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'

import { EnqueuedPluginJob, Hub, PipelineEvent, PluginTaskType, PostIngestionEvent } from '../types'
import { convertToProcessedPluginEvent } from '../utils/event'
import { EnqueuedPluginJob, Hub, PipelineEvent, PluginTaskType } from '../types'
import { EventPipelineRunner } from './ingestion/event-pipeline/runner'
import { loadSchedule } from './plugins/loadSchedule'
import { runPluginTask, runProcessEvent } from './plugins/run'
Expand Down Expand Up @@ -33,10 +32,6 @@ export const workerTasks: Record<string, TaskRunner> = {
const runner = new EventPipelineRunner(hub, args.event)
return await runner.runEventPipeline(args.event)
},
runAppsOnEventPipeline: async (hub, args: { event: PostIngestionEvent }) => {
const runner = new EventPipelineRunner(hub, convertToProcessedPluginEvent(args.event))
return await runner.runAppsOnEventPipeline(args.event)
},
reloadPlugins: async (hub) => {
await setupPlugins(hub)
},
Expand Down
6 changes: 1 addition & 5 deletions plugin-server/src/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,7 @@ export const createTaskRunner =
return response
},
(transactionDuration: number) => {
if (
task === 'runEventPipeline' ||
task === 'runWebhooksHandlersEventPipeline' ||
task === 'runAppsOnEventPipeline'
) {
if (task === 'runEventPipeline') {
return transactionDuration > 0.5 ? 1 : 0.01
} else {
return 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
db: 'database',
},
workerMethods: {
runAppsOnEventPipeline: jest.fn(),
runWebhooksHandlersEventPipeline: jest.fn(),
runEventPipeline: jest.fn(() => Promise.resolve({})),
},
}
Expand Down Expand Up @@ -174,8 +172,6 @@ describe('eachBatchLegacyIngestion with overflow reroute', () => {
db: 'database',
},
workerMethods: {
runAppsOnEventPipeline: jest.fn(),
runWebhooksHandlersEventPipeline: jest.fn(),
runEventPipeline: jest.fn(() => Promise.resolve({})),
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
db: 'database',
},
workerMethods: {
runAppsOnEventPipeline: jest.fn(),
runWebhooksHandlersEventPipeline: jest.fn(),
runEventPipeline: jest.fn(() => Promise.resolve({})),
},
}
Expand Down Expand Up @@ -150,8 +148,6 @@ describe('eachBatchLegacyIngestion with overflow consume', () => {
db: 'database',
},
workerMethods: {
runAppsOnEventPipeline: jest.fn(),
runWebhooksHandlersEventPipeline: jest.fn(),
runEventPipeline: jest.fn(() => Promise.resolve({})),
},
}
Expand Down
3 changes: 1 addition & 2 deletions plugin-server/tests/main/ingestion-queues/each-batch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ describe('eachBatchX', () => {
pluginConfigsPerTeam: new Map(),
},
workerMethods: {
runAppsOnEventPipeline: jest.fn(),
runWebhooksHandlersEventPipeline: jest.fn(),
runEventPipeline: jest.fn(() => Promise.resolve({})),
},
}
Expand All @@ -150,6 +148,7 @@ describe('eachBatchX', () => {
it('calls runAppsOnEventPipeline when useful', async () => {
queue.pluginsServer.pluginConfigsPerTeam.set(2, [pluginConfig39])
await eachBatchAppsOnEventHandlers(createKafkaJSBatch(clickhouseEvent), queue)
// TODO fix to jest spy on the actual function
expect(queue.workerMethods.runAppsOnEventPipeline).toHaveBeenCalledWith({
...event,
properties: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import { convertToIngestionEvent } from '../../../../src/utils/event'
import { UUIDT } from '../../../../src/utils/utils'
import { ActionManager } from '../../../../src/worker/ingestion/action-manager'
import { ActionMatcher } from '../../../../src/worker/ingestion/action-matcher'
import { processWebhooksStep } from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep'
import {
processOnEventStep,
processWebhooksStep,
} from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep'
import { EventPipelineRunner } from '../../../../src/worker/ingestion/event-pipeline/runner'
import { HookCommander } from '../../../../src/worker/ingestion/hooks'
import { setupPlugins } from '../../../../src/worker/plugins/setup'
Expand All @@ -31,7 +34,7 @@ describe('Event Pipeline integration test', () => {
const result = await runner.runEventPipeline(event)
const postIngestionEvent = convertToIngestionEvent(result.args[0])
return Promise.all([
runner.runAppsOnEventPipeline(postIngestionEvent),
processOnEventStep(runner.hub, postIngestionEvent),
processWebhooksStep(postIngestionEvent, actionMatcher, hookCannon),
])
}
Expand Down
14 changes: 0 additions & 14 deletions plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,18 +242,4 @@ describe('EventPipelineRunner', () => {
})
})
})

describe('runAppsOnEventPipeline()', () => {
it('runs remaining steps', async () => {
jest.mocked(hub.db.fetchPerson).mockResolvedValue('testPerson')

await runner.runAppsOnEventPipeline({
...preIngestionEvent,
person_properties: {},
person_created_at: '2020-02-23T02:11:00.000Z' as ISOTimestamp,
})

expect(runner.steps).toEqual(['processOnEventStep'])
})
})
})

0 comments on commit edc0f71

Please sign in to comment.