Skip to content

Commit

Permalink
fix(plugin-server): wrap a few ingestion/consumer calls in simple ret…
Browse files Browse the repository at this point in the history
…ries
  • Loading branch information
bretthoerner committed Oct 19, 2023
1 parent a7ae3b0 commit 9c0beca
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 18 deletions.
14 changes: 8 additions & 6 deletions plugin-server/src/kafka/batch-consumer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka'
import { exponentialBuckets, Gauge, Histogram } from 'prom-client'

import { retryIfRetriable } from '../utils/retries'
import { status } from '../utils/status'
import { createAdminClient, ensureTopicExists } from './admin'
import {
Expand Down Expand Up @@ -173,7 +174,9 @@ export const startBatchConsumer = async ({
try {
while (!isShuttingDown) {
status.debug('🔁', 'main_loop_consuming')
const messages = await consumeMessages(consumer, fetchBatchSize)
const messages = await retryIfRetriable(async () => {
return await consumeMessages(consumer, fetchBatchSize)
})

// It's important that we only set the `lastConsumeTime` after a successful consume
// call. Even if we received 0 messages, a successful call means we are actually
Expand Down Expand Up @@ -219,11 +222,10 @@ export const startBatchConsumer = async ({

clearInterval(statusLogInterval)

// Finally disconnect from the broker. I'm not 100% on if the offset
// commit is allowed to complete before completing, or if in fact
// disconnect itself handles committing offsets thus the previous
// `commit()` call is redundant, but it shouldn't hurt.
await Promise.all([disconnectConsumer(consumer)])
// Finally, disconnect from the broker. If stored offsets have changed via
// `storeOffsetsForMessages` above, they will be committed before shutdown (so long
// as this consumer is still part of the group).
await disconnectConsumer(consumer)
}
}

Expand Down
6 changes: 6 additions & 0 deletions plugin-server/src/kafka/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ export const findOffsetsToCommit = (messages: TopicPartitionOffset[]): TopicPart
return highestOffsets
}

/**
* Updates the offsets that will be committed on the next call to commit() (without offsets
* specified) or the next auto commit.
*
* This is a local (in-memory) operation and does not talk to the Kafka broker.
*/
export const storeOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => {
const topicPartitionOffsets = findOffsetsToCommit(messages).map((message) => {
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Message, MessageHeader } from 'node-rdkafka'
import { KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../config/kafka-topics'
import { Hub, PipelineEvent } from '../../../types'
import { formPipelineEvent } from '../../../utils/event'
import { retryIfRetriable } from '../../../utils/retries'
import { status } from '../../../utils/status'
import { ConfiguredLimiter, LoggingLimiter, WarningLimiter } from '../../../utils/token-bucket'
import { EventPipelineResult, runEventPipeline } from '../../../worker/ingestion/event-pipeline/runner'
Expand Down Expand Up @@ -98,10 +99,6 @@ export async function eachBatchParallelIngestion(
queue: IngestionConsumer,
overflowMode: IngestionOverflowMode
): Promise<void> {
async function eachMessage(event: PipelineEvent, queue: IngestionConsumer): Promise<IngestResult> {
return ingestEvent(queue.pluginsServer, event)
}

const batchStartTimer = new Date()
const metricKey = 'ingestion'
const loggingKey = `each_batch_parallel_ingestion`
Expand Down Expand Up @@ -154,15 +151,17 @@ export async function eachBatchParallelIngestion(
// Process every message sequentially, stash promises to await on later
for (const { message, pluginEvent } of currentBatch) {
try {
const result = await eachMessage(pluginEvent, queue)
const result = (await retryIfRetriable(async () => {
return await ingestEvent(queue.pluginsServer, pluginEvent)
})) as IngestResult

for (const promise of result.promises ?? []) {
result.promises?.forEach((promise) =>
processingPromises.push(
promise.catch(async (error) => {
await handleProcessingError(error, message, pluginEvent, queue)
})
)
}
)
} catch (error) {
await handleProcessingError(error, message, pluginEvent, queue)
}
Expand Down
23 changes: 23 additions & 0 deletions plugin-server/src/utils/retries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { runInTransaction } from '../sentry'
import { Hub } from '../types'
import { status } from '../utils/status'
import { AppMetricIdentifier, ErrorWithContext } from '../worker/ingestion/app-metrics'
import { sleep } from './utils'

// Simple retries in our code
export const defaultRetryConfig = {
Expand Down Expand Up @@ -153,3 +154,25 @@ export async function runRetriableFunction(retriableFunctionPayload: RetriableFu
},
})
}

/**
* Retry a function, respecting `error.isRetriable`.
*/
export async function retryIfRetriable<T>(fn: () => Promise<T>, retries = 3, sleepMs = 500): Promise<T> {
for (let i = 0; i < retries; i++) {
try {
return await fn()
} catch (error) {
if (error?.isRetriable === false || i === retries - 1) {
// Throw if the error is not retryable or if we're out of retries.
throw error
}

// Fall through, `fn` will retry after sleep.
await sleep(sleepMs)
}
}

// This should never happen, but TypeScript doesn't know that.
throw new Error('Unreachable error in retry')
}
4 changes: 4 additions & 0 deletions plugin-server/src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,7 @@ export function getPropertyValueByPath(properties: Properties, [firstKey, ...nes
}
return value
}

export async function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
}
21 changes: 16 additions & 5 deletions plugin-server/tests/main/ingestion-queues/each-batch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,11 @@ describe('eachBatchX', () => {
)
})

it('fails the batch if runEventPipeline rejects', async () => {
it("doesn't fail the batch if runEventPipeline rejects once then succeeds on retry", async () => {
const batch = createBatch(captureEndpointEvent)
runEventPipelineSpy.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
await expect(eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled)).rejects.toBe(
'runEventPipeline nopes out'
)
expect(runEventPipeline).toHaveBeenCalledTimes(1)
await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled)
expect(runEventPipeline).toHaveBeenCalledTimes(2)
})

it('fails the batch if one deferred promise rejects', async () => {
Expand Down Expand Up @@ -531,5 +529,18 @@ describe('eachBatchX', () => {
key: 'ingestion',
})
})

it('fails the batch if runEventPipeline rejects repeatedly', async () => {
const batch = createBatch(captureEndpointEvent)
runEventPipelineSpy
.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
await expect(eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled)).rejects.toBe(
'runEventPipeline nopes out'
)
expect(runEventPipeline).toHaveBeenCalledTimes(3)
runEventPipelineSpy.mockRestore()
})
})
})

0 comments on commit 9c0beca

Please sign in to comment.