From f82131fa74a648745b886993c983f9815fecb04d Mon Sep 17 00:00:00 2001 From: Tiina Turban Date: Fri, 24 Nov 2023 02:54:40 +0100 Subject: [PATCH] feat: Add batch duration metric for ingestion consumer loop --- plugin-server/src/kafka/batch-consumer.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index 83139ad21798d..ad63f8f119738 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -221,6 +221,7 @@ export const startBatchConsumer = async ({ batchesProcessed += 1 const processingTimeMs = new Date().valueOf() - startProcessingTimeMs + consumedBatchDuration.labels({ topic, groupId }).observe(processingTimeMs) if (processingTimeMs > SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS) { status.warn( '🕒', @@ -293,6 +294,12 @@ export const startBatchConsumer = async ({ return { isHealthy, stop, join, consumer } } +export const consumedBatchDuration = new Histogram({ + name: 'consumed_batch_duration_ms', + help: 'Main loop consumer batch processing duration in ms', + labelNames: ['topic', 'groupId'], +}) + export const consumerBatchSize = new Histogram({ name: 'consumed_batch_size', help: 'Size of the batch fetched by the consumer',