From 3d5b19846df58b091ba1213ac23ae8ec221766cf Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Mon, 2 Oct 2023 12:12:02 -0600 Subject: [PATCH] chore(plugin-server): remove default value for autoCommit arg to startBatchConsumer --- plugin-server/src/kafka/batch-consumer.ts | 4 ++-- plugin-server/src/main/ingestion-queues/kafka-queue.ts | 1 + .../session-recording/session-recordings-consumer-v1.ts | 1 + .../session-recording/session-recordings-consumer-v2.ts | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index 1f62c9411d3ac..96f11942d87dc 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -23,6 +23,7 @@ export const startBatchConsumer = async ({ connectionConfig, groupId, topic, + autoCommit, sessionTimeout, consumerMaxBytesPerPartition, consumerMaxBytes, @@ -32,13 +33,13 @@ export const startBatchConsumer = async ({ batchingTimeoutMs, topicCreationTimeoutMs, eachBatch, - autoCommit = true, cooperativeRebalance = true, queuedMinMessages = 100000, }: { connectionConfig: GlobalConfig groupId: string topic: string + autoCommit: boolean sessionTimeout: number consumerMaxBytesPerPartition: number consumerMaxBytes: number @@ -48,7 +49,6 @@ export const startBatchConsumer = async ({ batchingTimeoutMs: number topicCreationTimeoutMs: number eachBatch: (messages: Message[]) => Promise - autoCommit?: boolean cooperativeRebalance?: boolean queuedMinMessages?: number }): Promise => { diff --git a/plugin-server/src/main/ingestion-queues/kafka-queue.ts b/plugin-server/src/main/ingestion-queues/kafka-queue.ts index 7989efd4b356a..3d15270307722 100644 --- a/plugin-server/src/main/ingestion-queues/kafka-queue.ts +++ b/plugin-server/src/main/ingestion-queues/kafka-queue.ts @@ -249,6 +249,7 @@ export class IngestionConsumer { connectionConfig: createRdConnectionConfigFromEnvVars(this.pluginsServer as KafkaConfig), topic: this.topic, groupId: this.consumerGroupId, + autoCommit: true, sessionTimeout: 30000, consumerMaxBytes: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES, consumerMaxBytesPerPartition: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts index 7f1c6f3fdd2f9..81b4fc9ec2be8 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts @@ -85,6 +85,7 @@ export const startSessionRecordingEventsConsumerV1 = async ({ connectionConfig, groupId, topic: KAFKA_SESSION_RECORDING_EVENTS, + autoCommit: true, sessionTimeout, consumerMaxBytesPerPartition, consumerMaxBytes, diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts index d42f756b7d49c..1cdbcdbb0aa5c 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts @@ -451,6 +451,7 @@ export class SessionRecordingIngesterV2 { connectionConfig, groupId: KAFKA_CONSUMER_GROUP_ID, topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + autoCommit: false, sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS, // the largest size of a message that can be fetched by the consumer. // the largest size our MSK cluster allows is 20MB @@ -464,7 +465,6 @@ export class SessionRecordingIngesterV2 { fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE, batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS, topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS, - autoCommit: false, eachBatch: async (messages) => { return await this.handleEachBatch(messages) },