Skip to content

Commit

Permalink
fix(plugin-server): use rdkafka autocommit by storing our own offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Oct 2, 2023
1 parent fae1c4d commit 085d661
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
5 changes: 2 additions & 3 deletions plugin-server/src/kafka/batch-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ export const startBatchConsumer = async ({
...connectionConfig,
'group.id': groupId,
'session.timeout.ms': sessionTimeout,
// We disable auto commit and rather we commit after one batch has
// completed.
'enable.auto.commit': false,
'enable.auto.commit': autoCommit,
'enable.auto.offset.store': false,
/**
* max.partition.fetch.bytes
* The maximum amount of data per-partition the server will return.
Expand Down
4 changes: 2 additions & 2 deletions plugin-server/src/kafka/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ export const commitOffsetsForMessages = (messages: Message[], consumer: RdKafkaC
})

if (topicPartitionOffsets.length > 0) {
status.debug('📝', 'Committing offsets', { topicPartitionOffsets })
consumer.commit(topicPartitionOffsets)
status.debug('📝', 'Storing offsets', { topicPartitionOffsets })
consumer.offsetsStore(topicPartitionOffsets)
}
}

Expand Down

0 comments on commit 085d661

Please sign in to comment.