diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 749e41c18c335..ca333926c7b2b 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -41,7 +41,7 @@ type IngestionSplitBatch = { type IngestResult = { // Promises that the batch handler should await on before committing offsets, // contains the Kafka producer ACKs, to avoid blocking after every message. - promises?: Array> + ackPromises?: Array> } async function handleProcessingError( @@ -166,7 +166,7 @@ export async function eachBatchParallelIngestion( return await runner.runEventPipeline(pluginEvent) })) as IngestResult - result.promises?.forEach((promise) => + result.ackPromises?.forEach((promise) => processingPromises.push( promise.catch(async (error) => { await handleProcessingError(error, message, pluginEvent, queue) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 3d860e4064ced..9ecc377dc9f76 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -25,7 +25,7 @@ import { processPersonsStep } from './processPersonsStep' export type EventPipelineResult = { // Promises that the batch handler should await on before committing offsets, // contains the Kafka producer ACKs, to avoid blocking after every message. - promises?: Array> + ackPromises?: Array> // Only used in tests // TODO: update to test for side-effects of running the pipeline rather than // this return type. @@ -135,9 +135,9 @@ export class EventPipelineRunner { return this.registerLastStep('createEventStep', [rawClickhouseEvent, person], [eventAck]) } - registerLastStep(stepName: string, args: any[], promises?: Array>): EventPipelineResult { + registerLastStep(stepName: string, args: any[], ackPromises?: Array>): EventPipelineResult { pipelineLastStepCounter.labels(stepName).inc() - return { promises: promises, lastStep: stepName, args } + return { ackPromises, lastStep: stepName, args } } protected runStep any>(