Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(plugin-server): kafka ack cleanup and metric #21111

Merged
merged 5 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugin-server/functional_tests/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ export const capture = async ({
})
),
key: teamId ? teamId.toString() : '',
waitForAck: true,
})
}

Expand Down
6 changes: 3 additions & 3 deletions plugin-server/functional_tests/jobs-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('dlq handling', () => {
test.concurrent(`handles empty messages`, async () => {
const key = uuidv4()

await produce({ topic: 'jobs', message: null, key })
await produce({ topic: 'jobs', message: null, key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -54,7 +54,7 @@ describe('dlq handling', () => {
test.concurrent(`handles invalid JSON`, async () => {
const key = uuidv4()

await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key })
await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -72,7 +72,7 @@ describe('dlq handling', () => {
labels: { topic: 'jobs', partition: '0', groupId: 'jobs-inserter' },
})

await produce({ topic: 'jobs', message: Buffer.from(''), key: '' })
await produce({ topic: 'jobs', message: Buffer.from(''), key: '', waitForAck: true })

await waitForExpect(async () => {
const metricAfter = await getMetric({
Expand Down
14 changes: 12 additions & 2 deletions plugin-server/functional_tests/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,17 @@ export async function createKafkaProducer() {
return producer
}

export async function produce({ topic, message, key }: { topic: string; message: Buffer | null; key: string }) {
export async function produce({
topic,
message,
key,
waitForAck,
}: {
topic: string
message: Buffer | null
key: string
waitForAck: boolean
}) {
producer = producer ?? (await createKafkaProducer())
await defaultProduce({ producer, topic, value: message, key: Buffer.from(key) })
await defaultProduce({ producer, topic, value: message, key: Buffer.from(key), waitForAck })
}
8 changes: 5 additions & 3 deletions plugin-server/functional_tests/scheduled-tasks-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('dlq handling', () => {
test.concurrent(`handles empty messages`, async () => {
const key = uuidv4()

await produce({ topic: 'scheduled_tasks', message: null, key })
await produce({ topic: 'scheduled_tasks', message: null, key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -54,7 +54,7 @@ describe('dlq handling', () => {
test.concurrent(`handles invalid JSON`, async () => {
const key = uuidv4()

await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key })
await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -69,6 +69,7 @@ describe('dlq handling', () => {
topic: 'scheduled_tasks',
message: Buffer.from(JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 })),
key,
waitForAck: true,
})

await waitForExpect(() => {
Expand All @@ -84,6 +85,7 @@ describe('dlq handling', () => {
topic: 'scheduled_tasks',
message: Buffer.from(JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' })),
key,
waitForAck: true,
})

await waitForExpect(() => {
Expand All @@ -104,7 +106,7 @@ describe('dlq handling', () => {

// NOTE: we don't actually care too much about the contents of the
// message, just that it triggeres the consumer to try to process it.
await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '' })
await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '', waitForAck: true })

await waitForExpect(async () => {
const metricAfter = await getMetric({
Expand Down
9 changes: 8 additions & 1 deletion plugin-server/functional_tests/session-recordings.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ test.skip('consumer updates timestamp exported to prometheus', async () => {
},
})

await produce({ topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, message: Buffer.from(''), key: '' })
await produce({
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(''),
key: '',
waitForAck: true,
})

await waitForExpect(async () => {
const metricAfter = await getMetric({
Expand Down Expand Up @@ -245,13 +250,15 @@ test.skip(`handles message with no token or with token and no associated team_id
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(JSON.stringify({ uuid: noTokenUuid, data: JSON.stringify({}) })),
key: noTokenKey,
waitForAck: true,
})
await produce({
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(
JSON.stringify({ uuid: noAssociatedTeamUuid, token: 'no associated team', data: JSON.stringify({}) })
),
key: noAssociatedTeamKey,
waitForAck: true,
})

await capture(makeSessionMessage(teamId, 'should be ingested'))
Expand Down
18 changes: 16 additions & 2 deletions plugin-server/src/kafka/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
NumberNullUndefined,
ProducerGlobalConfig,
} from 'node-rdkafka'
import { Summary } from 'prom-client'

import { getSpan } from '../sentry'
import { status } from '../utils/status'
Expand All @@ -17,6 +18,13 @@ export type KafkaProducerConfig = {
KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_MESSAGES: number
}

export const ingestEventKafkaProduceLatency = new Summary({
name: 'ingest_event_kafka_produce_latency',
help: 'Wait time for individual Kafka produces',
labelNames: ['topic', 'waitForAck'],
percentiles: [0.5, 0.9, 0.95, 0.99],
})

// Kafka production related functions using node-rdkafka.
export const createKafkaProducer = async (globalConfig: ProducerGlobalConfig, producerConfig: KafkaProducerConfig) => {
const producer = new RdKafkaProducer({
Expand Down Expand Up @@ -71,18 +79,22 @@ export const produce = async ({
value,
key,
headers = [],
waitForAck = true,
waitForAck,
}: {
producer: RdKafkaProducer
topic: string
value: MessageValue
key: MessageKey
headers?: MessageHeader[]
waitForAck?: boolean
waitForAck: boolean
}): Promise<number | null | undefined> => {
status.debug('📤', 'Producing message', { topic: topic })
const produceSpan = getSpan()?.startChild({ op: 'kafka_produce' })
return await new Promise((resolve, reject) => {
const produceTimer = ingestEventKafkaProduceLatency
.labels({ topic, waitForAck: waitForAck.toString() })
.startTimer()

if (waitForAck) {
producer.produce(
topic,
Expand All @@ -100,6 +112,7 @@ export const produce = async ({
resolve(offset)
}

produceTimer()
produceSpan?.finish()
}
)
Expand All @@ -112,6 +125,7 @@ export const produce = async ({
produceSpan?.finish()
})
resolve(undefined)
produceTimer()
}
})
}
Expand Down
7 changes: 5 additions & 2 deletions plugin-server/src/main/graphile-worker/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ export async function runScheduledTasks(
for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId })
await server.kafkaProducer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
},
waitForAck: true,
})
graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
import {
ingestEventBatchingBatchCountSummary,
ingestEventBatchingInputLengthSummary,
ingestEventEachBatchKafkaAckWait,
ingestionOverflowingMessagesTotal,
ingestionParallelism,
ingestionParallelismPotential,
Expand All @@ -41,7 +42,7 @@ type IngestionSplitBatch = {
type IngestResult = {
// Promises that the batch handler should await on before committing offsets,
// contains the Kafka producer ACKs, to avoid blocking after every message.
promises?: Array<Promise<void>>
ackPromises?: Array<Promise<void>>
}

async function handleProcessingError(
Expand Down Expand Up @@ -166,7 +167,7 @@ export async function eachBatchParallelIngestion(
return await runner.runEventPipeline(pluginEvent)
})) as IngestResult

result.promises?.forEach((promise) =>
result.ackPromises?.forEach((promise) =>
processingPromises.push(
promise.catch(async (error) => {
await handleProcessingError(error, message, pluginEvent, queue)
Expand Down Expand Up @@ -227,7 +228,9 @@ export async function eachBatchParallelIngestion(
// impact the success. Delaying ACKs allows the producer to write in big batches for
// better throughput and lower broker load.
const awaitSpan = transaction.startChild({ op: 'awaitACKs', data: { promiseCount: processingPromises.length } })
const kafkaAckWaitMetric = ingestEventEachBatchKafkaAckWait.startTimer()
await Promise.all(processingPromises)
kafkaAckWaitMetric()
awaitSpan.finish()

for (const message of messages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ export const ingestEventBatchingBatchCountSummary = new Summary({
help: 'Number of batches of events',
percentiles: [0.5, 0.9, 0.95, 0.99],
})

export const ingestEventEachBatchKafkaAckWait = new Summary({
name: 'ingest_event_each_batch_kafka_ack_wait',
help: 'Wait time for the batch of Kafka ACKs at the end of eachBatchParallelIngestion',
percentiles: [0.5, 0.9, 0.95, 0.99],
})
14 changes: 10 additions & 4 deletions plugin-server/src/main/ingestion-queues/jobs-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ export const startJobsConsumer = async ({
})
// TODO: handle resolving offsets asynchronously
await producer.queueMessage({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
resolveOffset(message.offset)
continue
Expand All @@ -71,8 +74,11 @@ export const startJobsConsumer = async ({
})
// TODO: handle resolving offsets asynchronously
await producer.queueMessage({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
resolveOffset(message.offset)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,11 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) =
value: message.value,
})
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
Expand All @@ -181,17 +184,23 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) =
error: error.stack ?? error,
})
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}

if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task)
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ export class ConsoleLogsIngester {
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(JSON.stringify(cle)),
key: event.session_id,
waitForAck: true,
})
)
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ export class ReplayEventsIngester {
topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
value: Buffer.from(JSON.stringify(replayRecord)),
key: event.session_id,
waitForAck: true,
}),
]
} catch (error) {
Expand Down
Loading
Loading