Skip to content

Commit

Permalink
fix(plugin-server): log EPS every 10 seconds, only log individual bat… (
Browse files Browse the repository at this point in the history
#18105)

fix(plugin-server): log EPS every 10 seconds, only log individual batches when they are slow
  • Loading branch information
bretthoerner authored and daibhin committed Oct 23, 2023
1 parent 73dc7a1 commit 38b0065
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions plugin-server/src/kafka/batch-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ export interface BatchConsumer {
isHealthy: () => boolean
}

const STATUS_LOG_INTERVAL_MS = 10000
const SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS = 10000

export const startBatchConsumer = async ({
connectionConfig,
groupId,
Expand Down Expand Up @@ -156,9 +159,22 @@ export const startBatchConsumer = async ({
// it will handle in the background rebalances, during which time consumeMessages will
// simply return an empty array.
//
// We also log the number of messages we have processed every time we go through the loop
// and process 1 or more messages, which should give some feedback to the user that things
// are functioning as expected.
// We log the number of messages that have been processed every 10 seconds, which should
// give some feedback to the user that things are functioning as expected. If a single batch
// takes more than SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS we log it individually.
let messagesProcessed = 0
let batchesProcessed = 0
const statusLogInterval = setInterval(() => {
status.info('🔁', 'main_loop', {
messagesPerSecond: messagesProcessed / (STATUS_LOG_INTERVAL_MS / 1000),
batchesProcessed: batchesProcessed,
lastConsumeTime: new Date(lastConsumeTime).toISOString(),
})

messagesProcessed = 0
batchesProcessed = 0
}, STATUS_LOG_INTERVAL_MS)

try {
while (!isShuttingDown) {
status.debug('🔁', 'main_loop_consuming')
Expand Down Expand Up @@ -187,7 +203,7 @@ export const startBatchConsumer = async ({
continue
}

const startProcessingTime = new Date().valueOf()
const startProcessingTimeMs = new Date().valueOf()

consumerBatchSize.labels({ topic, groupId }).observe(messages.length)
for (const message of messages) {
Expand All @@ -198,8 +214,16 @@ export const startBatchConsumer = async ({
// the implementation of `eachBatch`.
await eachBatch(messages)

const processingTime = new Date().valueOf() - startProcessingTime
status.info('🕒', `Processed ${messages.length} events in ${Math.round(processingTime / 10) / 100}s`)
messagesProcessed += messages.length
batchesProcessed += 1

const processingTimeMs = new Date().valueOf() - startProcessingTimeMs
if (processingTimeMs > SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS) {
status.warn(
'🕒',
`Slow batch: ${messages.length} events in ${Math.round(processingTimeMs / 10) / 100}s`
)
}

if (autoCommit) {
storeOffsetsForMessages(messages, consumer)
Expand All @@ -210,6 +234,7 @@ export const startBatchConsumer = async ({
throw error
} finally {
status.info('🔁', 'main_loop_stopping')
clearInterval(statusLogInterval)

// Finally, disconnect from the broker. If stored offsets have changed via
// `storeOffsetsForMessages` above, they will be committed before shutdown (so long
Expand Down

0 comments on commit 38b0065

Please sign in to comment.