diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index 83139ad21798d..ad63f8f119738 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -221,6 +221,7 @@ export const startBatchConsumer = async ({ batchesProcessed += 1 const processingTimeMs = new Date().valueOf() - startProcessingTimeMs + consumedBatchDuration.labels({ topic, groupId }).observe(processingTimeMs) if (processingTimeMs > SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS) { status.warn( '🕒', @@ -293,6 +294,12 @@ export const startBatchConsumer = async ({ return { isHealthy, stop, join, consumer } } +export const consumedBatchDuration = new Histogram({ + name: 'consumed_batch_duration_ms', + help: 'Main loop consumer batch processing duration in ms', + labelNames: ['topic', 'groupId'], +}) + export const consumerBatchSize = new Histogram({ name: 'consumed_batch_size', help: 'Size of the batch fetched by the consumer',