feat: Add max.poll.interval.ms to our kafka lib via env #18877

Merged: 1 commit, Nov 23, 2023
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
@@ -54,6 +54,7 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_CONSUMPTION_OVERFLOW_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
KAFKA_CONSUMPTION_REBALANCE_TIMEOUT_MS: null,
KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS: 30_000,
KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS: 300_000,
KAFKA_TOPIC_CREATION_TIMEOUT_MS: isDevEnv() ? 30_000 : 5_000, // rdkafka default is 5s, increased in devenv to resist to slow kafka
KAFKA_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 500,
APP_METRICS_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 20_000,
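The PR title says the new setting arrives "via env"; the env-parsing layer itself is not shown in this diff. A hypothetical sketch of how such an override could be resolved on top of the `300_000` default added above (the function name and error handling are illustrative, not from the PR):

```typescript
// Hypothetical sketch: resolving KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS from
// the environment, falling back to the default set in getDefaultConfig().
const DEFAULT_MAX_POLL_INTERVAL_MS = 300_000 // librdkafka's own default

function resolveMaxPollIntervalMs(env: Record<string, string | undefined>): number {
    const raw = env.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS
    if (raw === undefined) {
        return DEFAULT_MAX_POLL_INTERVAL_MS
    }
    const parsed = Number.parseInt(raw, 10)
    if (Number.isNaN(parsed) || parsed <= 0) {
        throw new Error(`Invalid KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS: ${raw}`)
    }
    return parsed
}
```

For example, a deployment could export `KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS=600000` to double the window for slow batch processing without a code change.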
3 changes: 3 additions & 0 deletions plugin-server/src/kafka/batch-consumer.ts
@@ -29,6 +29,7 @@ export const startBatchConsumer = async ({
topic,
autoCommit,
sessionTimeout,
maxPollIntervalMs,
consumerMaxBytesPerPartition,
consumerMaxBytes,
consumerMaxWaitMs,
@@ -44,6 +45,7 @@
topic: string
autoCommit: boolean
sessionTimeout: number
maxPollIntervalMs: number
consumerMaxBytesPerPartition: number
consumerMaxBytes: number
consumerMaxWaitMs: number
@@ -78,6 +80,7 @@ export const startBatchConsumer = async ({
...connectionConfig,
'group.id': groupId,
'session.timeout.ms': sessionTimeout,
'max.poll.interval.ms': maxPollIntervalMs,
'enable.auto.commit': autoCommit,
'enable.auto.offset.store': false,
/**
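For context, `max.poll.interval.ms` bounds the time a consumer may go between polls before librdkafka leaves the group and triggers a rebalance; long-running batches need a generous value. A minimal sketch of the consumer config object that `startBatchConsumer` assembles with the new key (`buildConsumerConfig` is an illustrative helper, not part of the diff):

```typescript
// Illustrative helper mirroring the rdkafka consumer settings shown in the
// diff above, including the newly threaded-through max.poll.interval.ms.
type ConsumerGlobalConfig = Record<string, string | number | boolean>

function buildConsumerConfig(opts: {
    groupId: string
    sessionTimeout: number
    maxPollIntervalMs: number
    autoCommit: boolean
}): ConsumerGlobalConfig {
    return {
        'group.id': opts.groupId,
        'session.timeout.ms': opts.sessionTimeout,
        // If no poll happens within this window, the consumer is evicted from
        // the group and a rebalance begins (librdkafka default: 300000 ms).
        'max.poll.interval.ms': opts.maxPollIntervalMs,
        'enable.auto.commit': opts.autoCommit,
        'enable.auto.offset.store': false,
    }
}

const config = buildConsumerConfig({
    groupId: 'ingestion-consumer', // hypothetical group id
    sessionTimeout: 30_000,
    maxPollIntervalMs: 300_000,
    autoCommit: true,
})
```

Note the design choice visible in the diff: the value is plumbed as an explicit `maxPollIntervalMs` parameter rather than read from config inside `startBatchConsumer`, so each consumer (ingestion, session recordings) chooses its own source.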
1 change: 1 addition & 0 deletions plugin-server/src/main/ingestion-queues/kafka-queue.ts
@@ -204,6 +204,7 @@ export class IngestionConsumer {
groupId: this.consumerGroupId,
autoCommit: true,
sessionTimeout: this.pluginsServer.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.pluginsServer.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
consumerMaxBytes: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
consumerMaxWaitMs: this.pluginsServer.KAFKA_CONSUMPTION_MAX_WAIT_MS,
@@ -379,6 +379,7 @@ export class SessionRecordingIngester {
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
autoCommit: false,
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
// the largest size of a message that can be fetched by the consumer.
// the largest size our MSK cluster allows is 20MB
// we only use 9 or 10MB but there's no reason to limit this 🤷️
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
@@ -137,6 +137,7 @@ export interface PluginsServerConfig {
KAFKA_CONSUMPTION_OVERFLOW_TOPIC: string | null
KAFKA_CONSUMPTION_REBALANCE_TIMEOUT_MS: number | null
KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS: number
KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS: number
KAFKA_TOPIC_CREATION_TIMEOUT_MS: number
KAFKA_PRODUCER_LINGER_MS: number // linger.ms rdkafka parameter
KAFKA_PRODUCER_BATCH_SIZE: number // batch.size rdkafka parameter