From 286b6899984d4d481fd8a67755f655f7939c773c Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Mon, 16 Oct 2023 09:19:49 -0600 Subject: [PATCH] chore(plugin-server): remove INGESTION_DELAY_WRITE_ACKS and workerMethods (#17932) * chore: stop using piscina worker methods for runEventPipeline * chore(plugin-server): remove INGESTION_DELAY_WRITE_ACKS --------- Co-authored-by: Tiina Turban --- .../error-handling.test.ts | 2 +- .../batch-processing/each-batch-ingestion.ts | 125 +++++++++--------- .../batch-processing/each-batch-webhooks.ts | 6 +- .../src/main/ingestion-queues/kafka-queue.ts | 26 +--- plugin-server/src/types.ts | 5 - .../worker/ingestion/event-pipeline/runner.ts | 31 ++--- plugin-server/src/worker/tasks.ts | 7 +- plugin-server/src/worker/worker.ts | 8 +- ...nalytics-events-ingestion-consumer.test.ts | 15 ++- ...events-ingestion-overflow-consumer.test.ts | 11 +- .../main/ingestion-queues/each-batch.test.ts | 27 ++-- .../run-ingestion-pipeline.test.ts | 26 ++-- plugin-server/tests/main/teardown.test.ts | 13 +- .../tests/worker/dead-letter-queue.test.ts | 4 +- 14 files changed, 140 insertions(+), 166 deletions(-) diff --git a/plugin-server/functional_tests/analytics-ingestion/error-handling.test.ts b/plugin-server/functional_tests/analytics-ingestion/error-handling.test.ts index 3d429760788ea..e2d366d9b94dc 100644 --- a/plugin-server/functional_tests/analytics-ingestion/error-handling.test.ts +++ b/plugin-server/functional_tests/analytics-ingestion/error-handling.test.ts @@ -36,7 +36,7 @@ afterAll(async () => { await dlqConsumer.disconnect() }) -test.concurrent('consumer handles messages just less than 1MB gracefully', async () => { +test.concurrent('consumer handles messages just over 1MB gracefully', async () => { // For this we basically want the plugin-server to try and produce a new // message larger than 1MB. We do this by creating a person with a lot of // properties. We will end up denormalizing the person properties onto the diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 652f70cd74244..9526faa4bb88b 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -2,11 +2,11 @@ import * as Sentry from '@sentry/node' import { Message, MessageHeader } from 'node-rdkafka' import { KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../config/kafka-topics' -import { Hub, PipelineEvent, WorkerMethods } from '../../../types' +import { Hub, PipelineEvent } from '../../../types' import { formPipelineEvent } from '../../../utils/event' import { status } from '../../../utils/status' import { ConfiguredLimiter, LoggingLimiter, WarningLimiter } from '../../../utils/token-bucket' -import { EventPipelineResult } from '../../../worker/ingestion/event-pipeline/runner' +import { EventPipelineResult, runEventPipeline } from '../../../worker/ingestion/event-pipeline/runner' import { captureIngestionWarning } from '../../../worker/ingestion/utils' import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-consumer' import { IngestionConsumer } from '../kafka-queue' @@ -40,13 +40,66 @@ type IngestResult = { promises?: Array> } +async function handleProcessingError( + error: any, + message: Message, + pluginEvent: PipelineEvent, + queue: IngestionConsumer +) { + status.error('🔥', `Error processing message`, { + stack: error.stack, + error: error, + }) + + // If the error is a non-retriable error, push to the dlq and commit the offset. Else raise the + // error. + // + // NOTE: there is behavior to push to a DLQ at the moment within EventPipelineRunner. This + // doesn't work so well with e.g. messages that when sent to the DLQ is it's self too large. + // Here we explicitly do _not_ add any additional metadata to the message. We might want to add + // some metadata to the message e.g. in the header or reference e.g. the sentry event id. + // + // TODO: property abstract out this `isRetriable` error logic. This is currently relying on the + // fact that node-rdkafka adheres to the `isRetriable` interface. + if (error?.isRetriable === false) { + const sentryEventId = Sentry.captureException(error) + const headers: MessageHeader[] = message.headers ?? [] + headers.push({ ['sentry-event-id']: sentryEventId }) + headers.push({ ['event-id']: pluginEvent.uuid }) + try { + await queue.pluginsServer.kafkaProducer.produce({ + topic: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, + value: message.value, + key: message.key, + headers: headers, + waitForAck: true, + }) + } catch (error) { + // If we can't send to the DLQ and it's not retriable, just continue. We'll commit the + // offset and move on. + if (error?.isRetriable === false) { + status.error('🔥', `Error pushing to DLQ`, { + stack: error.stack, + error: error, + }) + return + } + + // If we can't send to the DLQ and it is retriable, raise the error. + throw error + } + } else { + throw error + } +} + export async function eachBatchParallelIngestion( messages: Message[], queue: IngestionConsumer, overflowMode: IngestionOverflowMode ): Promise { async function eachMessage(event: PipelineEvent, queue: IngestionConsumer): Promise { - return ingestEvent(queue.pluginsServer, queue.workerMethods, event) + return ingestEvent(queue.pluginsServer, event) } const batchStartTimer = new Date() @@ -102,62 +155,16 @@ export async function eachBatchParallelIngestion( for (const { message, pluginEvent } of currentBatch) { try { const result = await eachMessage(pluginEvent, queue) - if (result.promises) { - processingPromises.push(...result.promises) - } - } catch (error) { - status.error('🔥', `Error processing message`, { - stack: error.stack, - error: error, - }) - - // If there error is a non-retriable error, push - // to the dlq and commit the offset. Else raise the - // error. - // - // NOTE: there is behavior to push to a DLQ at the - // moment within EventPipelineRunner. This doesn't work - // so well with e.g. messages that when sent to the DLQ - // is it's self too large. Here we explicitly do _not_ - // add any additional metadata to the message. We might - // want to add some metadata to the message e.g. in the - // header or reference e.g. the sentry event id. - // - // TODO: property abstract out this `isRetriable` error - // logic. This is currently relying on the fact that - // node-rdkafka adheres to the `isRetriable` interface. - if (error?.isRetriable === false) { - const sentryEventId = Sentry.captureException(error) - const headers: MessageHeader[] = message.headers ?? [] - headers.push({ ['sentry-event-id']: sentryEventId }) - headers.push({ ['event-id']: pluginEvent.uuid }) - try { - await queue.pluginsServer.kafkaProducer.produce({ - topic: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, - value: message.value, - key: message.key, - headers: headers, - waitForAck: true, + + for (const promise of result.promises ?? []) { + processingPromises.push( + promise.catch(async (error) => { + await handleProcessingError(error, message, pluginEvent, queue) }) - } catch (error) { - // If we can't send to the DLQ and it's not - // retriable, just continue. We'll commit the - // offset and move on. - if (error?.isRetriable === false) { - status.error('🔥', `Error pushing to DLQ`, { - stack: error.stack, - error: error, - }) - continue - } - - // If we can't send to the DLQ and it is - // retriable, raise the error. - throw error - } - } else { - throw error + ) } + } catch (error) { + await handleProcessingError(error, message, pluginEvent, queue) } } @@ -236,7 +243,6 @@ export async function eachBatchParallelIngestion( async function ingestEvent( server: Hub, - workerMethods: WorkerMethods, event: PipelineEvent, checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again ): Promise { @@ -247,7 +253,8 @@ async function ingestEvent( server.statsd?.increment('kafka_queue_ingest_event_hit', { pipeline: 'runEventPipeline', }) - const result = await workerMethods.runEventPipeline(event) + + const result = await runEventPipeline(server, event) server.statsd?.timing('kafka_queue.each_event', eachEventStartTimer) countAndLogEvents() diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts index a0c3f1e5bc22b..7237f0d4a7b68 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts @@ -1,6 +1,7 @@ import * as Sentry from '@sentry/node' import { StatsD } from 'hot-shots' import { EachBatchPayload, KafkaMessage } from 'kafkajs' +import { Counter } from 'prom-client' import { ActionMatcher } from 'worker/ingestion/action-matcher' import { PostIngestionEvent, RawClickHouseEvent } from '../../../types' @@ -8,7 +9,6 @@ import { DependencyUnavailableError } from '../../../utils/db/error' import { convertToIngestionEvent, convertToProcessedPluginEvent } from '../../../utils/event' import { status } from '../../../utils/status' import { processWebhooksStep } from '../../../worker/ingestion/event-pipeline/runAsyncHandlersStep' -import { silentFailuresAsyncHandlers } from '../../../worker/ingestion/event-pipeline/runner' import { HookCommander } from '../../../worker/ingestion/hooks' import { runInstrumentedFunction } from '../../utils' import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics' @@ -16,6 +16,10 @@ import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics' // Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals require('@sentry/tracing') +export const silentFailuresAsyncHandlers = new Counter({ + name: 'async_handlers_silent_failure', + help: 'Number silent failures from async handlers.', +}) // exporting only for testing export function groupIntoBatchesByUsage( array: KafkaMessage[], diff --git a/plugin-server/src/main/ingestion-queues/kafka-queue.ts b/plugin-server/src/main/ingestion-queues/kafka-queue.ts index 690cf07291fca..c60a6a8f8d9d8 100644 --- a/plugin-server/src/main/ingestion-queues/kafka-queue.ts +++ b/plugin-server/src/main/ingestion-queues/kafka-queue.ts @@ -6,7 +6,7 @@ import { Counter } from 'prom-client' import { BatchConsumer, startBatchConsumer } from '../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../../kafka/config' -import { Hub, PipelineEvent, WorkerMethods } from '../../types' +import { Hub } from '../../types' import { KafkaConfig } from '../../utils/db/hub' import { timeoutGuard } from '../../utils/db/utils' import { status } from '../../utils/status' @@ -23,7 +23,6 @@ type KafkaJSBatchFunction = (payload: EachBatchPayload, queue: KafkaJSIngestionC export class KafkaJSIngestionConsumer { public pluginsServer: Hub - public workerMethods: WorkerMethods public consumerReady: boolean public topic: string public consumerGroupId: string @@ -54,17 +53,6 @@ export class KafkaJSIngestionConsumer { ) this.wasConsumerRan = false - // TODO: remove `this.workerMethods` and just rely on - // `this.batchHandler`. At the time of writing however, there are some - // references to queue.workerMethods buried deep in the codebase - // #onestepatatime - this.workerMethods = { - runEventPipeline: (event: PipelineEvent) => { - this.pluginsServer.lastActivity = new Date().valueOf() - this.pluginsServer.lastActivityType = 'runEventPipeline' - return piscina.run({ task: 'runEventPipeline', args: { event } }) - }, - } this.consumerGroupMemberId = null this.consumerReady = false @@ -198,7 +186,6 @@ type EachBatchFunction = (messages: Message[], queue: IngestionConsumer) => Prom export class IngestionConsumer { public pluginsServer: Hub - public workerMethods: WorkerMethods public consumerReady: boolean public topic: string public consumerGroupId: string @@ -216,17 +203,6 @@ export class IngestionConsumer { this.topic = topic this.consumerGroupId = consumerGroupId - // TODO: remove `this.workerMethods` and just rely on - // `this.batchHandler`. At the time of writing however, there are some - // references to queue.workerMethods buried deep in the codebase - // #onestepatatime - this.workerMethods = { - runEventPipeline: (event: PipelineEvent) => { - this.pluginsServer.lastActivity = new Date().valueOf() - this.pluginsServer.lastActivityType = 'runEventPipeline' - return piscina.run({ task: 'runEventPipeline', args: { event } }) - }, - } this.consumerReady = false this.eachBatch = batchHandler diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 7c99819524b25..ef35ac6659141 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -24,7 +24,6 @@ import { KafkaProducerWrapper } from './utils/db/kafka-producer-wrapper' import { PostgresRouter } from './utils/db/postgres' import { UUID } from './utils/utils' import { AppMetrics } from './worker/ingestion/app-metrics' -import { EventPipelineResult } from './worker/ingestion/event-pipeline/runner' import { OrganizationManager } from './worker/ingestion/organization-manager' import { EventsProcessor } from './worker/ingestion/process-event' import { TeamManager } from './worker/ingestion/team-manager' @@ -477,10 +476,6 @@ export interface PluginTask { __ignoreForAppMetrics?: boolean } -export type WorkerMethods = { - runEventPipeline: (event: PipelineEvent) => Promise -} - export type VMMethods = { setupPlugin?: () => Promise teardownPlugin?: () => Promise diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index fde36de792369..d6f3ce6d0ec1c 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -1,4 +1,4 @@ -import { PluginEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold' +import { PluginEvent } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import { Counter } from 'prom-client' @@ -7,7 +7,6 @@ import { runInSpan } from '../../../sentry' import { Hub, PipelineEvent } from '../../../types' import { DependencyUnavailableError } from '../../../utils/db/error' import { timeoutGuard } from '../../../utils/db/utils' -import { stringToBoolean } from '../../../utils/env-utils' import { status } from '../../../utils/status' import { generateEventDeadLetterQueueMessage } from '../utils' import { createEventStep } from './createEventStep' @@ -16,10 +15,6 @@ import { populateTeamDataStep } from './populateTeamDataStep' import { prepareEventStep } from './prepareEventStep' import { processPersonsStep } from './processPersonsStep' -export const silentFailuresAsyncHandlers = new Counter({ - name: 'async_handlers_silent_failure', - help: 'Number silent failures from async handlers.', -}) const pipelineStepCompletionCounter = new Counter({ name: 'events_pipeline_step_executed_total', help: 'Number of events that have completed the step', @@ -58,21 +53,22 @@ class StepErrorNoRetry extends Error { } } +export async function runEventPipeline(hub: Hub, event: PipelineEvent): Promise { + const runner = new EventPipelineRunner(hub, event) + return runner.runEventPipeline(event) +} + export class EventPipelineRunner { hub: Hub - originalEvent: PipelineEvent | ProcessedPluginEvent + originalEvent: PipelineEvent // See https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0 poEEmbraceJoin: boolean - private delayAcks: boolean - constructor(hub: Hub, originalEvent: PipelineEvent | ProcessedPluginEvent, poEEmbraceJoin = false) { + constructor(hub: Hub, event: PipelineEvent, poEEmbraceJoin = false) { this.hub = hub - this.originalEvent = originalEvent this.poEEmbraceJoin = poEEmbraceJoin - - // TODO: remove after successful rollout - this.delayAcks = stringToBoolean(process.env.INGESTION_DELAY_WRITE_ACKS) + this.originalEvent = event } isEventBlacklisted(event: PipelineEvent): boolean { @@ -87,6 +83,7 @@ export class EventPipelineRunner { } async runEventPipeline(event: PipelineEvent): Promise { + this.originalEvent = event this.hub.statsd?.increment('kafka_queue.event_pipeline.start', { pipeline: 'event' }) try { @@ -147,12 +144,8 @@ export class EventPipelineRunner { [this, preparedEvent, person], event.team_id ) - if (this.delayAcks) { - return this.registerLastStep('createEventStep', event.team_id, [rawClickhouseEvent, person], [eventAck]) - } else { - await eventAck - return this.registerLastStep('createEventStep', event.team_id, [rawClickhouseEvent, person]) - } + + return this.registerLastStep('createEventStep', event.team_id, [rawClickhouseEvent, person], [eventAck]) } registerLastStep( diff --git a/plugin-server/src/worker/tasks.ts b/plugin-server/src/worker/tasks.ts index 16c9355babc82..1719cf948917e 100644 --- a/plugin-server/src/worker/tasks.ts +++ b/plugin-server/src/worker/tasks.ts @@ -1,7 +1,6 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' -import { EnqueuedPluginJob, Hub, PipelineEvent, PluginTaskType } from '../types' -import { EventPipelineRunner } from './ingestion/event-pipeline/runner' +import { EnqueuedPluginJob, Hub, PluginTaskType } from '../types' import { loadSchedule } from './plugins/loadSchedule' import { runPluginTask, runProcessEvent } from './plugins/run' import { setupPlugins } from './plugins/setup' @@ -28,10 +27,6 @@ export const workerTasks: Record = { pluginScheduleReady: (hub) => { return hub.pluginSchedule !== null }, - runEventPipeline: async (hub, args: { event: PipelineEvent }) => { - const runner = new EventPipelineRunner(hub, args.event) - return await runner.runEventPipeline(args.event) - }, reloadPlugins: async (hub) => { await setupPlugins(hub) }, diff --git a/plugin-server/src/worker/worker.ts b/plugin-server/src/worker/worker.ts index b08ab8ead77df..858804cd0200c 100644 --- a/plugin-server/src/worker/worker.ts +++ b/plugin-server/src/worker/worker.ts @@ -100,12 +100,8 @@ export const createTaskRunner = } return response }, - (transactionDuration: number) => { - if (task === 'runEventPipeline') { - return transactionDuration > 0.5 ? 1 : 0.01 - } else { - return 1 - } + (_) => { + return 1 } ) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index b8d466a649662..cbb3d06e6a29d 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -9,6 +9,11 @@ import { captureIngestionWarning } from './../../../src/worker/ingestion/utils' jest.mock('../../../src/utils/status') jest.mock('./../../../src/worker/ingestion/utils') +jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({ + runEventPipeline: jest.fn().mockResolvedValue('default value'), +})) +import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner' + const captureEndpointEvent = { uuid: 'uuid1', distinct_id: 'id', @@ -53,10 +58,8 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { }, db: 'database', }, - workerMethods: { - runEventPipeline: jest.fn(() => Promise.resolve({})), - }, } + jest.mock('./../../../src/worker/ingestion/event-pipeline/runner') }) it('reroutes events with no key to OVERFLOW topic', async () => { @@ -87,7 +90,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { }) // Event is not processed here - expect(queue.workerMethods.runEventPipeline).not.toHaveBeenCalled() + expect(runEventPipeline).not.toHaveBeenCalled() }) it('reroutes excess events to OVERFLOW topic', async () => { @@ -111,7 +114,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { }) // Event is not processed here - expect(queue.workerMethods.runEventPipeline).not.toHaveBeenCalled() + expect(runEventPipeline).not.toHaveBeenCalled() }) it('does not reroute if not over capacity limit', async () => { @@ -127,6 +130,6 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { expect(captureIngestionWarning).not.toHaveBeenCalled() expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled() // Event is processed - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalled() + expect(runEventPipeline).toHaveBeenCalled() }) }) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts index d0a1224112391..bec5615dac5d3 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts @@ -7,6 +7,10 @@ import { captureIngestionWarning } from './../../../src/worker/ingestion/utils' jest.mock('../../../src/utils/status') jest.mock('./../../../src/worker/ingestion/utils') +jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({ + runEventPipeline: jest.fn().mockResolvedValue('default value'), +})) +import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner' const captureEndpointEvent = { uuid: 'uuid1', @@ -53,9 +57,6 @@ describe('eachBatchParallelIngestion with overflow consume', () => { }, db: 'database', }, - workerMethods: { - runEventPipeline: jest.fn(() => Promise.resolve({})), - }, } }) @@ -81,7 +82,7 @@ describe('eachBatchParallelIngestion with overflow consume', () => { ) // Event is processed - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalled() + expect(runEventPipeline).toHaveBeenCalled() }) it('does not raise ingestion warning when under threshold', async () => { @@ -99,6 +100,6 @@ describe('eachBatchParallelIngestion with overflow consume', () => { expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled() // Event is processed - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalled() + expect(runEventPipeline).toHaveBeenCalled() }) }) diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index e2f8a9b018947..5698af2ade738 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -34,6 +34,11 @@ jest.mock('../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep', ( }) jest.mock('../../../src/utils/status') jest.mock('./../../../src/worker/ingestion/utils') +jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({ + runEventPipeline: jest.fn().mockResolvedValue('default value'), + // runEventPipeline: jest.fn().mockRejectedValue('default value'), +})) +import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner' const event: PostIngestionEvent = { eventUuid: 'uuid1', @@ -136,9 +141,6 @@ describe('eachBatchX', () => { }, pluginConfigsPerTeam: new Map(), }, - workerMethods: { - runEventPipeline: jest.fn(() => Promise.resolve({})), - }, } }) @@ -396,11 +398,18 @@ describe('eachBatchX', () => { }) describe('eachBatchParallelIngestion', () => { + let runEventPipelineSpy + beforeEach(() => { + runEventPipelineSpy = jest.spyOn( + require('./../../../src/worker/ingestion/event-pipeline/runner'), + 'runEventPipeline' + ) + }) it('calls runEventPipeline', async () => { const batch = createBatch(captureEndpointEvent) await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled) - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalledWith({ + expect(runEventPipeline).toHaveBeenCalledWith(expect.anything(), { distinct_id: 'id', event: 'event', properties: {}, @@ -419,16 +428,16 @@ describe('eachBatchX', () => { it('fails the batch if runEventPipeline rejects', async () => { const batch = createBatch(captureEndpointEvent) - queue.workerMethods.runEventPipeline = jest.fn(() => Promise.reject('runEventPipeline nopes out')) + runEventPipelineSpy.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out')) await expect(eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled)).rejects.toBe( 'runEventPipeline nopes out' ) - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalledTimes(1) + expect(runEventPipeline).toHaveBeenCalledTimes(1) }) it('fails the batch if one deferred promise rejects', async () => { const batch = createBatch(captureEndpointEvent) - queue.workerMethods.runEventPipeline = jest.fn(() => + runEventPipelineSpy.mockImplementationOnce(() => Promise.resolve({ promises: [Promise.resolve(), Promise.reject('deferred nopes out')], }) @@ -436,7 +445,7 @@ describe('eachBatchX', () => { await expect(eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled)).rejects.toBe( 'deferred nopes out' ) - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalledTimes(1) + expect(runEventPipeline).toHaveBeenCalledTimes(1) }) it('batches events by team or token and distinct_id', () => { @@ -510,7 +519,7 @@ describe('eachBatchX', () => { ]) await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled) - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalledTimes(14) + expect(runEventPipeline).toHaveBeenCalledTimes(14) expect(queue.pluginsServer.statsd.histogram).toHaveBeenCalledWith( 'ingest_event_batching.input_length', 14, diff --git a/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts index fd1c38faa830f..d938b95078c68 100644 --- a/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts @@ -5,21 +5,18 @@ import { Hub } from '../../../src/types' import { DependencyUnavailableError } from '../../../src/utils/db/error' import { createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' -import { UUIDT } from '../../../src/utils/utils' -import { createTaskRunner } from '../../../src/worker/worker' +import { runEventPipeline } from '../../../src/worker/ingestion/event-pipeline/runner' import { createOrganization, createTeam, POSTGRES_DELETE_TABLES_QUERY } from '../../helpers/sql' describe('workerTasks.runEventPipeline()', () => { let hub: Hub let redis: Redis.Redis let closeHub: () => Promise - let piscinaTaskRunner: ({ task, args }) => Promise const OLD_ENV = process.env beforeAll(async () => { ;[hub, closeHub] = await createHub() redis = await hub.redisPool.acquire() - piscinaTaskRunner = createTaskRunner(hub) await hub.postgres.query(PostgresUse.COMMON_WRITE, POSTGRES_DELETE_TABLES_QUERY, undefined, '') // Need to clear the DB to avoid unique constraint violations on ids process.env = { ...OLD_ENV } // Make a copy }) @@ -52,18 +49,15 @@ describe('workerTasks.runEventPipeline()', () => { }) await expect( - piscinaTaskRunner({ - task: 'runEventPipeline', - args: { - event: { - distinctId: 'asdf', - ip: '', - team_id: teamId, - event: 'some event', - properties: {}, - eventUuid: new UUIDT().toString(), - }, - }, + runEventPipeline(hub, { + distinct_id: 'asdf', + ip: '', + team_id: teamId, + event: 'some event', + properties: {}, + site_url: 'https://example.com', + now: new Date().toISOString(), + uuid: 'uuid', }) ).rejects.toEqual(new DependencyUnavailableError(errorMessage, 'Postgres', new Error(errorMessage))) pgQueryMock.mockRestore() diff --git a/plugin-server/tests/main/teardown.test.ts b/plugin-server/tests/main/teardown.test.ts index 01080aef62285..4e5e69371523b 100644 --- a/plugin-server/tests/main/teardown.test.ts +++ b/plugin-server/tests/main/teardown.test.ts @@ -1,8 +1,9 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { startPluginsServer } from '../../src/main/pluginsServer' -import { LogLevel } from '../../src/types' -import Piscina, { makePiscina } from '../../src/worker/piscina' +import { Hub, LogLevel } from '../../src/types' +import { runEventPipeline } from '../../src/worker/ingestion/event-pipeline/runner' +import { makePiscina } from '../../src/worker/piscina' import { pluginConfig39 } from '../helpers/plugins' import { getErrorForPluginConfig, resetTestDatabase } from '../helpers/sql' @@ -21,8 +22,8 @@ const defaultEvent: PluginEvent = { } describe('teardown', () => { - const processEvent = async (piscina: Piscina, event: PluginEvent) => { - const result = await piscina.run({ task: 'runEventPipeline', args: { event } }) + const processEvent = async (hub: Hub, event: PluginEvent) => { + const result = await runEventPipeline(hub, event) const resultEvent = result.args[0] return resultEvent } @@ -38,7 +39,7 @@ describe('teardown', () => { throw new Error('This Happened In The Teardown Palace') } `) - const { piscina, stop } = await startPluginsServer( + const { hub, stop } = await startPluginsServer( { WORKER_CONCURRENCY: 2, LOG_LEVEL: LogLevel.Log, @@ -50,7 +51,7 @@ describe('teardown', () => { const error1 = await getErrorForPluginConfig(pluginConfig39.id) expect(error1).toBe(null) - await processEvent(piscina!, defaultEvent) + await processEvent(hub!, defaultEvent) await stop?.() diff --git a/plugin-server/tests/worker/dead-letter-queue.test.ts b/plugin-server/tests/worker/dead-letter-queue.test.ts index b78e3dc9973c6..9de02801c4c3f 100644 --- a/plugin-server/tests/worker/dead-letter-queue.test.ts +++ b/plugin-server/tests/worker/dead-letter-queue.test.ts @@ -3,8 +3,8 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' import { Hub, LogLevel } from '../../src/types' import { createHub } from '../../src/utils/db/hub' import { UUIDT } from '../../src/utils/utils' +import { runEventPipeline } from '../../src/worker/ingestion/event-pipeline/runner' import { generateEventDeadLetterQueueMessage } from '../../src/worker/ingestion/utils' -import { workerTasks } from '../../src/worker/tasks' import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../helpers/clickhouse' import { resetTestDatabase } from '../helpers/sql' @@ -59,7 +59,7 @@ describe('events dead letter queue', () => { }) test('events get sent to dead letter queue on error', async () => { - const ingestResponse1 = await workerTasks.runEventPipeline(hub, { event: createEvent() }) + const ingestResponse1 = await runEventPipeline(hub, createEvent()) expect(ingestResponse1).toEqual({ lastStep: 'prepareEventStep', error: 'database unavailable',