diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index d53b7b7fddd5f..77d86ba3b60eb 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -181,15 +181,15 @@ export const startBatchConsumer = async ({ // protocol. If we never manage to consume, we don't want our health checks to pass. lastConsumeTime = Date.now() + for (const [topic, count] of countPartitionsPerTopic(consumer.assignments())) { + kafkaAbsolutePartitionCount.labels({ topic }).set(count) + } + if (!messages) { status.debug('🔁', 'main_loop_empty_batch', { cause: 'undefined' }) continue } - for (const [topic, count] of countPartitionsPerTopic(consumer.assignments())) { - kafkaAbsolutePartitionCount.labels({ topic }).set(count) - } - status.debug('🔁', 'main_loop_consumed', { messagesLength: messages.length }) if (!messages.length) { status.debug('🔁', 'main_loop_empty_batch', { cause: 'empty' })