Skip to content

Commit

Permalink
cleanup: rename promises to ackPromises to make it more clear thats w…
Browse files Browse the repository at this point in the history
…hat they are
  • Loading branch information
bretthoerner committed Mar 22, 2024
1 parent 08e6605 commit 3ccfb85
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type IngestionSplitBatch = {
type IngestResult = {
// Promises that the batch handler should await on before committing offsets,
// contains the Kafka producer ACKs, to avoid blocking after every message.
promises?: Array<Promise<void>>
ackPromises?: Array<Promise<void>>
}

async function handleProcessingError(
Expand Down Expand Up @@ -166,7 +166,7 @@ export async function eachBatchParallelIngestion(
return await runner.runEventPipeline(pluginEvent)
})) as IngestResult

result.promises?.forEach((promise) =>
result.ackPromises?.forEach((promise) =>
processingPromises.push(
promise.catch(async (error) => {
await handleProcessingError(error, message, pluginEvent, queue)
Expand Down
6 changes: 3 additions & 3 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { processPersonsStep } from './processPersonsStep'
export type EventPipelineResult = {
// Promises that the batch handler should await on before committing offsets,
// contains the Kafka producer ACKs, to avoid blocking after every message.
promises?: Array<Promise<void>>
ackPromises?: Array<Promise<void>>
// Only used in tests
// TODO: update to test for side-effects of running the pipeline rather than
// this return type.
Expand Down Expand Up @@ -135,9 +135,9 @@ export class EventPipelineRunner {
return this.registerLastStep('createEventStep', [rawClickhouseEvent, person], [eventAck])
}

registerLastStep(stepName: string, args: any[], promises?: Array<Promise<void>>): EventPipelineResult {
registerLastStep(stepName: string, args: any[], ackPromises?: Array<Promise<void>>): EventPipelineResult {
pipelineLastStepCounter.labels(stepName).inc()
return { promises: promises, lastStep: stepName, args }
return { ackPromises, lastStep: stepName, args }
}

protected runStep<Step extends (...args: any[]) => any>(
Expand Down

0 comments on commit 3ccfb85

Please sign in to comment.