From 76396abddd0cdfda0aa5922f6f17184508b74d51 Mon Sep 17 00:00:00 2001
From: Brett Hoerner
Date: Tue, 3 Oct 2023 07:52:54 -0600
Subject: [PATCH] fix(plugin-server): use rdkafka autocommit by storing our own offsets (#17657)

* fix(plugin-server): use rdkafka autocommit by storing our own offsets

* chore(plugin-server): remove default value for autoCommit arg to startBatchConsumer

* chore(plugin-server): commitOffsetsForMessages -> storeOffsetsForMessages
---
 plugin-server/src/kafka/batch-consumer.ts    | 13 ++++++-------
 plugin-server/src/kafka/consumer.ts          |  6 +++---
 .../src/main/ingestion-queues/kafka-queue.ts |  1 +
 .../session-recordings-consumer-v1.ts        |  1 +
 .../session-recordings-consumer-v2.ts        |  2 +-
 5 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts
index 8ef61ee4ff4f9..0a4b8fa88cc37 100644
--- a/plugin-server/src/kafka/batch-consumer.ts
+++ b/plugin-server/src/kafka/batch-consumer.ts
@@ -4,12 +4,12 @@ import { exponentialBuckets, Gauge, Histogram } from 'prom-client'
 import { status } from '../utils/status'
 import { createAdminClient, ensureTopicExists } from './admin'
 import {
-    commitOffsetsForMessages,
     consumeMessages,
     countPartitionsPerTopic,
     createKafkaConsumer,
     disconnectConsumer,
     instrumentConsumerMetrics,
+    storeOffsetsForMessages,
 } from './consumer'
 
 export interface BatchConsumer {
@@ -23,6 +23,7 @@ export const startBatchConsumer = async ({
     connectionConfig,
     groupId,
     topic,
+    autoCommit,
     sessionTimeout,
     consumerMaxBytesPerPartition,
     consumerMaxBytes,
@@ -32,13 +33,13 @@
     batchingTimeoutMs,
     topicCreationTimeoutMs,
     eachBatch,
-    autoCommit = true,
     cooperativeRebalance = true,
     queuedMinMessages = 100000,
 }: {
     connectionConfig: GlobalConfig
     groupId: string
     topic: string
+    autoCommit: boolean
     sessionTimeout: number
     consumerMaxBytesPerPartition: number
     consumerMaxBytes: number
@@ -48,7 +49,6 @@
     batchingTimeoutMs: number
     topicCreationTimeoutMs: number
     eachBatch: (messages: Message[]) => Promise<void>
-    autoCommit?: boolean
     cooperativeRebalance?: boolean
     queuedMinMessages?: number
 }): Promise<BatchConsumer> => {
@@ -76,9 +76,8 @@ export const startBatchConsumer = async ({
         ...connectionConfig,
         'group.id': groupId,
         'session.timeout.ms': sessionTimeout,
-        // We disable auto commit and rather we commit after one batch has
-        // completed.
-        'enable.auto.commit': false,
+        'enable.auto.commit': autoCommit,
+        'enable.auto.offset.store': false,
         /**
          * max.partition.fetch.bytes
          * The maximum amount of data per-partition the server will return.
@@ -211,7 +210,7 @@ export const startBatchConsumer = async ({
                 messagesProcessed += messages.length
 
                 if (autoCommit) {
-                    commitOffsetsForMessages(messages, consumer)
+                    storeOffsetsForMessages(messages, consumer)
                 }
             }
         } catch (error) {
diff --git a/plugin-server/src/kafka/consumer.ts b/plugin-server/src/kafka/consumer.ts
index 14a45f946376e..d05013aa7e6f0 100644
--- a/plugin-server/src/kafka/consumer.ts
+++ b/plugin-server/src/kafka/consumer.ts
@@ -203,7 +203,7 @@ export const findOffsetsToCommit = (messages: TopicPartitionOffset[]): TopicPart
     return highestOffsets
 }
 
-export const commitOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => {
+export const storeOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => {
     const topicPartitionOffsets = findOffsetsToCommit(messages).map((message) => {
         return {
             ...message,
@@ -213,8 +213,8 @@ export const storeOffsetsForMessages = (messages: Message[], consumer: RdKafkaC
     })
 
     if (topicPartitionOffsets.length > 0) {
-        status.debug('📝', 'Committing offsets', { topicPartitionOffsets })
-        consumer.commit(topicPartitionOffsets)
+        status.debug('📝', 'Storing offsets', { topicPartitionOffsets })
+        consumer.offsetsStore(topicPartitionOffsets)
     }
 }
 
diff --git a/plugin-server/src/main/ingestion-queues/kafka-queue.ts b/plugin-server/src/main/ingestion-queues/kafka-queue.ts
index 7989efd4b356a..3d15270307722 100644
--- a/plugin-server/src/main/ingestion-queues/kafka-queue.ts
+++ b/plugin-server/src/main/ingestion-queues/kafka-queue.ts
@@ -249,6 +249,7 @@ export class IngestionConsumer {
             connectionConfig: createRdConnectionConfigFromEnvVars(this.pluginsServer as KafkaConfig),
             topic: this.topic,
             groupId: this.consumerGroupId,
+            autoCommit: true,
             sessionTimeout: 30000,
             consumerMaxBytes: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES,
             consumerMaxBytesPerPartition: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts
index 7f1c6f3fdd2f9..81b4fc9ec2be8 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts
@@ -85,6 +85,7 @@ export const startSessionRecordingEventsConsumerV1 = async ({
         connectionConfig,
         groupId,
         topic: KAFKA_SESSION_RECORDING_EVENTS,
+        autoCommit: true,
         sessionTimeout,
         consumerMaxBytesPerPartition,
         consumerMaxBytes,
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
index d42f756b7d49c..1cdbcdbb0aa5c 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
@@ -451,6 +451,7 @@ export class SessionRecordingIngesterV2 {
            connectionConfig,
            groupId: KAFKA_CONSUMER_GROUP_ID,
            topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+           autoCommit: false,
            sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
            // the largest size of a message that can be fetched by the consumer.
            // the largest size our MSK cluster allows is 20MB
@@ -464,7 +465,6 @@ export class SessionRecordingIngesterV2 {
            fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE,
            batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
            topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
-           autoCommit: false,
            eachBatch: async (messages) => {
                return await this.handleEachBatch(messages)
            },
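
For context on the 'enable.auto.commit' / 'enable.auto.offset.store' change above, here is a minimal sketch of the consumer setup this patch moves to, written directly against node-rdkafka (the library these wrappers sit on). The broker address and group id are placeholders, not values from this repository.

```ts
import { KafkaConsumer } from 'node-rdkafka'

// Sketch: auto-commit stays enabled, but auto *store* is disabled, so the
// background commit timer only ever flushes offsets we explicitly stored.
const consumer = new KafkaConsumer(
    {
        'metadata.broker.list': 'localhost:9092', // placeholder broker
        'group.id': 'example-consumer-group', // placeholder group id
        'enable.auto.commit': true,
        // Without this, librdkafka stores offsets as soon as messages are
        // fetched, and the timer could commit messages we never processed.
        'enable.auto.offset.store': false,
    },
    {} // topic config (defaults)
)

consumer.connect()
```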
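And a hypothetical helper in the spirit of the findOffsetsToCommit / storeOffsetsForMessages pair: once a batch has been fully processed, store the highest offset seen per topic/partition, plus one, because a committed offset names the *next* message to consume, not the last one processed. The name and shape here are illustrative, not the patch's exact code.

```ts
import { KafkaConsumer, Message, TopicPartitionOffset } from 'node-rdkafka'

// Hypothetical helper: reduce a processed batch to the highest offset per
// topic/partition, then hand offset + 1 to librdkafka's offset store.
function storeBatchOffsets(messages: Message[], consumer: KafkaConsumer): void {
    const highest = new Map<string, TopicPartitionOffset>()
    for (const message of messages) {
        const key = `${message.topic}:${message.partition}`
        const seen = highest.get(key)
        if (seen === undefined || message.offset > seen.offset) {
            highest.set(key, { topic: message.topic, partition: message.partition, offset: message.offset })
        }
    }

    const toStore = Array.from(highest.values(), (tp) => ({ ...tp, offset: tp.offset + 1 }))
    if (toStore.length > 0) {
        // With 'enable.auto.offset.store': false this is the only place offsets
        // enter the store; the auto-commit timer picks them up from there.
        consumer.offsetsStore(toStore)
    }
}
```

The net effect is the crash-safety of manual commits with the throughput of batched, timer-driven commits: on restart or rebalance the group resumes from the last stored offset, preserving at-least-once delivery.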