Skip to content

Commit

Permalink
fix(plugin-server): don't lie about rdkafka healthcheck before seeing… (
Browse files Browse the repository at this point in the history
#17476)

fix(plugin-server): don't lie about rdkafka healthcheck before seeing at least 1 consume
  • Loading branch information
bretthoerner authored Sep 19, 2023
1 parent cd67049 commit 240b545
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions plugin-server/src/kafka/batch-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ export const startBatchConsumer = async ({
instrumentConsumerMetrics(consumer, groupId, cooperativeRebalance)

let isShuttingDown = false
let lastLoopTime = Date.now()
let lastConsumeTime = 0

// Before subscribing, we need to ensure that the topic exists. We don't
// currently have a way to manage topic creation elsewhere (we handle this
Expand All @@ -148,11 +148,11 @@ export const startBatchConsumer = async ({
consumer.subscribe([topic])

const startConsuming = async () => {
// Start consuming in a loop, fetching a batch of a max of 500 messages then
// processing these with eachMessage, and finally calling
// Start consuming in a loop, fetching a batch of a max of `fetchBatchSize`
// messages then processing these with eachMessage, and finally calling
// consumer.offsetsStore. This will not actually commit offsets on the
// brokers, but rather just store the offsets locally such that when commit
// is called, either manually of via auto-commit, these are the values that
// is called, either manually or via auto-commit, these are the values that
// will be used.
//
// Note that we rely on librdkafka handling retries for any Kafka
Expand All @@ -167,18 +167,23 @@ export const startBatchConsumer = async ({
const statusLogInterval = setInterval(() => {
status.info('🔁', 'main_loop', {
messagesPerSecond: messagesProcessed / (statusLogMilliseconds / 1000),
lastLoopTime: new Date(lastLoopTime).toISOString(),
lastConsumeTime: new Date(lastConsumeTime).toISOString(),
})

messagesProcessed = 0
}, statusLogMilliseconds)

try {
while (!isShuttingDown) {
lastLoopTime = Date.now()

status.debug('🔁', 'main_loop_consuming')
const messages = await consumeMessages(consumer, fetchBatchSize)

// It's important that we only set the `lastConsumeTime` after a successful consume
// call. Even if we received 0 messages, a successful call means we are actually
// subscribed and didn't receive, for example, an error about an inconsistent group
// protocol. If we never manage to consume, we don't want our health checks to pass.
lastConsumeTime = Date.now()

if (!messages) {
status.debug('🔁', 'main_loop_empty_batch', { cause: 'undefined' })
continue
Expand Down Expand Up @@ -230,7 +235,7 @@ export const startBatchConsumer = async ({
const isHealthy = () => {
// We define health as the last consumer loop having run in the last
// minute. This might not be bullet-proof, let's see.
return Date.now() - lastLoopTime < 60000
return Date.now() - lastConsumeTime < 60000
}

const stop = async () => {
Expand Down

0 comments on commit 240b545

Please sign in to comment.