Skip to content

Commit

Permalink
Fix up acks
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Dec 25, 2024
1 parent 8126704 commit d0bef85
Show file tree
Hide file tree
Showing 32 changed files with 39 additions and 133 deletions.
1 change: 0 additions & 1 deletion plugin-server/functional_tests/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ export const capture = async ({
})
),
key: teamId ? teamId.toString() : '',
waitForAck: true,
})
}

Expand Down
6 changes: 3 additions & 3 deletions plugin-server/functional_tests/jobs-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('dlq handling', () => {
test.concurrent(`handles empty messages`, async () => {
const key = uuidv4()

await produce({ topic: 'jobs', message: null, key, waitForAck: true })
await produce({ topic: 'jobs', message: null, key })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -54,7 +54,7 @@ describe('dlq handling', () => {
test.concurrent(`handles invalid JSON`, async () => {
const key = uuidv4()

await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key, waitForAck: true })
await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -72,7 +72,7 @@ describe('dlq handling', () => {
labels: { topic: 'jobs', partition: '0', groupId: 'jobs-inserter' },
})

await produce({ topic: 'jobs', message: Buffer.from(''), key: '', waitForAck: true })
await produce({ topic: 'jobs', message: Buffer.from(''), key: '' })

await waitForExpect(async () => {
const metricAfter = await getMetric({
Expand Down
14 changes: 2 additions & 12 deletions plugin-server/functional_tests/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,8 @@ export async function createKafkaProducer() {
return producer
}

export async function produce({
topic,
message,
key,
waitForAck,
}: {
topic: string
message: Buffer | null
key: string
waitForAck: boolean
}) {
export async function produce({ topic, message, key }: { topic: string; message: Buffer | null; key: string }) {
producer = producer ?? (await createKafkaProducer())

await new KafkaProducerWrapper(producer).produce({ topic, value: message, key: Buffer.from(key), waitForAck })
await new KafkaProducerWrapper(producer).produce({ topic, value: message, key: Buffer.from(key) })
}
10 changes: 5 additions & 5 deletions plugin-server/functional_tests/scheduled-tasks-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('dlq handling', () => {
test.concurrent(`handles empty messages`, async () => {
const key = uuidv4()

await produce({ topic: 'scheduled_tasks', message: null, key, waitForAck: true })
await produce({ topic: 'scheduled_tasks', message: null, key })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -54,7 +54,7 @@ describe('dlq handling', () => {
test.concurrent(`handles invalid JSON`, async () => {
const key = uuidv4()

await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key, waitForAck: true })
await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -69,7 +69,7 @@ describe('dlq handling', () => {
topic: 'scheduled_tasks',
message: Buffer.from(JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 })),
key,
waitForAck: true,
,
})

await waitForExpect(() => {
Expand All @@ -85,7 +85,7 @@ describe('dlq handling', () => {
topic: 'scheduled_tasks',
message: Buffer.from(JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' })),
key,
waitForAck: true,
,
})

await waitForExpect(() => {
Expand All @@ -106,7 +106,7 @@ describe('dlq handling', () => {

// NOTE: we don't actually care too much about the contents of the
// message, just that it triggeres the consumer to try to process it.
await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '', waitForAck: true })
await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '' })

await waitForExpect(async () => {
const metricAfter = await getMetric({
Expand Down
3 changes: 0 additions & 3 deletions plugin-server/functional_tests/session-recordings.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ test.skip('consumer updates timestamp exported to prometheus', async () => {
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(''),
key: '',
waitForAck: true,
})

await waitForExpect(async () => {
Expand Down Expand Up @@ -250,15 +249,13 @@ test.skip(`handles message with no token or with token and no associated team_id
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(JSON.stringify({ uuid: noTokenUuid, data: JSON.stringify({}) })),
key: noTokenKey,
waitForAck: true,
})
await produce({
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(
JSON.stringify({ uuid: noAssociatedTeamUuid, token: 'no associated team', data: JSON.stringify({}) })
),
key: noAssociatedTeamKey,
waitForAck: true,
})

await capture(makeSessionMessage(teamId, 'should be ingested'))
Expand Down
1 change: 0 additions & 1 deletion plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ abstract class CdpConsumerBase {
},
],
})),
waitForAck: true,
}).catch((reason) => {
status.error('⚠️', `failed to produce message: ${reason}`)
})
Expand Down
41 changes: 10 additions & 31 deletions plugin-server/src/kafka/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,19 @@ export class KafkaProducerWrapper {
key,
topic,
headers,
waitForAck,
}: {
value: MessageValue
key: MessageKey
topic: string
headers?: MessageHeader[]
waitForAck: boolean
}): Promise<void> {
try {
const produceTimer = ingestEventKafkaProduceLatency.labels({ topic }).startTimer()
const produceSpan = getSpan()?.startChild({ op: 'kafka_produce' })
kafkaProducerMessagesQueuedCounter.labels({ topic_name: topic }).inc()

status.debug('📤', 'Producing message', { topic: topic })
const produceSpan = getSpan()?.startChild({ op: 'kafka_produce' })
return await new Promise((resolve, reject) => {
const produceTimer = ingestEventKafkaProduceLatency
.labels({ topic, waitForAck: waitForAck.toString() })
.startTimer()

const result = await new Promise((resolve, reject) => {
this.producer.produce(
topic,
null,
Expand All @@ -117,28 +112,15 @@ export class KafkaProducerWrapper {
Date.now(),
headers ?? [],
(error: any, offset: NumberNullUndefined) => {
produceSpan?.finish()
kafkaProducerMessagesWrittenCounter.labels({ topic_name: topic }).inc()
if (error) {
status.error('⚠️', 'produce_error', { error: error, topic: topic })
} else {
status.debug('📤', 'Produced message', { topic: topic, offset: offset })
}

if (waitForAck) {
produceTimer()
// TODO: Modify this to return the offset in a way that works across the codebase
return error ? reject(error) : resolve()
}
return error ? reject(error) : resolve(offset)
}
)

// If we're not waiting for an ack, we can resolve immediately
if (!waitForAck) {
resolve(undefined)
produceTimer()
}
})

produceSpan?.finish()
kafkaProducerMessagesWrittenCounter.labels({ topic_name: topic }).inc()
status.debug('📤', 'Produced message', { topic: topic, offset: result })
produceTimer()
} catch (error) {
kafkaProducerMessagesFailedCounter.labels({ topic_name: topic }).inc()
status.error('⚠️', 'kafka_produce_error', { error: error, topic: topic })
Expand All @@ -157,10 +139,8 @@ export class KafkaProducerWrapper {

async queueMessages({
kafkaMessages: kafkaMessage,
waitForAck,
}: {
kafkaMessages: ProducerRecord | ProducerRecord[]
waitForAck: boolean
}): Promise<void> {
const records = Array.isArray(kafkaMessage) ? kafkaMessage : [kafkaMessage]

Expand All @@ -173,7 +153,6 @@ export class KafkaProducerWrapper {
key: message.key ? Buffer.from(message.key) : null,
value: message.value ? Buffer.from(message.value) : null,
headers: convertKafkaJSHeadersToRdKafkaHeaders(message.headers),
waitForAck: waitForAck,
})
)
)
Expand Down Expand Up @@ -255,6 +234,6 @@ export const kafkaProducerMessagesFailedCounter = new Counter({
export const ingestEventKafkaProduceLatency = new Summary({
name: 'ingest_event_kafka_produce_latency',
help: 'Wait time for individual Kafka produces',
labelNames: ['topic', 'waitForAck'],
labelNames: ['topic'],
percentiles: [0.5, 0.9, 0.95, 0.99],
})
1 change: 0 additions & 1 deletion plugin-server/src/main/graphile-worker/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ export async function runScheduledTasks(
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
},
waitForAck: true,
})
graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ async function handleProcessingError(
value: message.value,
key: message.key ?? null, // avoid undefined, just to be safe
headers: headers,
waitForAck: true,
})
} catch (error) {
// If we can't send to the DLQ and it's not retriable, just continue. We'll commit the
Expand Down Expand Up @@ -282,7 +281,6 @@ async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]
// instead as that behavior is safer.
key: useRandomPartitioning ? null : message.key ?? null,
headers: message.headers,
waitForAck: true,
})
)
)
Expand Down
2 changes: 0 additions & 2 deletions plugin-server/src/main/ingestion-queues/jobs-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ export const startJobsConsumer = async ({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
resolveOffset(message.offset)
continue
Expand All @@ -79,7 +78,6 @@ export const startJobsConsumer = async ({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
resolveOffset(message.offset)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) =
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
Expand All @@ -190,7 +189,6 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) =
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
Expand All @@ -202,7 +200,6 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) =
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ export class ConsoleLogsIngester {
key: event.session_id,
})),
},
waitForAck: true,
}),
]
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ export class ReplayEventsIngester {
},
],
},
waitForAck: true,
}),
]
} catch (error) {
Expand Down
5 changes: 2 additions & 3 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ export class DB {
})
}

await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true })
await this.kafkaProducer.queueMessages({ kafkaMessages })
return person
}

Expand Down Expand Up @@ -881,7 +881,7 @@ export class DB {
): Promise<void> {
const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version, tx)
if (kafkaMessages.length) {
await this.kafkaProducer.queueMessages({ kafkaMessages: kafkaMessages, waitForAck: true })
await this.kafkaProducer.queueMessages({ kafkaMessages })
}
}

Expand Down Expand Up @@ -1435,7 +1435,6 @@ export class DB {
},
],
},
waitForAck: true,
})
}

Expand Down
1 change: 0 additions & 1 deletion plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ export async function createHub(
},
],
},
waitForAck: true,
})
}

Expand Down
1 change: 0 additions & 1 deletion plugin-server/src/worker/ingestion/app-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ export class AppMetrics {
topic: KAFKA_APP_METRICS,
messages: kafkaMessages,
},
waitForAck: true,
})
status.debug('🚽', `Finished flushing app metrics, took ${Date.now() - startTime}ms`)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ export async function extractHeatmapDataStep(
value: JSON.stringify(rawEvent),
})),
},
waitForAck: true,
})
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export function produceExceptionSymbolificationEventStep(
},
],
},
waitForAck: true,
})
.catch((error) => {
status.warn('⚠️', 'Failed to produce exception event for symbolification', {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ export class EventPipelineRunner {
teamId,
`plugin_server_ingest_event:${currentStepName}`
)
await this.hub.db.kafkaProducer!.queueMessages({ kafkaMessages: message, waitForAck: true })
await this.hub.db.kafkaProducer!.queueMessages({ kafkaMessages: message })
} catch (dlqError) {
status.info('🔔', `Errored trying to add event to dead letter queue. Error: ${dlqError}`)
Sentry.captureException(dlqError, {
Expand Down
4 changes: 2 additions & 2 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ export class PersonState {

if (Object.keys(update).length > 0) {
const [updatedPerson, kafkaMessages] = await this.db.updatePersonDeprecated(person, update)
const kafkaAck = this.db.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true })
const kafkaAck = this.db.kafkaProducer.queueMessages({ kafkaMessages })
return [updatedPerson, kafkaAck]
}

Expand Down Expand Up @@ -768,7 +768,7 @@ export class PersonState {
})
.inc()

const kafkaAck = this.db.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true })
const kafkaAck = this.db.kafkaProducer.queueMessages({ kafkaMessages })

return [mergedPerson, kafkaAck]
}
Expand Down
Loading

0 comments on commit d0bef85

Please sign in to comment.