From 085d66152075dd6f57147fa565ca0062e6c53be3 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Wed, 27 Sep 2023 13:32:44 -0600 Subject: [PATCH] fix(plugin-server): use rdkafka autocommit by storing our own offsets --- plugin-server/src/kafka/batch-consumer.ts | 5 ++--- plugin-server/src/kafka/consumer.ts | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index 8ef61ee4ff4f9a..1f62c9411d3acb 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -76,9 +76,8 @@ export const startBatchConsumer = async ({ ...connectionConfig, 'group.id': groupId, 'session.timeout.ms': sessionTimeout, - // We disable auto commit and rather we commit after one batch has - // completed. - 'enable.auto.commit': false, + 'enable.auto.commit': autoCommit, + 'enable.auto.offset.store': false, /** * max.partition.fetch.bytes * The maximum amount of data per-partition the server will return. diff --git a/plugin-server/src/kafka/consumer.ts b/plugin-server/src/kafka/consumer.ts index 14a45f946376e6..d3702484552895 100644 --- a/plugin-server/src/kafka/consumer.ts +++ b/plugin-server/src/kafka/consumer.ts @@ -213,8 +213,8 @@ export const commitOffsetsForMessages = (messages: Message[], consumer: RdKafkaC }) if (topicPartitionOffsets.length > 0) { - status.debug('📝', 'Committing offsets', { topicPartitionOffsets }) - consumer.commit(topicPartitionOffsets) + status.debug('📝', 'Storing offsets', { topicPartitionOffsets }) + consumer.offsetsStore(topicPartitionOffsets) } }