chore(plugin-server): kafka ack cleanup and metric (#21111)
* cleanup: remove unused team arg from registerLastStep

* cleanup: rename promises to ackPromises to make it clearer that's what they are

* cleanup(plugin-server): make waitForAck explicit/required

* add Kafka produce/ack metrics

* Clarify Kafka produce metric/labels
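
In caller terms, the central change is that `waitForAck` is no longer optional. A minimal sketch of the new call shape (the `produce` helper name and argument shape come from the diff below; the body is an illustrative stub):

```typescript
// Sketch of the functional-test helper's new signature (see
// plugin-server/functional_tests/kafka.ts below); the body is a stub.
async function produce(opts: {
    topic: string
    message: Buffer | null
    key: string
    waitForAck: boolean // previously optional, now required at every call site
}): Promise<void> {
    // ... hand off to the shared Kafka producer ...
}

async function example(): Promise<void> {
    // Before: await produce({ topic: 'jobs', message: null, key: 'k' })
    // After, the ack behavior is stated explicitly:
    await produce({ topic: 'jobs', message: null, key: 'k', waitForAck: true })
}
```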
bretthoerner authored Mar 25, 2024
1 parent 2389be4 commit 30bafdd
Showing 31 changed files with 372 additions and 248 deletions.
1 change: 1 addition & 0 deletions plugin-server/functional_tests/api.ts
@@ -106,6 +106,7 @@ export const capture = async ({
})
),
key: teamId ? teamId.toString() : '',
waitForAck: true,
})
}

6 changes: 3 additions & 3 deletions plugin-server/functional_tests/jobs-consumer.test.ts
@@ -43,7 +43,7 @@ describe('dlq handling', () => {
test.concurrent(`handles empty messages`, async () => {
const key = uuidv4()

await produce({ topic: 'jobs', message: null, key })
await produce({ topic: 'jobs', message: null, key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
@@ -54,7 +54,7 @@ describe('dlq handling', () => {
test.concurrent(`handles invalid JSON`, async () => {
const key = uuidv4()

await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key })
await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
@@ -72,7 +72,7 @@ describe('dlq handling', () => {
labels: { topic: 'jobs', partition: '0', groupId: 'jobs-inserter' },
})

await produce({ topic: 'jobs', message: Buffer.from(''), key: '' })
await produce({ topic: 'jobs', message: Buffer.from(''), key: '', waitForAck: true })

await waitForExpect(async () => {
const metricAfter = await getMetric({
14 changes: 12 additions & 2 deletions plugin-server/functional_tests/kafka.ts
@@ -36,7 +36,17 @@ export async function createKafkaProducer() {
return producer
}

export async function produce({ topic, message, key }: { topic: string; message: Buffer | null; key: string }) {
export async function produce({
topic,
message,
key,
waitForAck,
}: {
topic: string
message: Buffer | null
key: string
waitForAck: boolean
}) {
producer = producer ?? (await createKafkaProducer())
await defaultProduce({ producer, topic, value: message, key: Buffer.from(key) })
await defaultProduce({ producer, topic, value: message, key: Buffer.from(key), waitForAck })
}
8 changes: 5 additions & 3 deletions plugin-server/functional_tests/scheduled-tasks-runner.test.ts
@@ -43,7 +43,7 @@ describe('dlq handling', () => {
test.concurrent(`handles empty messages`, async () => {
const key = uuidv4()

await produce({ topic: 'scheduled_tasks', message: null, key })
await produce({ topic: 'scheduled_tasks', message: null, key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
@@ -54,7 +54,7 @@ describe('dlq handling', () => {
test.concurrent(`handles invalid JSON`, async () => {
const key = uuidv4()

await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key })
await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
@@ -69,6 +69,7 @@ describe('dlq handling', () => {
topic: 'scheduled_tasks',
message: Buffer.from(JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 })),
key,
waitForAck: true,
})

await waitForExpect(() => {
@@ -84,6 +85,7 @@ describe('dlq handling', () => {
topic: 'scheduled_tasks',
message: Buffer.from(JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' })),
key,
waitForAck: true,
})

await waitForExpect(() => {
@@ -104,7 +106,7 @@

// NOTE: we don't actually care too much about the contents of the
// message, just that it triggers the consumer to try to process it.
await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '' })
await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '', waitForAck: true })

await waitForExpect(async () => {
const metricAfter = await getMetric({
9 changes: 8 additions & 1 deletion plugin-server/functional_tests/session-recordings.test.ts
@@ -173,7 +173,12 @@ test.skip('consumer updates timestamp exported to prometheus', async () => {
},
})

await produce({ topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, message: Buffer.from(''), key: '' })
await produce({
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(''),
key: '',
waitForAck: true,
})

await waitForExpect(async () => {
const metricAfter = await getMetric({
@@ -245,13 +250,15 @@ test.skip(`handles message with no token or with token and no associated team_id
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(JSON.stringify({ uuid: noTokenUuid, data: JSON.stringify({}) })),
key: noTokenKey,
waitForAck: true,
})
await produce({
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(
JSON.stringify({ uuid: noAssociatedTeamUuid, token: 'no associated team', data: JSON.stringify({}) })
),
key: noAssociatedTeamKey,
waitForAck: true,
})

await capture(makeSessionMessage(teamId, 'should be ingested'))
18 changes: 16 additions & 2 deletions plugin-server/src/kafka/producer.ts
@@ -7,6 +7,7 @@ import {
NumberNullUndefined,
ProducerGlobalConfig,
} from 'node-rdkafka'
import { Summary } from 'prom-client'

import { getSpan } from '../sentry'
import { status } from '../utils/status'
@@ -17,6 +18,13 @@ export type KafkaProducerConfig = {
KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_MESSAGES: number
}

export const ingestEventKafkaProduceLatency = new Summary({
name: 'ingest_event_kafka_produce_latency',
help: 'Wait time for individual Kafka produces',
labelNames: ['topic', 'waitForAck'],
percentiles: [0.5, 0.9, 0.95, 0.99],
})

// Kafka production related functions using node-rdkafka.
export const createKafkaProducer = async (globalConfig: ProducerGlobalConfig, producerConfig: KafkaProducerConfig) => {
const producer = new RdKafkaProducer({
@@ -71,18 +79,22 @@ export const produce = async ({
value,
key,
headers = [],
waitForAck = true,
waitForAck,
}: {
producer: RdKafkaProducer
topic: string
value: MessageValue
key: MessageKey
headers?: MessageHeader[]
waitForAck?: boolean
waitForAck: boolean
}): Promise<number | null | undefined> => {
status.debug('📤', 'Producing message', { topic: topic })
const produceSpan = getSpan()?.startChild({ op: 'kafka_produce' })
return await new Promise((resolve, reject) => {
const produceTimer = ingestEventKafkaProduceLatency
.labels({ topic, waitForAck: waitForAck.toString() })
.startTimer()

if (waitForAck) {
producer.produce(
topic,
@@ -100,6 +112,7 @@
resolve(offset)
}

produceTimer()
produceSpan?.finish()
}
)
@@ -112,6 +125,7 @@
produceSpan?.finish()
})
resolve(undefined)
produceTimer()
}
})
}
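
The timing added above uses prom-client's standard timer idiom: `startTimer()` records a start time and returns a function that, when invoked, observes the elapsed seconds. A standalone sketch under that assumption (the metric definition is copied from the diff; the awaited produce is a placeholder):

```typescript
import { Summary } from 'prom-client'

const kafkaProduceLatency = new Summary({
    name: 'ingest_event_kafka_produce_latency',
    help: 'Wait time for individual Kafka produces',
    labelNames: ['topic', 'waitForAck'],
    percentiles: [0.5, 0.9, 0.95, 0.99],
})

async function timedProduce(topic: string, waitForAck: boolean): Promise<void> {
    // startTimer() notes the start time; calling the returned function
    // records the elapsed seconds against the labeled Summary.
    const stopTimer = kafkaProduceLatency.labels({ topic, waitForAck: String(waitForAck) }).startTimer()
    try {
        await Promise.resolve() // placeholder for the real producer.produce(...) wait
    } finally {
        stopTimer() // the diff stops the timer on both the ack and no-ack paths
    }
}
```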
7 changes: 5 additions & 2 deletions plugin-server/src/main/graphile-worker/schedule.ts
@@ -56,8 +56,11 @@ export async function runScheduledTasks(
for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId })
await server.kafkaProducer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
},
waitForAck: true,
})
graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc()
}
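
Every `queueMessage` call site in this commit moves to the same two-field shape. A sketch of that shape (field names are from the diff; the wrapper interface is a simplified stand-in for the plugin-server's `KafkaProducerWrapper`):

```typescript
interface QueueMessageArgs {
    kafkaMessage: {
        topic: string
        messages: Array<{ key: string | null; value: string | Buffer | null }>
    }
    waitForAck: boolean
}

// Simplified stand-in for KafkaProducerWrapper.
interface ProducerLike {
    queueMessage(args: QueueMessageArgs): Promise<void>
}

async function queueScheduledTask(producer: ProducerLike, taskType: string, pluginConfigId: number): Promise<void> {
    await producer.queueMessage({
        kafkaMessage: {
            topic: 'scheduled_tasks', // KAFKA_SCHEDULED_TASKS in the real code
            messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
        },
        waitForAck: true, // wait for the broker ACK before counting the task as queued
    })
}
```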
@@ -15,6 +15,7 @@ import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
import {
ingestEventBatchingBatchCountSummary,
ingestEventBatchingInputLengthSummary,
ingestEventEachBatchKafkaAckWait,
ingestionOverflowingMessagesTotal,
ingestionParallelism,
ingestionParallelismPotential,
@@ -41,7 +42,7 @@ type IngestionSplitBatch = {
type IngestResult = {
// Promises that the batch handler should await on before committing offsets,
// contains the Kafka producer ACKs, to avoid blocking after every message.
promises?: Array<Promise<void>>
ackPromises?: Array<Promise<void>>
}

async function handleProcessingError(
@@ -166,7 +167,7 @@ export async function eachBatchParallelIngestion(
return await runner.runEventPipeline(pluginEvent)
})) as IngestResult

result.promises?.forEach((promise) =>
result.ackPromises?.forEach((promise) =>
processingPromises.push(
promise.catch(async (error) => {
await handleProcessingError(error, message, pluginEvent, queue)
@@ -227,7 +228,9 @@ export async function eachBatchParallelIngestion(
// impact the success. Delaying ACKs allows the producer to write in big batches for
// better throughput and lower broker load.
const awaitSpan = transaction.startChild({ op: 'awaitACKs', data: { promiseCount: processingPromises.length } })
const kafkaAckWaitMetric = ingestEventEachBatchKafkaAckWait.startTimer()
await Promise.all(processingPromises)
kafkaAckWaitMetric()
awaitSpan.finish()

for (const message of messages) {
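
The comments above carry the design rationale: produce ACKs are collected per message and awaited once per batch, so the producer can coalesce writes. A condensed sketch of that flow, with hypothetical ingestOne/commitOffset helpers (only the `ackPromises` field name comes from the diff):

```typescript
type KafkaMessageLike = { offset: string }
type IngestResult = { ackPromises?: Array<Promise<void>> }

async function eachBatchSketch(
    messages: KafkaMessageLike[],
    ingestOne: (message: KafkaMessageLike) => Promise<IngestResult>,
    commitOffset: (offset: string) => void
): Promise<void> {
    const pendingAcks: Array<Promise<void>> = []

    for (const message of messages) {
        // The pipeline returns promptly; broker ACKs are collected, not awaited here.
        const result = await ingestOne(message)
        result.ackPromises?.forEach((ack) => pendingAcks.push(ack))
    }

    // One wait at the end lets the producer write in large batches.
    await Promise.all(pendingAcks)

    // Offsets are committed only after every ACK has landed.
    for (const message of messages) {
        commitOffset(message.offset)
    }
}
```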
@@ -41,3 +41,9 @@ export const ingestEventBatchingBatchCountSummary = new Summary({
help: 'Number of batches of events',
percentiles: [0.5, 0.9, 0.95, 0.99],
})

export const ingestEventEachBatchKafkaAckWait = new Summary({
name: 'ingest_event_each_batch_kafka_ack_wait',
help: 'Wait time for the batch of Kafka ACKs at the end of eachBatchParallelIngestion',
percentiles: [0.5, 0.9, 0.95, 0.99],
})
14 changes: 10 additions & 4 deletions plugin-server/src/main/ingestion-queues/jobs-consumer.ts
@@ -54,8 +54,11 @@ export const startJobsConsumer = async ({
})
// TODO: handle resolving offsets asynchronously
await producer.queueMessage({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
resolveOffset(message.offset)
continue
@@ -71,8 +74,11 @@
})
// TODO: handle resolving offsets asynchronously
await producer.queueMessage({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
resolveOffset(message.offset)
continue
@@ -163,8 +163,11 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) =
value: message.value,
})
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
@@ -181,17 +184,23 @@
error: error.stack ?? error,
})
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}

if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task)
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
@@ -163,6 +163,7 @@ export class ConsoleLogsIngester {
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(JSON.stringify(cle)),
key: event.session_id,
waitForAck: true,
})
)
} catch (error) {
@@ -171,6 +171,7 @@ export class ReplayEventsIngester {
topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
value: Buffer.from(JSON.stringify(replayRecord)),
key: event.session_id,
waitForAck: true,
}),
]
} catch (error) {