diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 47354d9fe5113..9466838cdf9bc 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -2,12 +2,12 @@ import * as Sentry from '@sentry/node' import { Message, MessageHeader } from 'node-rdkafka' import { KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../config/kafka-topics' -import { Hub, PipelineEvent, ValueMatcher } from '../../../types' +import { PipelineEvent, ValueMatcher } from '../../../types' import { formPipelineEvent } from '../../../utils/event' import { retryIfRetriable } from '../../../utils/retries' import { status } from '../../../utils/status' import { ConfiguredLimiter, LoggingLimiter, OverflowWarningLimiter } from '../../../utils/token-bucket' -import { EventPipelineResult, runEventPipeline } from '../../../worker/ingestion/event-pipeline/runner' +import { EventPipelineRunner } from '../../../worker/ingestion/event-pipeline/runner' import { captureIngestionWarning } from '../../../worker/ingestion/utils' import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-consumer' import { IngestionConsumer } from '../kafka-queue' @@ -156,7 +156,8 @@ export async function eachBatchParallelIngestion( for (const { message, pluginEvent } of currentBatch) { try { const result = (await retryIfRetriable(async () => { - return await ingestEvent(queue.pluginsServer, pluginEvent) + const runner = new EventPipelineRunner(queue.pluginsServer, pluginEvent) + return await runner.runEventPipeline(pluginEvent) })) as IngestResult result.promises?.forEach((promise) => @@ -243,16 +244,6 @@ export async function eachBatchParallelIngestion( } } -async function ingestEvent( - server: Hub, - event: PipelineEvent, - checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again -): Promise { - checkAndPause?.() - const result = await runEventPipeline(server, event) - return result -} - function computeKey(pluginEvent: PipelineEvent): string { return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}` } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index fc5953503fa55..b2baa1f16e678 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -43,12 +43,6 @@ class StepErrorNoRetry extends Error { this.args = args } } - -export async function runEventPipeline(hub: Hub, event: PipelineEvent): Promise { - const runner = new EventPipelineRunner(hub, event) - return runner.runEventPipeline(event) -} - export class EventPipelineRunner { hub: Hub originalEvent: PipelineEvent diff --git a/plugin-server/src/worker/ingestion/utils.ts b/plugin-server/src/worker/ingestion/utils.ts index affc100370afa..7861e00185b0b 100644 --- a/plugin-server/src/worker/ingestion/utils.ts +++ b/plugin-server/src/worker/ingestion/utils.ts @@ -20,7 +20,7 @@ export function generateEventDeadLetterQueueMessage( teamId: number, errorLocation = 'plugin_server_ingest_event' ): ProducerRecord { - let errorMessage = 'ingestEvent failed. ' + let errorMessage = 'Event ingestion failed. ' if (error instanceof Error) { errorMessage += `Error: ${error.message}` } diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index 6641aa8b87268..e042cf5c1ac34 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -5,14 +5,17 @@ import { IngestionOverflowMode, } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion' import { ConfiguredLimiter } from '../../../src/utils/token-bucket' -import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner' import { captureIngestionWarning } from './../../../src/worker/ingestion/utils' jest.mock('../../../src/utils/status') jest.mock('./../../../src/worker/ingestion/utils') +const runEventPipeline = jest.fn().mockResolvedValue('default value') + jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({ - runEventPipeline: jest.fn().mockResolvedValue('default value'), + EventPipelineRunner: jest.fn().mockImplementation(() => ({ + runEventPipeline: runEventPipeline, + })), })) const captureEndpointEvent1 = { diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts index 70c242be766c6..ce2569909fe4a 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts @@ -4,13 +4,17 @@ import { IngestionOverflowMode, } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion' import { OverflowWarningLimiter } from '../../../src/utils/token-bucket' -import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner' import { captureIngestionWarning } from './../../../src/worker/ingestion/utils' jest.mock('../../../src/utils/status') jest.mock('./../../../src/worker/ingestion/utils') + +const runEventPipeline = jest.fn().mockResolvedValue('default value') + jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({ - runEventPipeline: jest.fn().mockResolvedValue('default value'), + EventPipelineRunner: jest.fn().mockImplementation(() => ({ + runEventPipeline: runEventPipeline, + })), })) const captureEndpointEvent1 = { diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index a98490d00f4e9..667c278d243f1 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -24,7 +24,6 @@ import { ActionMatcher } from '../../../src/worker/ingestion/action-matcher' import { HookCommander } from '../../../src/worker/ingestion/hooks' import { runOnEvent } from '../../../src/worker/plugins/run' import { pluginConfig39 } from '../../helpers/plugins' -import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner' jest.mock('../../../src/worker/plugins/run') @@ -37,9 +36,13 @@ jest.mock('../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep', ( }) jest.mock('../../../src/utils/status') jest.mock('./../../../src/worker/ingestion/utils') + +const runEventPipeline = jest.fn().mockResolvedValue('default value') + jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({ - runEventPipeline: jest.fn().mockResolvedValue('default value'), - // runEventPipeline: jest.fn().mockRejectedValue('default value'), + EventPipelineRunner: jest.fn().mockImplementation(() => ({ + runEventPipeline: runEventPipeline, + })), })) const event: PostIngestionEvent = { @@ -375,19 +378,12 @@ describe('eachBatchX', () => { }) describe('eachBatchParallelIngestion', () => { - let runEventPipelineSpy - beforeEach(() => { - runEventPipelineSpy = jest.spyOn( - require('./../../../src/worker/ingestion/event-pipeline/runner'), - 'runEventPipeline' - ) - }) it('calls runEventPipeline', async () => { const batch = createBatch(captureEndpointEvent) const tokenBlockList = buildStringMatcher('another_token,more_token', false) await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled) - expect(runEventPipeline).toHaveBeenCalledWith(expect.anything(), { + expect(runEventPipeline).toHaveBeenCalledWith({ distinct_id: 'id', event: 'event', properties: {}, @@ -402,7 +398,7 @@ describe('eachBatchX', () => { it("doesn't fail the batch if runEventPipeline rejects once then succeeds on retry", async () => { const batch = createBatch(captureEndpointEvent) - runEventPipelineSpy.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out')) + runEventPipeline.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out')) const tokenBlockList = buildStringMatcher('another_token,more_token', false) await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled) expect(runEventPipeline).toHaveBeenCalledTimes(2) @@ -410,7 +406,7 @@ describe('eachBatchX', () => { it('fails the batch if one deferred promise rejects', async () => { const batch = createBatch(captureEndpointEvent) - runEventPipelineSpy.mockImplementationOnce(() => + runEventPipeline.mockImplementationOnce(() => Promise.resolve({ promises: [Promise.resolve(), Promise.reject('deferred nopes out')], }) @@ -518,7 +514,7 @@ describe('eachBatchX', () => { it('fails the batch if runEventPipeline rejects repeatedly', async () => { const tokenBlockList = buildStringMatcher('another_token,more_token', false) const batch = createBatch(captureEndpointEvent) - runEventPipelineSpy + runEventPipeline .mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out')) .mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out')) .mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out')) @@ -526,7 +522,7 @@ describe('eachBatchX', () => { eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled) ).rejects.toBe('runEventPipeline nopes out') expect(runEventPipeline).toHaveBeenCalledTimes(3) - runEventPipelineSpy.mockRestore() + runEventPipeline.mockRestore() }) }) }) diff --git a/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts index 4ea08342eda2b..fa5c000fd9ae1 100644 --- a/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts @@ -6,7 +6,7 @@ import { DependencyUnavailableError } from '../../../src/utils/db/error' import { createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { UUIDT } from '../../../src/utils/utils' -import { runEventPipeline } from '../../../src/worker/ingestion/event-pipeline/runner' +import { EventPipelineRunner } from '../../../src/worker/ingestion/event-pipeline/runner' import { createOrganization, createTeam, POSTGRES_DELETE_TABLES_QUERY } from '../../helpers/sql' describe('workerTasks.runEventPipeline()', () => { @@ -49,18 +49,19 @@ describe('workerTasks.runEventPipeline()', () => { return Promise.reject(new Error(errorMessage)) }) - await expect( - runEventPipeline(hub, { - distinct_id: 'asdf', - ip: '', - team_id: teamId, - event: 'some event', - properties: {}, - site_url: 'https://example.com', - now: new Date().toISOString(), - uuid: new UUIDT().toString(), - }) - ).rejects.toEqual(new DependencyUnavailableError(errorMessage, 'Postgres', new Error(errorMessage))) + const event = { + distinct_id: 'asdf', + ip: '', + team_id: teamId, + event: 'some event', + properties: {}, + site_url: 'https://example.com', + now: new Date().toISOString(), + uuid: new UUIDT().toString(), + } + await expect(new EventPipelineRunner(hub, event).runEventPipeline(event)).rejects.toEqual( + new DependencyUnavailableError(errorMessage, 'Postgres', new Error(errorMessage)) + ) pgQueryMock.mockRestore() }) }) diff --git a/plugin-server/tests/main/teardown.test.ts b/plugin-server/tests/main/teardown.test.ts index 50e3948e912cf..58137f82bf24f 100644 --- a/plugin-server/tests/main/teardown.test.ts +++ b/plugin-server/tests/main/teardown.test.ts @@ -4,7 +4,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { waitForExpect } from '../../functional_tests/expectations' import { startPluginsServer } from '../../src/main/pluginsServer' import { Hub, LogLevel, PluginLogEntry, PluginLogEntrySource, PluginLogEntryType } from '../../src/types' -import { runEventPipeline } from '../../src/worker/ingestion/event-pipeline/runner' +import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/runner' import { makePiscina } from '../../src/worker/piscina' import { pluginConfig39 } from '../helpers/plugins' import { resetTestDatabase } from '../helpers/sql' @@ -36,7 +36,7 @@ async function getLogEntriesForPluginConfig(hub: Hub, pluginConfigId: number) { describe('teardown', () => { const processEvent = async (hub: Hub, event: PluginEvent) => { - const result = await runEventPipeline(hub, event) + const result = await new EventPipelineRunner(hub, event).runEventPipeline(event) const resultEvent = result.args[0] return resultEvent } diff --git a/plugin-server/tests/worker/dead-letter-queue.test.ts b/plugin-server/tests/worker/dead-letter-queue.test.ts index 9de02801c4c3f..a9ef21a6fdd2c 100644 --- a/plugin-server/tests/worker/dead-letter-queue.test.ts +++ b/plugin-server/tests/worker/dead-letter-queue.test.ts @@ -3,7 +3,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' import { Hub, LogLevel } from '../../src/types' import { createHub } from '../../src/utils/db/hub' import { UUIDT } from '../../src/utils/utils' -import { runEventPipeline } from '../../src/worker/ingestion/event-pipeline/runner' +import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/runner' import { generateEventDeadLetterQueueMessage } from '../../src/worker/ingestion/utils' import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../helpers/clickhouse' import { resetTestDatabase } from '../helpers/sql' @@ -59,7 +59,8 @@ describe('events dead letter queue', () => { }) test('events get sent to dead letter queue on error', async () => { - const ingestResponse1 = await runEventPipeline(hub, createEvent()) + const event = createEvent() + const ingestResponse1 = await new EventPipelineRunner(hub, event).runEventPipeline(event) expect(ingestResponse1).toEqual({ lastStep: 'prepareEventStep', error: 'database unavailable', @@ -78,7 +79,7 @@ describe('events dead letter queue', () => { expect(dlqEvent.team_id).toEqual(2) expect(dlqEvent.team_id).toEqual(2) expect(dlqEvent.error_location).toEqual('plugin_server_ingest_event:prepareEventStep') - expect(dlqEvent.error).toEqual('ingestEvent failed. Error: database unavailable') + expect(dlqEvent.error).toEqual('Event ingestion failed. Error: database unavailable') expect(dlqEvent.properties).toEqual(JSON.stringify({ key: 'value', $ip: '127.0.0.1' })) expect(dlqEvent.event_uuid).toEqual(EVENT_UUID) }) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index c0c894f4777b9..4bfc79f5e2379 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -222,7 +222,7 @@ describe('EventPipelineRunner', () => { expect(JSON.parse(hub.db.kafkaProducer.queueMessage.mock.calls[0][0].messages[0].value)).toMatchObject({ team_id: 2, distinct_id: 'my_id', - error: 'ingestEvent failed. Error: testError', + error: 'Event ingestion failed. Error: testError', error_location: 'plugin_server_ingest_event:prepareEventStep', }) expect(pipelineStepDLQCounterSpy).toHaveBeenCalledWith('prepareEventStep')