From da89485d9120435fce19284e898ad1aa8333dfb5 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Tue, 12 Sep 2023 11:15:44 -0500 Subject: [PATCH] =?UTF-8?q?feat(plugin-server):=20add=20metrics=20for=20as?= =?UTF-8?q?signed=20partition=20count=20for=20rdk=E2=80=A6=20(#17403)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit feat(plugin-server): add metrics for assigned partition count for rdkafka consumers --- plugin-server/src/kafka/batch-consumer.ts | 13 ++++++++++- plugin-server/src/kafka/consumer.ts | 23 ++++++++++++++++++- .../src/main/ingestion-queues/metrics.ts | 6 +++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index a82aed8861098..3acae7a88b57d 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -1,11 +1,12 @@ import { GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka-acosom' -import { exponentialBuckets, Histogram } from 'prom-client' +import { exponentialBuckets, Gauge, Histogram } from 'prom-client' import { status } from '../utils/status' import { createAdminClient, ensureTopicExists } from './admin' import { commitOffsetsForMessages, consumeMessages, + countPartitionsPerTopic, createKafkaConsumer, disconnectConsumer, instrumentConsumerMetrics, @@ -181,6 +182,10 @@ export const startBatchConsumer = async ({ continue } + for (const [topic, count] of countPartitionsPerTopic(consumer.assignments())) { + kafkaAbsolutePartitionCount.labels({ topic }).set(count) + } + status.debug('🔁', 'main_loop_consumed', { messagesLength: messages.length }) if (!messages.length) { status.debug('🔁', 'main_loop_empty_batch', { cause: 'empty' }) @@ -278,3 +283,9 @@ const consumedMessageSizeBytes = new Histogram({ labelNames: ['topic', 'groupId', 'messageType'], buckets: exponentialBuckets(1, 8, 4).map((bucket) => bucket * 1024), }) + +const kafkaAbsolutePartitionCount = new Gauge({ + name: 'kafka_absolute_partition_count', + help: 'Number of partitions assigned to this consumer. (Absolute value from the consumer state.)', + labelNames: ['topic'], +}) diff --git a/plugin-server/src/kafka/consumer.ts b/plugin-server/src/kafka/consumer.ts index f3b3a91d2be44..32a6594009f7a 100644 --- a/plugin-server/src/kafka/consumer.ts +++ b/plugin-server/src/kafka/consumer.ts @@ -1,4 +1,5 @@ import { + Assignment, ClientMetrics, CODES, ConsumerGlobalConfig, @@ -9,7 +10,7 @@ import { TopicPartitionOffset, } from 'node-rdkafka-acosom' -import { latestOffsetTimestampGauge } from '../main/ingestion-queues/metrics' +import { kafkaRebalancePartitionCount, latestOffsetTimestampGauge } from '../main/ingestion-queues/metrics' import { status } from '../utils/status' export const createKafkaConsumer = async (config: ConsumerGlobalConfig) => { @@ -54,6 +55,20 @@ export const createKafkaConsumer = async (config: ConsumerGlobalConfig) => { }) }) } + +export function countPartitionsPerTopic(assignments: Assignment[]): Map { + const partitionsPerTopic = new Map() + for (const assignment of assignments) { + if (assignment.topic in partitionsPerTopic) { + partitionsPerTopic.set(assignment.topic, partitionsPerTopic.get(assignment.topic) + 1) + } else { + partitionsPerTopic.set(assignment.topic, 1) + } + } + + return partitionsPerTopic +} + export const instrumentConsumerMetrics = (consumer: RdKafkaConsumer, groupId: string) => { // For each message consumed, we record the latest timestamp processed for // each partition assigned to this consumer group member. This consumer @@ -89,8 +104,14 @@ export const instrumentConsumerMetrics = (consumer: RdKafkaConsumer, groupId: st */ if (error.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) { status.info('📝️', 'librdkafka rebalance, partitions assigned', { assignments }) + for (const [topic, count] of countPartitionsPerTopic(assignments)) { + kafkaRebalancePartitionCount.labels({ topic: topic }).inc(count) + } } else if (error.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) { status.info('📝️', 'librdkafka rebalance started, partitions revoked', { assignments }) + for (const [topic, count] of countPartitionsPerTopic(assignments)) { + kafkaRebalancePartitionCount.labels({ topic: topic }).dec(count) + } } else { // We had a "real" error status.error('⚠️', 'rebalance_error', { error }) diff --git a/plugin-server/src/main/ingestion-queues/metrics.ts b/plugin-server/src/main/ingestion-queues/metrics.ts index 97188247cbefa..099832e1ea14c 100644 --- a/plugin-server/src/main/ingestion-queues/metrics.ts +++ b/plugin-server/src/main/ingestion-queues/metrics.ts @@ -2,6 +2,12 @@ import { Counter, Gauge } from 'prom-client' +export const kafkaRebalancePartitionCount = new Gauge({ + name: 'kafka_rebalance_partition_count', + help: 'Number of partitions assigned to this consumer. (Calculated during rebalance events.)', + labelNames: ['topic'], +}) + export const latestOffsetTimestampGauge = new Gauge({ name: 'latest_processed_timestamp_ms', help: 'Timestamp of the latest offset that has been committed.',