fix(plugin-server): use rdkafka autocommit by storing our own offsets (#17657)

* fix(plugin-server): use rdkafka autocommit by storing our own offsets

* chore(plugin-server): remove default value for autoCommit arg to startBatchConsumer

* chore(plugin-server): commitOffsetsForMessages -> storeOffsetsForMessages
bretthoerner authored Oct 3, 2023
1 parent 4cb9a68 commit 76396ab
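
For context: before this change the batch consumer disabled librdkafka auto-commit entirely and issued an explicit commit after every batch. After it, auto-commit stays on (when autoCommit is true) but automatic offset storing is turned off, so only offsets the application explicitly stores after processing a batch can ever be committed. A minimal sketch of that configuration pattern with node-rdkafka follows; the broker address, group id, and helper function are placeholders, not code from this repository.

```typescript
import { KafkaConsumer, Message, TopicPartitionOffset } from 'node-rdkafka'

// Sketch of the "auto-commit on, manual offset store" pattern this commit adopts.
const consumer = new KafkaConsumer(
    {
        'metadata.broker.list': 'localhost:9092', // placeholder broker
        'group.id': 'example-group', // placeholder group id
        // Let librdkafka commit in the background on its auto.commit.interval.ms timer...
        'enable.auto.commit': true,
        // ...but only commit offsets the application has explicitly stored.
        'enable.auto.offset.store': false,
    },
    { 'auto.offset.reset': 'earliest' }
)

// Placeholder helper: called only after a batch has been fully processed.
function storeProcessedOffsets(messages: Message[]): void {
    const offsets: TopicPartitionOffset[] = messages.map((m) => ({
        topic: m.topic,
        partition: m.partition,
        offset: m.offset + 1, // next offset to consume, per Kafka convention
    }))
    if (offsets.length > 0) {
        consumer.offsetsStore(offsets)
    }
}
```

The placeholder storeProcessedOffsets plays the same role as storeOffsetsForMessages in this diff: a crash before it runs means the batch is replayed, never skipped.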
Showing 5 changed files with 12 additions and 11 deletions.
13 changes: 6 additions & 7 deletions plugin-server/src/kafka/batch-consumer.ts
@@ -4,12 +4,12 @@ import { exponentialBuckets, Gauge, Histogram } from 'prom-client'
 import { status } from '../utils/status'
 import { createAdminClient, ensureTopicExists } from './admin'
 import {
-    commitOffsetsForMessages,
     consumeMessages,
     countPartitionsPerTopic,
     createKafkaConsumer,
     disconnectConsumer,
     instrumentConsumerMetrics,
+    storeOffsetsForMessages,
 } from './consumer'
 
 export interface BatchConsumer {
@@ -23,6 +23,7 @@ export const startBatchConsumer = async ({
     connectionConfig,
     groupId,
     topic,
+    autoCommit,
     sessionTimeout,
     consumerMaxBytesPerPartition,
     consumerMaxBytes,
@@ -32,13 +33,13 @@
     batchingTimeoutMs,
     topicCreationTimeoutMs,
     eachBatch,
-    autoCommit = true,
     cooperativeRebalance = true,
     queuedMinMessages = 100000,
 }: {
     connectionConfig: GlobalConfig
     groupId: string
     topic: string
+    autoCommit: boolean
     sessionTimeout: number
     consumerMaxBytesPerPartition: number
     consumerMaxBytes: number
@@ -48,7 +49,6 @@
     batchingTimeoutMs: number
     topicCreationTimeoutMs: number
     eachBatch: (messages: Message[]) => Promise<void>
-    autoCommit?: boolean
     cooperativeRebalance?: boolean
     queuedMinMessages?: number
 }): Promise<BatchConsumer> => {
@@ -76,9 +76,8 @@ export const startBatchConsumer = async ({
         ...connectionConfig,
         'group.id': groupId,
         'session.timeout.ms': sessionTimeout,
-        // We disable auto commit and rather we commit after one batch has
-        // completed.
-        'enable.auto.commit': false,
+        'enable.auto.commit': autoCommit,
+        'enable.auto.offset.store': false,
         /**
          * max.partition.fetch.bytes
          * The maximum amount of data per-partition the server will return.
@@ -211,7 +210,7 @@ export const startBatchConsumer = async ({
             messagesProcessed += messages.length
 
             if (autoCommit) {
-                commitOffsetsForMessages(messages, consumer)
+                storeOffsetsForMessages(messages, consumer)
             }
         }
     } catch (error) {
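
Read together with the config change above, the loop ordering is the important part: a batch's offsets are stored only after eachBatch resolves, so a crash mid-batch can only replay messages, never skip them. The sketch below restates that ordering; it is a simplified stand-in for the real loop in this file (metrics, backoff, and shutdown handling omitted), and the consumeMessages signature is assumed from context.

```typescript
import { KafkaConsumer as RdKafkaConsumer, Message } from 'node-rdkafka'
import { consumeMessages, storeOffsetsForMessages } from './consumer'

// Simplified stand-in for the consume loop in batch-consumer.ts.
async function consumeLoop(
    consumer: RdKafkaConsumer,
    fetchBatchSize: number,
    autoCommit: boolean,
    eachBatch: (messages: Message[]) => Promise<void>,
    shouldStop: () => boolean
): Promise<void> {
    while (!shouldStop()) {
        const messages = await consumeMessages(consumer, fetchBatchSize) // assumed signature
        if (!messages.length) {
            continue
        }
        await eachBatch(messages) // process the batch first...
        if (autoCommit) {
            storeOffsetsForMessages(messages, consumer) // ...then mark its offsets as safe to commit
        }
    }
}
```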
6 changes: 3 additions & 3 deletions plugin-server/src/kafka/consumer.ts
@@ -203,7 +203,7 @@ export const findOffsetsToCommit = (messages: TopicPartitionOffset[]): TopicPartitionOffset[] => {
     return highestOffsets
 }
 
-export const commitOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => {
+export const storeOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => {
     const topicPartitionOffsets = findOffsetsToCommit(messages).map((message) => {
         return {
             ...message,
@@ -213,8 +213,8 @@ export const commitOffsetsForMessages = (messages: Message[], consumer: RdKafkaConsumer) => {
     })
 
     if (topicPartitionOffsets.length > 0) {
-        status.debug('📝', 'Committing offsets', { topicPartitionOffsets })
-        consumer.commit(topicPartitionOffsets)
+        status.debug('📝', 'Storing offsets', { topicPartitionOffsets })
+        consumer.offsetsStore(topicPartitionOffsets)
     }
 }
 
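
The one-line swap in this file carries the behavioural change: consumer.commit() sends a commit request to the broker for every batch, while consumer.offsetsStore() only records the offsets in librdkafka's local offset store, which the auto-commit timer (and the commit performed at rebalance or close) then flushes to the broker. A side-by-side sketch, assuming node-rdkafka and a hypothetical offsets array:

```typescript
import { KafkaConsumer, TopicPartitionOffset } from 'node-rdkafka'

// Hypothetical offsets for illustration; in this diff they come from
// findOffsetsToCommit(messages), the highest offset seen per topic/partition.
const offsets: TopicPartitionOffset[] = [{ topic: 'example-topic', partition: 0, offset: 43 }]

function flushOffsets(consumer: KafkaConsumer, store: boolean): void {
    if (store) {
        // New behaviour: cheap local bookkeeping; the broker commit happens later,
        // driven by enable.auto.commit and auto.commit.interval.ms (5s by default).
        consumer.offsetsStore(offsets)
    } else {
        // Old behaviour: an explicit commit request to the broker for every batch.
        consumer.commit(offsets)
    }
}
```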
1 change: 1 addition & 0 deletions plugin-server/src/main/ingestion-queues/kafka-queue.ts
@@ -249,6 +249,7 @@ export class IngestionConsumer {
             connectionConfig: createRdConnectionConfigFromEnvVars(this.pluginsServer as KafkaConfig),
             topic: this.topic,
             groupId: this.consumerGroupId,
+            autoCommit: true,
             sessionTimeout: 30000,
             consumerMaxBytes: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES,
             consumerMaxBytesPerPartition: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
1 change: 1 addition & 0 deletions (session recording events consumer v1)
@@ -85,6 +85,7 @@ export const startSessionRecordingEventsConsumerV1 = async ({
         connectionConfig,
         groupId,
         topic: KAFKA_SESSION_RECORDING_EVENTS,
+        autoCommit: true,
         sessionTimeout,
         consumerMaxBytesPerPartition,
         consumerMaxBytes,
2 changes: 1 addition & 1 deletion (session recording ingester v2)
@@ -451,6 +451,7 @@ export class SessionRecordingIngesterV2 {
             connectionConfig,
             groupId: KAFKA_CONSUMER_GROUP_ID,
             topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+            autoCommit: false,
             sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
             // the largest size of a message that can be fetched by the consumer.
             // the largest size our MSK cluster allows is 20MB
@@ -464,7 +465,6 @@
             fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE,
             batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
             topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
-            autoCommit: false,
             eachBatch: async (messages) => {
                 return await this.handleEachBatch(messages)
             },
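
Because autoCommit: false means the batch loop neither stores nor commits anything, this ingester has to advance its consumer group offsets through its own path once data is safely persisted; that mechanism is outside this diff. Purely as an illustration of what such an explicit commit looks like with node-rdkafka (the function and its arguments are hypothetical):

```typescript
import { KafkaConsumer } from 'node-rdkafka'

// Hypothetical: commit a partition's offset only after its data has been
// durably persisted elsewhere (e.g. flushed to blob storage).
function commitAfterFlush(
    consumer: KafkaConsumer,
    topic: string,
    partition: number,
    lastPersistedOffset: number
): void {
    consumer.commit([{ topic, partition, offset: lastPersistedOffset + 1 }])
}
```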
