From 6439d06ea7942cd2e686a74b453b06c232813092 Mon Sep 17 00:00:00 2001
From: Brett Hoerner
Date: Thu, 19 Oct 2023 11:10:34 -0600
Subject: [PATCH] fix(plugin-server): log EPS every 10 seconds, only log
 individual bat… (#18105)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

fix(plugin-server): log EPS every 10 seconds, only log individual batches when they are slow
---
 plugin-server/src/kafka/batch-consumer.ts | 37 +++++++++++++++++++----
 1 file changed, 31 insertions(+), 6 deletions(-)

diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts
index b5cc846874b442..08915759c1cf6a 100644
--- a/plugin-server/src/kafka/batch-consumer.ts
+++ b/plugin-server/src/kafka/batch-consumer.ts
@@ -20,6 +20,9 @@ export interface BatchConsumer {
     isHealthy: () => boolean
 }
 
+const STATUS_LOG_INTERVAL_MS = 10000
+const SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS = 10000
+
 export const startBatchConsumer = async ({
     connectionConfig,
     groupId,
@@ -156,9 +159,22 @@ export const startBatchConsumer = async ({
         // it will handle in the background rebalances, during which time consumeMessages will
         // simply return an empty array.
         //
-        // We also log the number of messages we have processed every time we go through the loop
-        // and process 1 or more messages, which should give some feedback to the user that things
-        // are functioning as expected.
+        // We log the number of messages that have been processed every 10 seconds, which should
+        // give some feedback to the user that things are functioning as expected. If a single batch
+        // takes more than SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS we log it individually.
+        let messagesProcessed = 0
+        let batchesProcessed = 0
+        const statusLogInterval = setInterval(() => {
+            status.info('🔁', 'main_loop', {
+                messagesPerSecond: messagesProcessed / (STATUS_LOG_INTERVAL_MS / 1000),
+                batchesProcessed: batchesProcessed,
+                lastConsumeTime: new Date(lastConsumeTime).toISOString(),
+            })
+
+            messagesProcessed = 0
+            batchesProcessed = 0
+        }, STATUS_LOG_INTERVAL_MS)
+
         try {
             while (!isShuttingDown) {
                 status.debug('🔁', 'main_loop_consuming')
@@ -187,7 +203,7 @@ export const startBatchConsumer = async ({
                     continue
                 }
 
-                const startProcessingTime = new Date().valueOf()
+                const startProcessingTimeMs = new Date().valueOf()
                 consumerBatchSize.labels({ topic, groupId }).observe(messages.length)
                 for (const message of messages) {
@@ -198,8 +214,16 @@ export const startBatchConsumer = async ({
                 // the implementation of `eachBatch`.
                 await eachBatch(messages)
 
-                const processingTime = new Date().valueOf() - startProcessingTime
-                status.info('🕒', `Processed ${messages.length} events in ${Math.round(processingTime / 10) / 100}s`)
+                messagesProcessed += messages.length
+                batchesProcessed += 1
+
+                const processingTimeMs = new Date().valueOf() - startProcessingTimeMs
+                if (processingTimeMs > SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS) {
+                    status.warn(
+                        '🕒',
+                        `Slow batch: ${messages.length} events in ${Math.round(processingTimeMs / 10) / 100}s`
+                    )
+                }
 
                 if (autoCommit) {
                     storeOffsetsForMessages(messages, consumer)
@@ -210,6 +234,7 @@ export const startBatchConsumer = async ({
             throw error
         } finally {
             status.info('🔁', 'main_loop_stopping')
+            clearInterval(statusLogInterval)
 
             // Finally, disconnect from the broker. If stored offsets have changed via
             // `storeOffsetsForMessages` above, they will be committed before shutdown (so long
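
For readers who want the logging pattern on its own, below is a minimal standalone TypeScript sketch of what the patch does: accumulate per-interval counters, report events per second every 10 seconds, warn only on individually slow batches, and clear the timer when the loop exits. The `runConsumerLoop`, `fetchBatch`, and `processBatch` names and the use of `console` are illustrative assumptions, not part of the patch; the real code uses PostHog's `status` logger and a librdkafka batch consumer.

// Standalone sketch of the pattern in this patch. `fetchBatch`, `processBatch`
// and the use of `console` are illustrative assumptions.
const STATUS_LOG_INTERVAL_MS = 10000
const SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS = 10000

async function runConsumerLoop(
    fetchBatch: () => Promise<string[]>,
    processBatch: (messages: string[]) => Promise<void>
): Promise<void> {
    let messagesProcessed = 0
    let batchesProcessed = 0

    // Summarise throughput every 10 seconds instead of logging every batch.
    const statusLogInterval = setInterval(() => {
        console.info('main_loop', {
            messagesPerSecond: messagesProcessed / (STATUS_LOG_INTERVAL_MS / 1000),
            batchesProcessed,
        })
        messagesProcessed = 0
        batchesProcessed = 0
    }, STATUS_LOG_INTERVAL_MS)

    try {
        for (let i = 0; i < 3; i++) {
            // Stand-in for `while (!isShuttingDown)` in the real consumer.
            const messages = await fetchBatch()
            const startProcessingTimeMs = Date.now()
            await processBatch(messages)

            messagesProcessed += messages.length
            batchesProcessed += 1

            // Only log a batch individually when it was slow.
            const processingTimeMs = Date.now() - startProcessingTimeMs
            if (processingTimeMs > SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS) {
                console.warn(`Slow batch: ${messages.length} events in ${processingTimeMs / 1000}s`)
            }
        }
    } finally {
        // As in the patch's `finally` block: always stop the timer on shutdown.
        clearInterval(statusLogInterval)
    }
}

// Example: three small batches, each taking ~50ms per message to "process".
void runConsumerLoop(
    async () => ['a', 'b', 'c'],
    async (messages) => {
        await new Promise((resolve) => setTimeout(resolve, 50 * messages.length))
    }
)

The design choice mirrors the patch: per-batch logs become noise at high throughput, so routine progress is aggregated into one periodic line, while the per-batch log is kept only for the case where it carries a signal (a batch slower than the threshold).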