diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts index e7d52d3a26bbe..1c0f42bd6a0ac 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts @@ -1,5 +1,6 @@ import * as Sentry from '@sentry/node' import { EachBatchPayload, KafkaMessage } from 'kafkajs' +import { processOnEventStep } from 'worker/ingestion/event-pipeline/runAsyncHandlersStep' import { RawClickHouseEvent } from '../../../types' import { convertToIngestionEvent } from '../../../utils/event' @@ -28,7 +29,7 @@ export async function eachMessageAppsOnEventHandlers( const event = convertToIngestionEvent(clickHouseEvent, skipElementsChain) await runInstrumentedFunction({ - func: () => queue.workerMethods.runAppsOnEventPipeline(event), + func: () => processOnEventStep(queue.pluginsServer, event), statsKey: `kafka_queue.process_async_handlers_on_event`, timeoutMessage: 'After 30 seconds still running runAppsOnEventPipeline', timeoutContext: () => ({ diff --git a/plugin-server/src/main/ingestion-queues/kafka-queue.ts b/plugin-server/src/main/ingestion-queues/kafka-queue.ts index 7989efd4b356a..d6f1d2792ec16 100644 --- a/plugin-server/src/main/ingestion-queues/kafka-queue.ts +++ b/plugin-server/src/main/ingestion-queues/kafka-queue.ts @@ -6,7 +6,7 @@ import { Counter } from 'prom-client' import { BatchConsumer, startBatchConsumer } from '../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../../kafka/config' -import { Hub, PipelineEvent, PostIngestionEvent, WorkerMethods } from '../../types' +import { Hub, PipelineEvent, WorkerMethods } from '../../types' import { KafkaConfig } from '../../utils/db/hub' import { timeoutGuard } from '../../utils/db/utils' import { status } from '../../utils/status' @@ -59,11 +59,6 @@ export class KafkaJSIngestionConsumer { // references to queue.workerMethods buried deep in the codebase // #onestepatatime this.workerMethods = { - runAppsOnEventPipeline: (event: PostIngestionEvent) => { - this.pluginsServer.lastActivity = new Date().valueOf() - this.pluginsServer.lastActivityType = 'runAppsOnEventPipeline' - return piscina.run({ task: 'runAppsOnEventPipeline', args: { event } }) - }, runEventPipeline: (event: PipelineEvent) => { this.pluginsServer.lastActivity = new Date().valueOf() this.pluginsServer.lastActivityType = 'runEventPipeline' @@ -226,11 +221,6 @@ export class IngestionConsumer { // references to queue.workerMethods buried deep in the codebase // #onestepatatime this.workerMethods = { - runAppsOnEventPipeline: (event: PostIngestionEvent) => { - this.pluginsServer.lastActivity = new Date().valueOf() - this.pluginsServer.lastActivityType = 'runAppsOnEventPipeline' - return piscina.run({ task: 'runAppsOnEventPipeline', args: { event } }) - }, runEventPipeline: (event: PipelineEvent) => { this.pluginsServer.lastActivity = new Date().valueOf() this.pluginsServer.lastActivityType = 'runEventPipeline' diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts index 83821e720c0dd..4fcc4d1bd2a49 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts @@ -1,12 +1,11 @@ import { runInstrumentedFunction } from '../../../main/utils' -import { PostIngestionEvent } from '../../../types' +import { Hub, PostIngestionEvent } from '../../../types' import { convertToProcessedPluginEvent } from '../../../utils/event' import { runOnEvent } from '../../plugins/run' import { ActionMatcher } from '../action-matcher' import { HookCommander, instrumentWebhookStep } from '../hooks' -import { EventPipelineRunner } from './runner' -export async function processOnEventStep(runner: EventPipelineRunner, event: PostIngestionEvent) { +export async function processOnEventStep(hub: Hub, event: PostIngestionEvent) { const processedPluginEvent = convertToProcessedPluginEvent(event) await runInstrumentedFunction({ @@ -14,7 +13,7 @@ export async function processOnEventStep(runner: EventPipelineRunner, event: Pos team_id: event.teamId, event_uuid: event.eventUuid, }), - func: () => runOnEvent(runner.hub, processedPluginEvent), + func: () => runOnEvent(hub, processedPluginEvent), statsKey: `kafka_queue.single_on_event`, timeoutMessage: `After 30 seconds still running onEvent`, teamId: event.teamId, diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 5c9caa4aacafa..a72ce7579ddec 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -4,7 +4,7 @@ import { Counter } from 'prom-client' import { eventDroppedCounter } from '../../../main/ingestion-queues/metrics' import { runInSpan } from '../../../sentry' -import { Hub, PipelineEvent, PostIngestionEvent } from '../../../types' +import { Hub, PipelineEvent } from '../../../types' import { DependencyUnavailableError } from '../../../utils/db/error' import { timeoutGuard } from '../../../utils/db/utils' import { stringToBoolean } from '../../../utils/env-utils' @@ -15,7 +15,6 @@ import { pluginsProcessEventStep } from './pluginsProcessEventStep' import { populateTeamDataStep } from './populateTeamDataStep' import { prepareEventStep } from './prepareEventStep' import { processPersonsStep } from './processPersonsStep' -import { processOnEventStep } from './runAsyncHandlersStep' export const silentFailuresAsyncHandlers = new Counter({ name: 'async_handlers_silent_failure', @@ -166,26 +165,6 @@ export class EventPipelineRunner { } } - async runAppsOnEventPipeline(event: PostIngestionEvent): Promise { - try { - this.hub.statsd?.increment('kafka_queue.event_pipeline.start', { pipeline: 'onEvent' }) - await this.runStep(processOnEventStep, [this, event], event.teamId, false) - this.hub.statsd?.increment('kafka_queue.onevent.processed') - return this.registerLastStep('processOnEventStep', event.teamId, [event]) - } catch (error) { - if (error instanceof DependencyUnavailableError) { - // If this is an error with a dependency that we control, we want to - // ensure that the caller knows that the event was not processed, - // for a reason that we control and that is transient. - throw error - } - - silentFailuresAsyncHandlers.inc() - - return { lastStep: error.step, args: [], error: error.message } - } - } - registerLastStep( stepName: string, teamId: number | null, diff --git a/plugin-server/src/worker/tasks.ts b/plugin-server/src/worker/tasks.ts index 3677bb59616ad..16c9355babc82 100644 --- a/plugin-server/src/worker/tasks.ts +++ b/plugin-server/src/worker/tasks.ts @@ -1,7 +1,6 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' -import { EnqueuedPluginJob, Hub, PipelineEvent, PluginTaskType, PostIngestionEvent } from '../types' -import { convertToProcessedPluginEvent } from '../utils/event' +import { EnqueuedPluginJob, Hub, PipelineEvent, PluginTaskType } from '../types' import { EventPipelineRunner } from './ingestion/event-pipeline/runner' import { loadSchedule } from './plugins/loadSchedule' import { runPluginTask, runProcessEvent } from './plugins/run' @@ -33,10 +32,6 @@ export const workerTasks: Record = { const runner = new EventPipelineRunner(hub, args.event) return await runner.runEventPipeline(args.event) }, - runAppsOnEventPipeline: async (hub, args: { event: PostIngestionEvent }) => { - const runner = new EventPipelineRunner(hub, convertToProcessedPluginEvent(args.event)) - return await runner.runAppsOnEventPipeline(args.event) - }, reloadPlugins: async (hub) => { await setupPlugins(hub) }, diff --git a/plugin-server/src/worker/worker.ts b/plugin-server/src/worker/worker.ts index 02f3dc92754bc..248f7edca7864 100644 --- a/plugin-server/src/worker/worker.ts +++ b/plugin-server/src/worker/worker.ts @@ -87,11 +87,7 @@ export const createTaskRunner = return response }, (transactionDuration: number) => { - if ( - task === 'runEventPipeline' || - task === 'runWebhooksHandlersEventPipeline' || - task === 'runAppsOnEventPipeline' - ) { + if (task === 'runEventPipeline') { return transactionDuration > 0.5 ? 1 : 0.01 } else { return 1 diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index 701632f0c7b57..51122c706b15a 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -55,8 +55,6 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { db: 'database', }, workerMethods: { - runAppsOnEventPipeline: jest.fn(), - runWebhooksHandlersEventPipeline: jest.fn(), runEventPipeline: jest.fn(() => Promise.resolve({})), }, } @@ -174,8 +172,6 @@ describe('eachBatchLegacyIngestion with overflow reroute', () => { db: 'database', }, workerMethods: { - runAppsOnEventPipeline: jest.fn(), - runWebhooksHandlersEventPipeline: jest.fn(), runEventPipeline: jest.fn(() => Promise.resolve({})), }, } diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts index a2403137e20af..e97de06c7d4b0 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts @@ -56,8 +56,6 @@ describe('eachBatchParallelIngestion with overflow consume', () => { db: 'database', }, workerMethods: { - runAppsOnEventPipeline: jest.fn(), - runWebhooksHandlersEventPipeline: jest.fn(), runEventPipeline: jest.fn(() => Promise.resolve({})), }, } @@ -150,8 +148,6 @@ describe('eachBatchLegacyIngestion with overflow consume', () => { db: 'database', }, workerMethods: { - runAppsOnEventPipeline: jest.fn(), - runWebhooksHandlersEventPipeline: jest.fn(), runEventPipeline: jest.fn(() => Promise.resolve({})), }, } diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index 0580f53d2724b..120c82b2d901a 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -139,8 +139,6 @@ describe('eachBatchX', () => { pluginConfigsPerTeam: new Map(), }, workerMethods: { - runAppsOnEventPipeline: jest.fn(), - runWebhooksHandlersEventPipeline: jest.fn(), runEventPipeline: jest.fn(() => Promise.resolve({})), }, } @@ -150,6 +148,7 @@ describe('eachBatchX', () => { it('calls runAppsOnEventPipeline when useful', async () => { queue.pluginsServer.pluginConfigsPerTeam.set(2, [pluginConfig39]) await eachBatchAppsOnEventHandlers(createKafkaJSBatch(clickhouseEvent), queue) + // TODO fix to jest spy on the actual function expect(queue.workerMethods.runAppsOnEventPipeline).toHaveBeenCalledWith({ ...event, properties: { diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts index 71643e2668b48..00dde9c82fe97 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts @@ -9,7 +9,10 @@ import { convertToIngestionEvent } from '../../../../src/utils/event' import { UUIDT } from '../../../../src/utils/utils' import { ActionManager } from '../../../../src/worker/ingestion/action-manager' import { ActionMatcher } from '../../../../src/worker/ingestion/action-matcher' -import { processWebhooksStep } from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' +import { + processOnEventStep, + processWebhooksStep, +} from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' import { EventPipelineRunner } from '../../../../src/worker/ingestion/event-pipeline/runner' import { HookCommander } from '../../../../src/worker/ingestion/hooks' import { setupPlugins } from '../../../../src/worker/plugins/setup' @@ -31,7 +34,7 @@ describe('Event Pipeline integration test', () => { const result = await runner.runEventPipeline(event) const postIngestionEvent = convertToIngestionEvent(result.args[0]) return Promise.all([ - runner.runAppsOnEventPipeline(postIngestionEvent), + processOnEventStep(runner.hub, postIngestionEvent), processWebhooksStep(postIngestionEvent, actionMatcher, hookCannon), ]) } diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index b2ad3a4290a72..a48e743c31b48 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -242,18 +242,4 @@ describe('EventPipelineRunner', () => { }) }) }) - - describe('runAppsOnEventPipeline()', () => { - it('runs remaining steps', async () => { - jest.mocked(hub.db.fetchPerson).mockResolvedValue('testPerson') - - await runner.runAppsOnEventPipeline({ - ...preIngestionEvent, - person_properties: {}, - person_created_at: '2020-02-23T02:11:00.000Z' as ISOTimestamp, - }) - - expect(runner.steps).toEqual(['processOnEventStep']) - }) - }) })