diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts index e7d52d3a26bbe..8bebd169d9bca 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts @@ -4,6 +4,7 @@ import { EachBatchPayload, KafkaMessage } from 'kafkajs' import { RawClickHouseEvent } from '../../../types' import { convertToIngestionEvent } from '../../../utils/event' import { status } from '../../../utils/status' +import { processOnEventStep } from '../../../worker/ingestion/event-pipeline/runAsyncHandlersStep' import { runInstrumentedFunction } from '../../utils' import { KafkaJSIngestionConsumer } from '../kafka-queue' import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics' @@ -28,7 +29,7 @@ export async function eachMessageAppsOnEventHandlers( const event = convertToIngestionEvent(clickHouseEvent, skipElementsChain) await runInstrumentedFunction({ - func: () => queue.workerMethods.runAppsOnEventPipeline(event), + func: () => processOnEventStep(queue.pluginsServer, event), statsKey: `kafka_queue.process_async_handlers_on_event`, timeoutMessage: 'After 30 seconds still running runAppsOnEventPipeline', timeoutContext: () => ({ diff --git a/plugin-server/src/main/ingestion-queues/kafka-queue.ts b/plugin-server/src/main/ingestion-queues/kafka-queue.ts index 2d1b2c0d251a0..433bbb975fea1 100644 --- a/plugin-server/src/main/ingestion-queues/kafka-queue.ts +++ b/plugin-server/src/main/ingestion-queues/kafka-queue.ts @@ -6,7 +6,7 @@ import { Counter } from 'prom-client' import { BatchConsumer, startBatchConsumer } from '../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../../kafka/config' -import { Hub, PipelineEvent, PostIngestionEvent, WorkerMethods } from '../../types' +import { Hub, PipelineEvent, WorkerMethods } from '../../types' import { KafkaConfig } from '../../utils/db/hub' import { timeoutGuard } from '../../utils/db/utils' import { status } from '../../utils/status' @@ -59,11 +59,6 @@ export class KafkaJSIngestionConsumer { // references to queue.workerMethods buried deep in the codebase // #onestepatatime this.workerMethods = { - runAppsOnEventPipeline: (event: PostIngestionEvent) => { - this.pluginsServer.lastActivity = new Date().valueOf() - this.pluginsServer.lastActivityType = 'runAppsOnEventPipeline' - return piscina.run({ task: 'runAppsOnEventPipeline', args: { event } }) - }, runEventPipeline: (event: PipelineEvent) => { this.pluginsServer.lastActivity = new Date().valueOf() this.pluginsServer.lastActivityType = 'runEventPipeline' @@ -226,11 +221,6 @@ export class IngestionConsumer { // references to queue.workerMethods buried deep in the codebase // #onestepatatime this.workerMethods = { - runAppsOnEventPipeline: (event: PostIngestionEvent) => { - this.pluginsServer.lastActivity = new Date().valueOf() - this.pluginsServer.lastActivityType = 'runAppsOnEventPipeline' - return piscina.run({ task: 'runAppsOnEventPipeline', args: { event } }) - }, runEventPipeline: (event: PipelineEvent) => { this.pluginsServer.lastActivity = new Date().valueOf() this.pluginsServer.lastActivityType = 'runEventPipeline' diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 83cf936a3b713..32efc1f1daa14 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -476,7 +476,6 @@ export interface PluginTask { } export type WorkerMethods = { - runAppsOnEventPipeline: (event: PostIngestionEvent) => Promise runEventPipeline: (event: PipelineEvent) => Promise } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts index 83821e720c0dd..4fcc4d1bd2a49 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts @@ -1,12 +1,11 @@ import { runInstrumentedFunction } from '../../../main/utils' -import { PostIngestionEvent } from '../../../types' +import { Hub, PostIngestionEvent } from '../../../types' import { convertToProcessedPluginEvent } from '../../../utils/event' import { runOnEvent } from '../../plugins/run' import { ActionMatcher } from '../action-matcher' import { HookCommander, instrumentWebhookStep } from '../hooks' -import { EventPipelineRunner } from './runner' -export async function processOnEventStep(runner: EventPipelineRunner, event: PostIngestionEvent) { +export async function processOnEventStep(hub: Hub, event: PostIngestionEvent) { const processedPluginEvent = convertToProcessedPluginEvent(event) await runInstrumentedFunction({ @@ -14,7 +13,7 @@ export async function processOnEventStep(runner: EventPipelineRunner, event: Pos team_id: event.teamId, event_uuid: event.eventUuid, }), - func: () => runOnEvent(runner.hub, processedPluginEvent), + func: () => runOnEvent(hub, processedPluginEvent), statsKey: `kafka_queue.single_on_event`, timeoutMessage: `After 30 seconds still running onEvent`, teamId: event.teamId, diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 5c9caa4aacafa..a72ce7579ddec 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -4,7 +4,7 @@ import { Counter } from 'prom-client' import { eventDroppedCounter } from '../../../main/ingestion-queues/metrics' import { runInSpan } from '../../../sentry' -import { Hub, PipelineEvent, PostIngestionEvent } from '../../../types' +import { Hub, PipelineEvent } from '../../../types' import { DependencyUnavailableError } from '../../../utils/db/error' import { timeoutGuard } from '../../../utils/db/utils' import { stringToBoolean } from '../../../utils/env-utils' @@ -15,7 +15,6 @@ import { pluginsProcessEventStep } from './pluginsProcessEventStep' import { populateTeamDataStep } from './populateTeamDataStep' import { prepareEventStep } from './prepareEventStep' import { processPersonsStep } from './processPersonsStep' -import { processOnEventStep } from './runAsyncHandlersStep' export const silentFailuresAsyncHandlers = new Counter({ name: 'async_handlers_silent_failure', @@ -166,26 +165,6 @@ export class EventPipelineRunner { } } - async runAppsOnEventPipeline(event: PostIngestionEvent): Promise { - try { - this.hub.statsd?.increment('kafka_queue.event_pipeline.start', { pipeline: 'onEvent' }) - await this.runStep(processOnEventStep, [this, event], event.teamId, false) - this.hub.statsd?.increment('kafka_queue.onevent.processed') - return this.registerLastStep('processOnEventStep', event.teamId, [event]) - } catch (error) { - if (error instanceof DependencyUnavailableError) { - // If this is an error with a dependency that we control, we want to - // ensure that the caller knows that the event was not processed, - // for a reason that we control and that is transient. - throw error - } - - silentFailuresAsyncHandlers.inc() - - return { lastStep: error.step, args: [], error: error.message } - } - } - registerLastStep( stepName: string, teamId: number | null, diff --git a/plugin-server/src/worker/tasks.ts b/plugin-server/src/worker/tasks.ts index 3677bb59616ad..16c9355babc82 100644 --- a/plugin-server/src/worker/tasks.ts +++ b/plugin-server/src/worker/tasks.ts @@ -1,7 +1,6 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' -import { EnqueuedPluginJob, Hub, PipelineEvent, PluginTaskType, PostIngestionEvent } from '../types' -import { convertToProcessedPluginEvent } from '../utils/event' +import { EnqueuedPluginJob, Hub, PipelineEvent, PluginTaskType } from '../types' import { EventPipelineRunner } from './ingestion/event-pipeline/runner' import { loadSchedule } from './plugins/loadSchedule' import { runPluginTask, runProcessEvent } from './plugins/run' @@ -33,10 +32,6 @@ export const workerTasks: Record = { const runner = new EventPipelineRunner(hub, args.event) return await runner.runEventPipeline(args.event) }, - runAppsOnEventPipeline: async (hub, args: { event: PostIngestionEvent }) => { - const runner = new EventPipelineRunner(hub, convertToProcessedPluginEvent(args.event)) - return await runner.runAppsOnEventPipeline(args.event) - }, reloadPlugins: async (hub) => { await setupPlugins(hub) }, diff --git a/plugin-server/src/worker/worker.ts b/plugin-server/src/worker/worker.ts index 02f3dc92754bc..248f7edca7864 100644 --- a/plugin-server/src/worker/worker.ts +++ b/plugin-server/src/worker/worker.ts @@ -87,11 +87,7 @@ export const createTaskRunner = return response }, (transactionDuration: number) => { - if ( - task === 'runEventPipeline' || - task === 'runWebhooksHandlersEventPipeline' || - task === 'runAppsOnEventPipeline' - ) { + if (task === 'runEventPipeline') { return transactionDuration > 0.5 ? 1 : 0.01 } else { return 1 diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index 701632f0c7b57..51122c706b15a 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -55,8 +55,6 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { db: 'database', }, workerMethods: { - runAppsOnEventPipeline: jest.fn(), - runWebhooksHandlersEventPipeline: jest.fn(), runEventPipeline: jest.fn(() => Promise.resolve({})), }, } @@ -174,8 +172,6 @@ describe('eachBatchLegacyIngestion with overflow reroute', () => { db: 'database', }, workerMethods: { - runAppsOnEventPipeline: jest.fn(), - runWebhooksHandlersEventPipeline: jest.fn(), runEventPipeline: jest.fn(() => Promise.resolve({})), }, } diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts index a2403137e20af..e97de06c7d4b0 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts @@ -56,8 +56,6 @@ describe('eachBatchParallelIngestion with overflow consume', () => { db: 'database', }, workerMethods: { - runAppsOnEventPipeline: jest.fn(), - runWebhooksHandlersEventPipeline: jest.fn(), runEventPipeline: jest.fn(() => Promise.resolve({})), }, } @@ -150,8 +148,6 @@ describe('eachBatchLegacyIngestion with overflow consume', () => { db: 'database', }, workerMethods: { - runAppsOnEventPipeline: jest.fn(), - runWebhooksHandlersEventPipeline: jest.fn(), runEventPipeline: jest.fn(() => Promise.resolve({})), }, } diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index 8175511a17a46..2f8756e23665e 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -24,8 +24,11 @@ import { import { ActionManager } from '../../../src/worker/ingestion/action-manager' import { ActionMatcher } from '../../../src/worker/ingestion/action-matcher' import { HookCommander } from '../../../src/worker/ingestion/hooks' +import { runOnEvent } from '../../../src/worker/plugins/run' import { pluginConfig39 } from '../../helpers/plugins' +jest.mock('../../../src/worker/plugins/run') + jest.mock('../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep', () => { const originalModule = jest.requireActual('../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep') return { @@ -138,32 +141,33 @@ describe('eachBatchX', () => { pluginConfigsPerTeam: new Map(), }, workerMethods: { - runAppsOnEventPipeline: jest.fn(), - runWebhooksHandlersEventPipeline: jest.fn(), runEventPipeline: jest.fn(() => Promise.resolve({})), }, } }) describe('eachBatchAppsOnEventHandlers', () => { - it('calls runAppsOnEventPipeline when useful', async () => { + it('calls runOnEvent when useful', async () => { queue.pluginsServer.pluginConfigsPerTeam.set(2, [pluginConfig39]) await eachBatchAppsOnEventHandlers(createKafkaJSBatch(clickhouseEvent), queue) - expect(queue.workerMethods.runAppsOnEventPipeline).toHaveBeenCalledWith({ - ...event, - properties: { - $ip: '127.0.0.1', - }, - }) + // TODO fix to jest spy on the actual function + expect(runOnEvent).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + uuid: 'uuid1', + team_id: 2, + distinct_id: 'my_id', + }) + ) expect(queue.pluginsServer.statsd.timing).toHaveBeenCalledWith( 'kafka_queue.each_batch_async_handlers_on_event', expect.any(Date) ) }) - it('skip runAppsOnEventPipeline when no pluginconfig for team', async () => { + it('skip runOnEvent when no pluginconfig for team', async () => { queue.pluginsServer.pluginConfigsPerTeam.clear() await eachBatchAppsOnEventHandlers(createKafkaJSBatch(clickhouseEvent), queue) - expect(queue.workerMethods.runAppsOnEventPipeline).not.toHaveBeenCalled() + expect(runOnEvent).not.toHaveBeenCalled() expect(queue.pluginsServer.statsd.timing).toHaveBeenCalledWith( 'kafka_queue.each_batch_async_handlers_on_event', expect.any(Date) @@ -179,13 +183,15 @@ describe('eachBatchX', () => { createKafkaJSBatch({ ...clickhouseEvent, elements_chain: 'random' }), queue ) - expect(queue.workerMethods.runAppsOnEventPipeline).toHaveBeenCalledWith({ - ...event, - elementsList: [{ attributes: {}, order: 0, tag_name: 'random' }], - properties: { - $ip: '127.0.0.1', - }, - }) + expect(runOnEvent).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + uuid: 'uuid1', + team_id: 2, + distinct_id: 'my_id', + elements: [{ attributes: {}, order: 0, tag_name: 'random' }], + }) + ) }) it('skips elements parsing when not useful', async () => { queue.pluginsServer.pluginConfigsPerTeam.set(2, [ @@ -197,12 +203,15 @@ describe('eachBatchX', () => { createKafkaJSBatch({ ...clickhouseEvent, elements_chain: 'random' }), queue ) - expect(queue.workerMethods.runAppsOnEventPipeline).toHaveBeenCalledWith({ - ...event, - properties: { - $ip: '127.0.0.1', - }, - }) + expect(runOnEvent).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + uuid: 'uuid1', + team_id: 2, + distinct_id: 'my_id', + elements: [], + }) + ) }) }) diff --git a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts index bd118c2d39067..f270308eaf6a1 100644 --- a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts @@ -21,15 +21,15 @@ import LibrdKafkaError from 'node-rdkafka-acosom/lib/error' import { defaultConfig } from '../../../src/config/config' import { KAFKA_EVENTS_JSON } from '../../../src/config/kafka-topics' import { buildOnEventIngestionConsumer } from '../../../src/main/ingestion-queues/on-event-handler-consumer' -import { Hub } from '../../../src/types' +import { Hub, ISOTimestamp } from '../../../src/types' import { DependencyUnavailableError } from '../../../src/utils/db/error' import { createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { UUIDT } from '../../../src/utils/utils' +import { processOnEventStep } from '../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' import Piscina, { makePiscina } from '../../../src/worker/piscina' import { setupPlugins } from '../../../src/worker/plugins/setup' import { teardownPlugins } from '../../../src/worker/plugins/teardown' -import { createTaskRunner } from '../../../src/worker/worker' import { createOrganization, createPlugin, @@ -40,7 +40,7 @@ import { jest.setTimeout(10000) -describe('workerTasks.runAppsOnEventPipeline()', () => { +describe('runAppsOnEventPipeline()', () => { // Tests the failure cases for the workerTasks.runAppsOnEventPipeline // task. Note that this equally applies to e.g. runEventPipeline task as // well and likely could do with adding additional tests for that. @@ -52,7 +52,6 @@ describe('workerTasks.runAppsOnEventPipeline()', () => { let hub: Hub let redis: Redis.Redis let closeHub: () => Promise - let piscinaTaskRunner: ({ task, args }) => Promise beforeEach(() => { // Use fake timers to ensure that we don't need to wait on e.g. retry logic. @@ -63,7 +62,6 @@ describe('workerTasks.runAppsOnEventPipeline()', () => { jest.useFakeTimers({ advanceTimers: true }) ;[hub, closeHub] = await createHub() redis = await hub.redisPool.acquire() - piscinaTaskRunner = createTaskRunner(hub) await hub.postgres.query(PostgresUse.COMMON_WRITE, POSTGRES_DELETE_TABLES_QUERY, null, 'deleteTables') // Need to clear the DB to avoid unique constraint violations on ids }) @@ -120,18 +118,16 @@ describe('workerTasks.runAppsOnEventPipeline()', () => { ) await expect( - piscinaTaskRunner({ - task: 'runAppsOnEventPipeline', - args: { - event: { - distinctId: 'asdf', - ip: '', - teamId: teamId, - event: 'some event', - properties: {}, - eventUuid: new UUIDT().toString(), - }, - }, + processOnEventStep(hub, { + distinctId: 'asdf', + teamId: teamId, + event: 'some event', + properties: {}, + eventUuid: new UUIDT().toString(), + person_created_at: null, + person_properties: {}, + timestamp: new Date().toISOString() as ISOTimestamp, + elementsList: [], }) ).rejects.toEqual(new DependencyUnavailableError('Failed to produce', 'Kafka', error)) }) @@ -159,22 +155,17 @@ describe('workerTasks.runAppsOnEventPipeline()', () => { const event = { distinctId: 'asdf', - ip: '', teamId: teamId, event: 'some event', properties: {}, eventUuid: new UUIDT().toString(), + person_created_at: null, + person_properties: {}, + timestamp: new Date().toISOString() as ISOTimestamp, + elementsList: [], } - await expect( - piscinaTaskRunner({ - task: 'runAppsOnEventPipeline', - args: { event }, - }) - ).resolves.toEqual({ - args: [expect.objectContaining(event)], - lastStep: 'processOnEventStep', - }) + await expect(processOnEventStep(hub, event)).resolves.toEqual(null) }) }) @@ -198,7 +189,7 @@ describe('eachBatchAsyncHandlers', () => { jest.useRealTimers() }) - test('rejections from piscina are bubbled up to the consumer', async () => { + test('rejections from kafka are bubbled up to the consumer', async () => { piscina = await makePiscina(defaultConfig, hub) const ingestionConsumer = buildOnEventIngestionConsumer({ hub, piscina }) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts index 0b65c4563594f..89bd8d8c27931 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts @@ -9,7 +9,10 @@ import { convertToIngestionEvent } from '../../../../src/utils/event' import { UUIDT } from '../../../../src/utils/utils' import { ActionManager } from '../../../../src/worker/ingestion/action-manager' import { ActionMatcher } from '../../../../src/worker/ingestion/action-matcher' -import { processWebhooksStep } from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' +import { + processOnEventStep, + processWebhooksStep, +} from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' import { EventPipelineRunner } from '../../../../src/worker/ingestion/event-pipeline/runner' import { HookCommander } from '../../../../src/worker/ingestion/hooks' import { setupPlugins } from '../../../../src/worker/plugins/setup' @@ -31,7 +34,7 @@ describe('Event Pipeline integration test', () => { const result = await runner.runEventPipeline(event) const postIngestionEvent = convertToIngestionEvent(result.args[0]) return Promise.all([ - runner.runAppsOnEventPipeline(postIngestionEvent), + processOnEventStep(runner.hub, postIngestionEvent), processWebhooksStep(postIngestionEvent, actionMatcher, hookCannon), ]) } diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts index 491b57863c1b3..8f3bdd6928031 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts @@ -43,7 +43,7 @@ describe('runAsyncHandlersStep()', () => { }) it('stops processing', async () => { - const response = await processOnEventStep(runner, ingestionEvent) + const response = await processOnEventStep(runner.hub, ingestionEvent) expect(response).toEqual(null) }) @@ -56,7 +56,7 @@ describe('runAsyncHandlersStep()', () => { }) it('calls onEvent plugin methods', async () => { - await processOnEventStep(runner, ingestionEvent) + await processOnEventStep(runner.hub, ingestionEvent) expect(runOnEvent).toHaveBeenCalledWith(runner.hub, convertToProcessedPluginEvent(ingestionEvent)) }) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index b2ad3a4290a72..a48e743c31b48 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -242,18 +242,4 @@ describe('EventPipelineRunner', () => { }) }) }) - - describe('runAppsOnEventPipeline()', () => { - it('runs remaining steps', async () => { - jest.mocked(hub.db.fetchPerson).mockResolvedValue('testPerson') - - await runner.runAppsOnEventPipeline({ - ...preIngestionEvent, - person_properties: {}, - person_created_at: '2020-02-23T02:11:00.000Z' as ISOTimestamp, - }) - - expect(runner.steps).toEqual(['processOnEventStep']) - }) - }) })