Skip to content

Commit

Permalink
add Kafka produce/ack metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Mar 22, 2024
1 parent 0e84220 commit d112f0a
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 0 deletions.
10 changes: 10 additions & 0 deletions plugin-server/src/kafka/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
NumberNullUndefined,
ProducerGlobalConfig,
} from 'node-rdkafka'
import { Summary } from 'prom-client'

import { getSpan } from '../sentry'
import { status } from '../utils/status'
Expand All @@ -17,6 +18,13 @@ export type KafkaProducerConfig = {
KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_MESSAGES: number
}

export const ingestEventKafkaAckWait = new Summary({
name: 'ingest_event_kafka_ack_wait',
help: 'Wait time for individual Kafka produces that await their ACKs',
labelNames: ['topic'],
percentiles: [0.5, 0.9, 0.95, 0.99],
})

// Kafka production related functions using node-rdkafka.
export const createKafkaProducer = async (globalConfig: ProducerGlobalConfig, producerConfig: KafkaProducerConfig) => {
const producer = new RdKafkaProducer({
Expand Down Expand Up @@ -84,6 +92,7 @@ export const produce = async ({
const produceSpan = getSpan()?.startChild({ op: 'kafka_produce' })
return await new Promise((resolve, reject) => {
if (waitForAck) {
const ackTimer = ingestEventKafkaAckWait.labels(topic).startTimer()
producer.produce(
topic,
null,
Expand All @@ -100,6 +109,7 @@ export const produce = async ({
resolve(offset)
}

ackTimer()
produceSpan?.finish()
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
import {
ingestEventBatchingBatchCountSummary,
ingestEventBatchingInputLengthSummary,
ingestEventEachBatchKafkaAckWait,
ingestionOverflowingMessagesTotal,
ingestionParallelism,
ingestionParallelismPotential,
Expand Down Expand Up @@ -227,7 +228,9 @@ export async function eachBatchParallelIngestion(
// impact the success. Delaying ACKs allows the producer to write in big batches for
// better throughput and lower broker load.
const awaitSpan = transaction.startChild({ op: 'awaitACKs', data: { promiseCount: processingPromises.length } })
const kafkaAckWaitMetric = ingestEventEachBatchKafkaAckWait.startTimer()
await Promise.all(processingPromises)
kafkaAckWaitMetric()
awaitSpan.finish()

for (const message of messages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ export const ingestEventBatchingBatchCountSummary = new Summary({
help: 'Number of batches of events',
percentiles: [0.5, 0.9, 0.95, 0.99],
})

export const ingestEventEachBatchKafkaAckWait = new Summary({
name: 'ingest_event_each_batch_kafka_ack_wait',
help: 'Wait time for the batch of Kafka ACKs at the end of eachBatchParallelIngestion',
percentiles: [0.5, 0.9, 0.95, 0.99],
})

0 comments on commit d112f0a

Please sign in to comment.