Skip to content

Commit

Permalink
chore(plugin-server): remove default value for autoCommit arg to star…
Browse files Browse the repository at this point in the history
…tBatchConsumer
  • Loading branch information
bretthoerner committed Oct 2, 2023
1 parent 085d661 commit cc512bb
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 3 deletions.
4 changes: 2 additions & 2 deletions plugin-server/src/kafka/batch-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export const startBatchConsumer = async ({
connectionConfig,
groupId,
topic,
autoCommit,
sessionTimeout,
consumerMaxBytesPerPartition,
consumerMaxBytes,
Expand All @@ -32,13 +33,13 @@ export const startBatchConsumer = async ({
batchingTimeoutMs,
topicCreationTimeoutMs,
eachBatch,
autoCommit = true,
cooperativeRebalance = true,
queuedMinMessages = 100000,
}: {
connectionConfig: GlobalConfig
groupId: string
topic: string
autoCommit: boolean
sessionTimeout: number
consumerMaxBytesPerPartition: number
consumerMaxBytes: number
Expand All @@ -48,7 +49,6 @@ export const startBatchConsumer = async ({
batchingTimeoutMs: number
topicCreationTimeoutMs: number
eachBatch: (messages: Message[]) => Promise<void>
autoCommit?: boolean
cooperativeRebalance?: boolean
queuedMinMessages?: number
}): Promise<BatchConsumer> => {
Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/main/ingestion-queues/kafka-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ export class IngestionConsumer {
connectionConfig: createRdConnectionConfigFromEnvVars(this.pluginsServer as KafkaConfig),
topic: this.topic,
groupId: this.consumerGroupId,
autoCommit: true,
sessionTimeout: 30000,
consumerMaxBytes: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export const startSessionRecordingEventsConsumerV1 = async ({
connectionConfig,
groupId,
topic: KAFKA_SESSION_RECORDING_EVENTS,
autoCommit: true,
sessionTimeout,
consumerMaxBytesPerPartition,
consumerMaxBytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ export class SessionRecordingIngesterV2 {
connectionConfig,
groupId: KAFKA_CONSUMER_GROUP_ID,
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
autoCommit: false,
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
// the largest size of a message that can be fetched by the consumer.
// the largest size our MSK cluster allows is 20MB
Expand All @@ -464,7 +465,6 @@ export class SessionRecordingIngesterV2 {
fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE,
batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
autoCommit: false,
eachBatch: async (messages) => {
return await this.handleEachBatch(messages)
},
Expand Down

0 comments on commit cc512bb

Please sign in to comment.