
chore(plugin-server): remove INGESTION_DELAY_WRITE_ACKS
bretthoerner committed Oct 12, 2023
1 parent 3528b66 commit 54ec373
Showing 3 changed files with 63 additions and 66 deletions.
@@ -36,7 +36,7 @@ afterAll(async () => {
     await dlqConsumer.disconnect()
 })
 
-test.concurrent('consumer handles messages just less than 1MB gracefully', async () => {
+test.concurrent('consumer handles messages just over 1MB gracefully', async () => {
     // For this we basically want the plugin-server to try and produce a new
     // message larger than 1MB. We do this by creating a person with a lot of
     // properties. We will end up denormalizing the person properties onto the
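The trick the test comment describes relies on person properties being denormalized onto each event: an $identify carrying roughly a megabyte of $set properties pushes the resulting ClickHouse-bound Kafka message past the broker's default ~1MB message size limit. A minimal sketch of that setup, assuming a hypothetical capture() test helper and teamId fixture rather than the repo's actual test utilities:

    declare const teamId: number
    declare function capture(event: {
        teamId: number
        distinctId: string
        event: string
        properties: Record<string, unknown>
    }): Promise<void>

    // Build ~1.1MB of person properties so the denormalized event message
    // exceeds Kafka's default message size limit.
    const largeProperties: Record<string, string> = {}
    for (let i = 0; i < 1100; i++) {
        largeProperties[`prop_${i}`] = 'x'.repeat(1000)
    }
    await capture({
        teamId,
        distinctId: 'user-1',
        event: '$identify',
        properties: { $set: largeProperties },
    })

The renamed title matches what the test actually exercises: the interesting case is the message just over 1MB, which the consumer must handle gracefully rather than crash on.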
@@ -40,6 +40,59 @@ type IngestResult = {
     promises?: Array<Promise<void>>
 }
 
+async function handleProcessingError(
+    error: any,
+    message: Message,
+    pluginEvent: PipelineEvent,
+    queue: IngestionConsumer
+) {
+    status.error('🔥', `Error processing message`, {
+        stack: error.stack,
+        error: error,
+    })
+
+    // If the error is non-retriable, push the message to the DLQ and commit the offset. Otherwise
+    // raise the error.
+    //
+    // NOTE: there is already behavior to push to a DLQ within EventPipelineRunner. That doesn't
+    // work so well for e.g. messages that are themselves too large to send to the DLQ.
+    // Here we explicitly do _not_ add any additional metadata to the message. We might want to
+    // add some metadata, e.g. in a header, or reference e.g. the sentry event id.
+    //
+    // TODO: properly abstract out this `isRetriable` error logic. This currently relies on the
+    // fact that node-rdkafka adheres to the `isRetriable` interface.
+    if (error?.isRetriable !== false) {
+        throw error
+    }
+
+    const sentryEventId = Sentry.captureException(error)
+    const headers: MessageHeader[] = message.headers ?? []
+    headers.push({ ['sentry-event-id']: sentryEventId })
+    headers.push({ ['event-id']: pluginEvent.uuid })
+    try {
+        await queue.pluginsServer.kafkaProducer.produce({
+            topic: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,
+            value: message.value,
+            key: message.key,
+            headers: headers,
+            waitForAck: true,
+        })
+    } catch (error) {
+        // If we can't send to the DLQ and the produce error is non-retriable, just log it and
+        // continue: we'll commit the offset and move on.
+        if (error?.isRetriable === false) {
+            status.error('🔥', `Error pushing to DLQ`, {
+                stack: error.stack,
+                error: error,
+            })
+            return
+        }
+
+        // If we can't send to the DLQ and the produce error is retriable, raise the error.
+        throw error
+    }
+}
+
 export async function eachBatchParallelIngestion(
     messages: Message[],
     queue: IngestionConsumer,
@@ -102,62 +155,15 @@ export async function eachBatchParallelIngestion(
         for (const { message, pluginEvent } of currentBatch) {
             try {
                 const result = await eachMessage(pluginEvent, queue)
-                if (result.promises) {
-                    processingPromises.push(...result.promises)
-                }
-            } catch (error) {
-                status.error('🔥', `Error processing message`, {
-                    stack: error.stack,
-                    error: error,
-                })
-
-                // If there error is a non-retriable error, push
-                // to the dlq and commit the offset. Else raise the
-                // error.
-                //
-                // NOTE: there is behavior to push to a DLQ at the
-                // moment within EventPipelineRunner. This doesn't work
-                // so well with e.g. messages that when sent to the DLQ
-                // is it's self too large. Here we explicitly do _not_
-                // add any additional metadata to the message. We might
-                // want to add some metadata to the message e.g. in the
-                // header or reference e.g. the sentry event id.
-                //
-                // TODO: property abstract out this `isRetriable` error
-                // logic. This is currently relying on the fact that
-                // node-rdkafka adheres to the `isRetriable` interface.
-                if (error?.isRetriable === false) {
-                    const sentryEventId = Sentry.captureException(error)
-                    const headers: MessageHeader[] = message.headers ?? []
-                    headers.push({ ['sentry-event-id']: sentryEventId })
-                    headers.push({ ['event-id']: pluginEvent.uuid })
-                    try {
-                        await queue.pluginsServer.kafkaProducer.produce({
-                            topic: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,
-                            value: message.value,
-                            key: message.key,
-                            headers: headers,
-                            waitForAck: true,
-                        })
-                    } catch (error) {
-                        // If we can't send to the DLQ and it's not
-                        // retriable, just continue. We'll commit the
-                        // offset and move on.
-                        if (error?.isRetriable === false) {
-                            status.error('🔥', `Error pushing to DLQ`, {
-                                stack: error.stack,
-                                error: error,
-                            })
-                            continue
-                        }
-
-                        // If we can't send to the DLQ and it is
-                        // retriable, raise the error.
-                        throw error
-                    }
-                } else {
-                    throw error
-                }
-            }
+                for (const promise of result.promises ?? []) {
+                    processingPromises.push(
+                        promise.catch(async (error) => {
+                            await handleProcessingError(error, message, pluginEvent, queue)
+                        })
+                    )
+                }
+            } catch (error) {
+                await handleProcessingError(error, message, pluginEvent, queue)
+            }
         }
 
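Two details of this refactor are worth calling out. Extracting handleProcessingError deduplicates the DLQ logic and inverts the retriable check into an early throw, flattening the deeply nested catch blocks. More importantly, now that write acks are always deferred (see runner.ts below), failures can reject the promises in result.promises long after eachMessage has returned, so the handler is attached with .catch at promise-creation time; the surrounding try/catch alone would never observe those late rejections. As for the TODO about abstracting the isRetriable probe, one plausible shape is a small type guard (a sketch, not code from this commit; the comment above tells us node-rdkafka errors expose the isRetriable flag):

    interface MaybeRetriable {
        isRetriable?: boolean
    }

    // True only when the error explicitly opts out of retrying, matching the
    // `error?.isRetriable === false` probes in handleProcessingError.
    function isNonRetriable(error: unknown): boolean {
        return (error as MaybeRetriable | null)?.isRetriable === false
    }

With such a guard, handleProcessingError's early exit would read `if (!isNonRetriable(error)) { throw error }`.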
plugin-server/src/worker/ingestion/event-pipeline/runner.ts (13 changes: 2 additions & 11 deletions)
@@ -7,7 +7,6 @@ import { runInSpan } from '../../../sentry'
 import { Hub, PipelineEvent } from '../../../types'
 import { DependencyUnavailableError } from '../../../utils/db/error'
 import { timeoutGuard } from '../../../utils/db/utils'
-import { stringToBoolean } from '../../../utils/env-utils'
 import { status } from '../../../utils/status'
 import { generateEventDeadLetterQueueMessage } from '../utils'
 import { createEventStep } from './createEventStep'
@@ -64,17 +63,13 @@ export class EventPipelineRunner {
 
     // See https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0
     poEEmbraceJoin: boolean
-    private delayAcks: boolean
     private eventsToDropByToken: Map<string, string[]>
 
     constructor(hub: Hub, originalEvent: PipelineEvent | ProcessedPluginEvent, poEEmbraceJoin = false) {
         this.hub = hub
         this.originalEvent = originalEvent
         this.poEEmbraceJoin = poEEmbraceJoin
 
-        // TODO: remove after successful rollout
-        this.delayAcks = stringToBoolean(process.env.INGESTION_DELAY_WRITE_ACKS)
-
         this.eventsToDropByToken = new Map()
         process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID?.split(',').forEach((pair) => {
             const [token, distinctID] = pair.split(':')
@@ -157,12 +152,8 @@ export class EventPipelineRunner {
                 [this, preparedEvent, person],
                 event.team_id
             )
-            if (this.delayAcks) {
-                return this.registerLastStep('createEventStep', event.team_id, [rawClickhouseEvent, person], [eventAck])
-            } else {
-                await eventAck
-                return this.registerLastStep('createEventStep', event.team_id, [rawClickhouseEvent, person])
-            }
+
+            return this.registerLastStep('createEventStep', event.team_id, [rawClickhouseEvent, person], [eventAck])
     }
 
     registerLastStep(
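With the flag removed, the delayed-ack path is the only one left: the Kafka ack from createEventStep is always handed to registerLastStep, surfaced through IngestResult.promises, and settled by the batch loop above via the per-promise .catch. The throughput rationale, in illustrative form; produceEvent is a stand-in for the ClickHouse-bound write, not a function in the repo:

    declare function produceEvent(event: unknown): Promise<void>

    // Old default (INGESTION_DELAY_WRITE_ACKS unset): await each ack inline,
    // paying one broker round trip per event.
    async function inlineAcks(batch: unknown[]): Promise<void> {
        for (const event of batch) {
            await produceEvent(event)
        }
    }

    // Post-commit behavior: start every write, then wait once for all acks,
    // letting the round trips overlap.
    async function delayedAcks(batch: unknown[]): Promise<void> {
        await Promise.all(batch.map((event) => produceEvent(event)))
    }

Assuming the consumer only commits offsets after the collected promises settle, delivery guarantees stay the same: a crash before the acks resolve simply replays the batch.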

