diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index 8ef61ee4ff4f9..1f62c9411d3ac 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -76,9 +76,8 @@ export const startBatchConsumer = async ({ ...connectionConfig, 'group.id': groupId, 'session.timeout.ms': sessionTimeout, - // We disable auto commit and rather we commit after one batch has - // completed. - 'enable.auto.commit': false, + 'enable.auto.commit': autoCommit, + 'enable.auto.offset.store': false, /** * max.partition.fetch.bytes * The maximum amount of data per-partition the server will return. diff --git a/plugin-server/src/kafka/consumer.ts b/plugin-server/src/kafka/consumer.ts index 14a45f946376e..d370248455289 100644 --- a/plugin-server/src/kafka/consumer.ts +++ b/plugin-server/src/kafka/consumer.ts @@ -213,8 +213,8 @@ export const commitOffsetsForMessages = (messages: Message[], consumer: RdKafkaC }) if (topicPartitionOffsets.length > 0) { - status.debug('📝', 'Committing offsets', { topicPartitionOffsets }) - consumer.commit(topicPartitionOffsets) + status.debug('📝', 'Storing offsets', { topicPartitionOffsets }) + consumer.offsetsStore(topicPartitionOffsets) } }