From d980d13f76066ed4011904428864baf8470d1cd6 Mon Sep 17 00:00:00 2001 From: Tiina Turban Date: Fri, 24 Nov 2023 00:48:42 +0100 Subject: [PATCH] feat: Add max.poll.interval.ms to our kafka lib via env (#18877) --- plugin-server/src/config/config.ts | 1 + plugin-server/src/kafka/batch-consumer.ts | 3 +++ plugin-server/src/main/ingestion-queues/kafka-queue.ts | 1 + .../session-recording/session-recordings-consumer.ts | 1 + plugin-server/src/types.ts | 1 + 5 files changed, 7 insertions(+) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index bcfbc606b51c6..aa2ada4a10e49 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -55,6 +55,7 @@ export function getDefaultConfig(): PluginsServerConfig { KAFKA_CONSUMPTION_OVERFLOW_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, KAFKA_CONSUMPTION_REBALANCE_TIMEOUT_MS: null, KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS: 30_000, + KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS: 300_000, KAFKA_TOPIC_CREATION_TIMEOUT_MS: isDevEnv() ? 30_000 : 5_000, // rdkafka default is 5s, increased in devenv to resist to slow kafka KAFKA_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 500, APP_METRICS_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 20_000, diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index 08915759c1cf6..83139ad21798d 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -29,6 +29,7 @@ export const startBatchConsumer = async ({ topic, autoCommit, sessionTimeout, + maxPollIntervalMs, consumerMaxBytesPerPartition, consumerMaxBytes, consumerMaxWaitMs, @@ -44,6 +45,7 @@ export const startBatchConsumer = async ({ topic: string autoCommit: boolean sessionTimeout: number + maxPollIntervalMs: number consumerMaxBytesPerPartition: number consumerMaxBytes: number consumerMaxWaitMs: number @@ -78,6 +80,7 @@ export const startBatchConsumer = async ({ ...connectionConfig, 'group.id': groupId, 'session.timeout.ms': sessionTimeout, + 'max.poll.interval.ms': maxPollIntervalMs, 'enable.auto.commit': autoCommit, 'enable.auto.offset.store': false, /** diff --git a/plugin-server/src/main/ingestion-queues/kafka-queue.ts b/plugin-server/src/main/ingestion-queues/kafka-queue.ts index 48de128e739d3..7b47db8c0c695 100644 --- a/plugin-server/src/main/ingestion-queues/kafka-queue.ts +++ b/plugin-server/src/main/ingestion-queues/kafka-queue.ts @@ -204,6 +204,7 @@ export class IngestionConsumer { groupId: this.consumerGroupId, autoCommit: true, sessionTimeout: this.pluginsServer.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS, + maxPollIntervalMs: this.pluginsServer.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, consumerMaxBytes: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES, consumerMaxBytesPerPartition: this.pluginsServer.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, consumerMaxWaitMs: this.pluginsServer.KAFKA_CONSUMPTION_MAX_WAIT_MS, diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 470827f569422..c28021f047cfb 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -379,6 +379,7 @@ export class SessionRecordingIngester { topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, autoCommit: false, sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS, + maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, // the largest size of a message that can be fetched by the consumer. // the largest size our MSK cluster allows is 20MB // we only use 9 or 10MB but there's no reason to limit this 🤷️ diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index b0b7535954875..ecd7d6e8c8182 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -137,6 +137,7 @@ export interface PluginsServerConfig { KAFKA_CONSUMPTION_OVERFLOW_TOPIC: string | null KAFKA_CONSUMPTION_REBALANCE_TIMEOUT_MS: number | null KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS: number + KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS: number KAFKA_TOPIC_CREATION_TIMEOUT_MS: number KAFKA_PRODUCER_LINGER_MS: number // linger.ms rdkafka parameter KAFKA_PRODUCER_BATCH_SIZE: number // batch.size rdkafka parameter