Skip to content

Commit

Permalink
feat(plugin-server): add metrics for assigned partition count for rdk… (
Browse files Browse the repository at this point in the history
#17403)

feat(plugin-server): add metrics for assigned partition count for rdkafka consumers
  • Loading branch information
bretthoerner authored Sep 12, 2023
1 parent 4af3b8f commit da89485
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
13 changes: 12 additions & 1 deletion plugin-server/src/kafka/batch-consumer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka-acosom'
import { exponentialBuckets, Histogram } from 'prom-client'
import { exponentialBuckets, Gauge, Histogram } from 'prom-client'

import { status } from '../utils/status'
import { createAdminClient, ensureTopicExists } from './admin'
import {
commitOffsetsForMessages,
consumeMessages,
countPartitionsPerTopic,
createKafkaConsumer,
disconnectConsumer,
instrumentConsumerMetrics,
Expand Down Expand Up @@ -181,6 +182,10 @@ export const startBatchConsumer = async ({
continue
}

for (const [topic, count] of countPartitionsPerTopic(consumer.assignments())) {
kafkaAbsolutePartitionCount.labels({ topic }).set(count)
}

status.debug('🔁', 'main_loop_consumed', { messagesLength: messages.length })
if (!messages.length) {
status.debug('🔁', 'main_loop_empty_batch', { cause: 'empty' })
Expand Down Expand Up @@ -278,3 +283,9 @@ const consumedMessageSizeBytes = new Histogram({
labelNames: ['topic', 'groupId', 'messageType'],
buckets: exponentialBuckets(1, 8, 4).map((bucket) => bucket * 1024),
})

// Per-topic gauge of how many partitions this consumer currently has assigned.
// It is set (not incremented) in the startBatchConsumer loop from
// countPartitionsPerTopic(consumer.assignments()), so it reflects the
// consumer's own reported state rather than a running tally of rebalance events.
// NOTE(review): only topics present in the current assignments get set, so a
// topic that loses all of its partitions appears to keep its last value — confirm.
const kafkaAbsolutePartitionCount = new Gauge({
name: 'kafka_absolute_partition_count',
help: 'Number of partitions assigned to this consumer. (Absolute value from the consumer state.)',
labelNames: ['topic'],
})
23 changes: 22 additions & 1 deletion plugin-server/src/kafka/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
Assignment,
ClientMetrics,
CODES,
ConsumerGlobalConfig,
Expand All @@ -9,7 +10,7 @@ import {
TopicPartitionOffset,
} from 'node-rdkafka-acosom'

import { latestOffsetTimestampGauge } from '../main/ingestion-queues/metrics'
import { kafkaRebalancePartitionCount, latestOffsetTimestampGauge } from '../main/ingestion-queues/metrics'
import { status } from '../utils/status'

export const createKafkaConsumer = async (config: ConsumerGlobalConfig) => {
Expand Down Expand Up @@ -54,6 +55,20 @@ export const createKafkaConsumer = async (config: ConsumerGlobalConfig) => {
})
})
}

/**
 * Counts how many of the given assignments belong to each topic.
 *
 * @param assignments - Topic/partition assignments, e.g. the result of
 *   `consumer.assignments()` or the list passed to a rebalance callback.
 * @returns A map from topic name to the number of entries in `assignments`
 *   with that topic. An empty input yields an empty map.
 */
export function countPartitionsPerTopic(assignments: Assignment[]): Map<string, number> {
    const partitionsPerTopic = new Map<string, number>()
    for (const assignment of assignments) {
        // Look the topic up with Map.get(): the `in` operator tests object
        // properties, not Map entries, so it cannot detect an existing key.
        const seenSoFar = partitionsPerTopic.get(assignment.topic) ?? 0
        partitionsPerTopic.set(assignment.topic, seenSoFar + 1)
    }

    return partitionsPerTopic
}

export const instrumentConsumerMetrics = (consumer: RdKafkaConsumer, groupId: string) => {
// For each message consumed, we record the latest timestamp processed for
// each partition assigned to this consumer group member. This consumer
Expand Down Expand Up @@ -89,8 +104,14 @@ export const instrumentConsumerMetrics = (consumer: RdKafkaConsumer, groupId: st
*/
if (error.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
status.info('📝️', 'librdkafka rebalance, partitions assigned', { assignments })
for (const [topic, count] of countPartitionsPerTopic(assignments)) {
kafkaRebalancePartitionCount.labels({ topic: topic }).inc(count)
}
} else if (error.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
status.info('📝️', 'librdkafka rebalance started, partitions revoked', { assignments })
for (const [topic, count] of countPartitionsPerTopic(assignments)) {
kafkaRebalancePartitionCount.labels({ topic: topic }).dec(count)
}
} else {
// We had a "real" error
status.error('⚠️', 'rebalance_error', { error })
Expand Down
6 changes: 6 additions & 0 deletions plugin-server/src/main/ingestion-queues/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

import { Counter, Gauge } from 'prom-client'

// Per-topic gauge of assigned partitions, maintained as a running tally:
// the consumer's rebalance handler increments it by the per-topic count on
// ERR__ASSIGN_PARTITIONS and decrements it on ERR__REVOKE_PARTITIONS.
// Its accuracy therefore depends on every assign/revoke event being observed
// and on the per-topic counts passed to inc()/dec() being correct.
export const kafkaRebalancePartitionCount = new Gauge({
name: 'kafka_rebalance_partition_count',
help: 'Number of partitions assigned to this consumer. (Calculated during rebalance events.)',
labelNames: ['topic'],
})

export const latestOffsetTimestampGauge = new Gauge({
name: 'latest_processed_timestamp_ms',
help: 'Timestamp of the latest offset that has been committed.',
Expand Down

0 comments on commit da89485

Please sign in to comment.