Skip to content

Commit

Permalink
feat(plugin-server): Track and log partition offsets and message counts within a batch (#20541)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming authored Feb 23, 2024
1 parent c94d41e commit 2897711
Showing 1 changed file with 38 additions and 4 deletions.
42 changes: 38 additions & 4 deletions plugin-server/src/kafka/batch-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,36 @@ export interface BatchConsumer {
// Interval for periodic consumer status logging — presumably drives a status
// heartbeat elsewhere in this file; usage not visible in this chunk (TODO confirm).
const STATUS_LOG_INTERVAL_MS = 10000
// Batches that take longer than this (ms) to process are logged at warn level
// instead of debug (see the slow-batch branch in the consume loop below).
const SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS = 10000

// Aggregated view of the messages observed for one partition within a batch.
type PartitionSummary = {
    // Total messages seen. Often this equals max - min offset + 1, but broker-side
    // message deletion or offset resets can make the count and offset span diverge.
    count: number
    // Lowest and highest offsets observed for this partition in the batch.
    offsets: [min: number, max: number]
}

// Accumulates per-partition message counts and min/max offsets for a single
// consumed batch, for inclusion in structured log lines.
class BatchSummary {
    // NOTE: a ``Map`` would arguably be a better fit here, but a plain
    // ``Record`` serializes to JSON without any extra work.
    private partitions: Record<number, PartitionSummary> = {}

    // Fold one message into the running summary for its partition.
    public record(message: Message) {
        const existing = this.partitions[message.partition]
        if (existing !== undefined) {
            existing.count += 1
            if (message.offset < existing.offsets[0]) {
                existing.offsets[0] = message.offset
            }
            if (message.offset > existing.offsets[1]) {
                existing.offsets[1] = message.offset
            }
        } else {
            // First message seen for this partition: both bounds start here.
            this.partitions[message.partition] = {
                count: 1,
                offsets: [message.offset, message.offset],
            }
        }
    }
}

export const startBatchConsumer = async ({
connectionConfig,
groupId,
Expand Down Expand Up @@ -218,26 +248,30 @@ export const startBatchConsumer = async ({
}

// Time the whole batch so we can emit duration metrics and the
// slow-batch warning below.
const startProcessingTimeMs = new Date().valueOf()
const batchSummary = new BatchSummary()

// Record batch-level and per-message metrics, and fold every message into
// the per-partition offset/count summary attached to the log lines below.
consumerBatchSize.labels({ topic, groupId }).observe(messages.length)
for (const message of messages) {
consumedMessageSizeBytes.labels({ topic, groupId }).observe(message.size)
batchSummary.record(message)
}

// NOTE: we do not handle any retries. This should be handled by
// the implementation of `eachBatch`.
status.debug('⏳', `Starting to process batch of ${messages.length} events...`, batchSummary)
await eachBatch(messages)

// Running totals — presumably read by the periodic status logger; their
// consumers are outside this visible chunk (TODO confirm).
messagesProcessed += messages.length
batchesProcessed += 1

const processingTimeMs = new Date().valueOf() - startProcessingTimeMs
consumedBatchDuration.labels({ topic, groupId }).observe(processingTimeMs)

// Same summary is logged either way; only the level (warn vs debug) depends
// on whether processing exceeded the slow-batch threshold.
const logSummary = `Processed ${messages.length} events in ${Math.round(processingTimeMs / 10) / 100}s`
if (processingTimeMs > SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS) {
status.warn('🕒', `Slow batch: ${logSummary}`, batchSummary)
} else {
status.debug('⌛️', logSummary, batchSummary)
}

if (autoCommit) {
Expand Down

0 comments on commit 2897711

Please sign in to comment.