diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index 03c9e2de6db37..8ef61ee4ff4f9 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -123,7 +123,7 @@ export const startBatchConsumer = async ({ instrumentConsumerMetrics(consumer, groupId, cooperativeRebalance) let isShuttingDown = false - let lastLoopTime = Date.now() + let lastConsumeTime = 0 // Before subscribing, we need to ensure that the topic exists. We don't // currently have a way to manage topic creation elsewhere (we handle this @@ -148,11 +148,11 @@ export const startBatchConsumer = async ({ consumer.subscribe([topic]) const startConsuming = async () => { - // Start consuming in a loop, fetching a batch of a max of 500 messages then - // processing these with eachMessage, and finally calling + // Start consuming in a loop, fetching a batch of a max of `fetchBatchSize` + // messages then processing these with eachMessage, and finally calling // consumer.offsetsStore. This will not actually commit offsets on the // brokers, but rather just store the offsets locally such that when commit - // is called, either manually of via auto-commit, these are the values that + // is called, either manually or via auto-commit, these are the values that // will be used. // // Note that we rely on librdkafka handling retries for any Kafka @@ -167,7 +167,7 @@ export const startBatchConsumer = async ({ const statusLogInterval = setInterval(() => { status.info('🔁', 'main_loop', { messagesPerSecond: messagesProcessed / (statusLogMilliseconds / 1000), - lastLoopTime: new Date(lastLoopTime).toISOString(), + lastConsumeTime: new Date(lastConsumeTime).toISOString(), }) messagesProcessed = 0 @@ -175,10 +175,15 @@ export const startBatchConsumer = async ({ try { while (!isShuttingDown) { - lastLoopTime = Date.now() - status.debug('🔁', 'main_loop_consuming') const messages = await consumeMessages(consumer, fetchBatchSize) + + // It's important that we only set the `lastConsumeTime` after a successful consume + // call. Even if we received 0 messages, a successful call means we are actually + // subscribed and didn't receive, for example, an error about an inconsistent group + // protocol. If we never manage to consume, we don't want our health checks to pass. + lastConsumeTime = Date.now() + if (!messages) { status.debug('🔁', 'main_loop_empty_batch', { cause: 'undefined' }) continue @@ -230,7 +235,7 @@ export const startBatchConsumer = async ({ const isHealthy = () => { // We define health as the last consumer loop having run in the last // minute. This might not be bullet-proof, let's see. - return Date.now() - lastLoopTime < 60000 + return Date.now() - lastConsumeTime < 60000 } const stop = async () => {