From d112f0a95001c25bb53554d96bb266d40d0baf90 Mon Sep 17 00:00:00 2001
From: Brett Hoerner
Date: Fri, 22 Mar 2024 14:50:10 -0600
Subject: [PATCH] add Kafka produce/ack metrics

---
 plugin-server/src/kafka/producer.ts                     | 10 ++++++++++
 .../batch-processing/each-batch-ingestion.ts            |  3 +++
 .../main/ingestion-queues/batch-processing/metrics.ts   |  6 ++++++
 3 files changed, 19 insertions(+)

diff --git a/plugin-server/src/kafka/producer.ts b/plugin-server/src/kafka/producer.ts
index a5ed4e072cb55..c75e012328b9e 100644
--- a/plugin-server/src/kafka/producer.ts
+++ b/plugin-server/src/kafka/producer.ts
@@ -7,6 +7,7 @@ import {
     NumberNullUndefined,
     ProducerGlobalConfig,
 } from 'node-rdkafka'
+import { Summary } from 'prom-client'
 
 import { getSpan } from '../sentry'
 import { status } from '../utils/status'
@@ -17,6 +18,13 @@ export type KafkaProducerConfig = {
     KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_MESSAGES: number
 }
 
+export const ingestEventKafkaAckWait = new Summary({
+    name: 'ingest_event_kafka_ack_wait',
+    help: 'Wait time for individual Kafka produces that await their ACKs',
+    labelNames: ['topic'],
+    percentiles: [0.5, 0.9, 0.95, 0.99],
+})
+
 // Kafka production related functions using node-rdkafka.
 export const createKafkaProducer = async (globalConfig: ProducerGlobalConfig, producerConfig: KafkaProducerConfig) => {
     const producer = new RdKafkaProducer({
@@ -84,6 +92,7 @@ export const produce = async ({
     const produceSpan = getSpan()?.startChild({ op: 'kafka_produce' })
     return await new Promise((resolve, reject) => {
         if (waitForAck) {
+            const ackTimer = ingestEventKafkaAckWait.labels(topic).startTimer()
             producer.produce(
                 topic,
                 null,
@@ -100,6 +109,7 @@
                         resolve(offset)
                     }
 
+                    ackTimer()
                     produceSpan?.finish()
                 }
             )
diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts
index ca333926c7b2b..588c2c92beb86 100644
--- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts
+++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts
@@ -15,6 +15,7 @@ import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
 import {
     ingestEventBatchingBatchCountSummary,
     ingestEventBatchingInputLengthSummary,
+    ingestEventEachBatchKafkaAckWait,
     ingestionOverflowingMessagesTotal,
     ingestionParallelism,
     ingestionParallelismPotential,
@@ -227,7 +228,9 @@ export async function eachBatchParallelIngestion(
         // impact the success. Delaying ACKs allows the producer to write in big batches for
         // better throughput and lower broker load.
         const awaitSpan = transaction.startChild({ op: 'awaitACKs', data: { promiseCount: processingPromises.length } })
+        const kafkaAckWaitMetric = ingestEventEachBatchKafkaAckWait.startTimer()
         await Promise.all(processingPromises)
+        kafkaAckWaitMetric()
         awaitSpan.finish()
 
         for (const message of messages) {
diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/metrics.ts b/plugin-server/src/main/ingestion-queues/batch-processing/metrics.ts
index 42c1b06a27b5d..60563b6cabaaa 100644
--- a/plugin-server/src/main/ingestion-queues/batch-processing/metrics.ts
+++ b/plugin-server/src/main/ingestion-queues/batch-processing/metrics.ts
@@ -41,3 +41,9 @@ export const ingestEventBatchingBatchCountSummary = new Summary({
     help: 'Number of batches of events',
     percentiles: [0.5, 0.9, 0.95, 0.99],
 })
+
+export const ingestEventEachBatchKafkaAckWait = new Summary({
+    name: 'ingest_event_each_batch_kafka_ack_wait',
+    help: 'Wait time for the batch of Kafka ACKs at the end of eachBatchParallelIngestion',
+    percentiles: [0.5, 0.9, 0.95, 0.99],
+})
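Side note (not part of the patch): both new metrics rely on prom-client's Summary timer API, where startTimer() records the start time and the function it returns observes the elapsed seconds when invoked. A minimal standalone sketch of that pattern follows; the metric name, produceWithTiming, and fakeProduce are hypothetical stand-ins, not code from this repository.

    import { Summary } from 'prom-client'

    // Hypothetical metric for illustration only.
    const exampleAckWait = new Summary({
        name: 'example_kafka_ack_wait_seconds',
        help: 'Example: time spent waiting for a Kafka ACK',
        labelNames: ['topic'],
        percentiles: [0.5, 0.9, 0.95, 0.99],
    })

    // Stand-in for a produce call that resolves once the broker ACKs the message.
    const fakeProduce = (_topic: string): Promise<void> =>
        new Promise((resolve) => setTimeout(resolve, 10))

    export async function produceWithTiming(topic: string): Promise<void> {
        // startTimer() notes the start; calling the returned function observes
        // the elapsed time (in seconds) on the labeled Summary.
        const stopTimer = exampleAckWait.labels(topic).startTimer()
        try {
            await fakeProduce(topic)
        } finally {
            stopTimer()
        }
    }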