From eb8307f36699c63d86fdbcc5e8d727fd3f7d6b92 Mon Sep 17 00:00:00 2001
From: Ben White
Date: Thu, 18 Jul 2024 16:28:07 +0200
Subject: [PATCH] fix: Use offset store for blobby commits (#23814)

---
 plugin-server/src/config/config.ts        |  1 +
 plugin-server/src/kafka/batch-consumer.ts |  4 ++-
 .../session-recordings-consumer.ts         | 27 ++++++++++++++-----
 plugin-server/src/types.ts                 |  3 +++
 4 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts
index 6437dbc56d9df..625f684954b2f 100644
--- a/plugin-server/src/config/config.ts
+++ b/plugin-server/src/config/config.ts
@@ -173,6 +173,7 @@ export function getDefaultConfig(): PluginsServerConfig {
         SESSION_RECORDING_OVERFLOW_MIN_PER_BATCH: 1_000_000, // All sessions consume at least 1MB/batch, to penalise poor batching
         SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: 0, // 0 disables stats collection
         SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: 1_048_576, // 1MB
+        SESSION_RECORDING_USE_OFFSET_STORE: false,
         // CDP
         CDP_WATCHER_OBSERVATION_PERIOD: 10000,
         CDP_WATCHER_DISABLED_PERIOD: 1000 * 60 * 10,
diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts
index 4f86bd9cc77e9..dfc9dcba04983 100644
--- a/plugin-server/src/kafka/batch-consumer.ts
+++ b/plugin-server/src/kafka/batch-consumer.ts
@@ -75,11 +75,13 @@ export const startBatchConsumer = async ({
     kafkaStatisticIntervalMs = 0,
     fetchMinBytes,
     maxHealthHeartbeatIntervalMs = 60_000,
+    autoOffsetStore = true,
 }: {
     connectionConfig: GlobalConfig
     groupId: string
     topic: string
     autoCommit: boolean
+    autoOffsetStore?: boolean
     sessionTimeout: number
     maxPollIntervalMs: number
     consumerMaxBytesPerPartition: number
@@ -311,7 +313,7 @@
             status.debug('⌛️', logSummary, batchSummary)
         }
 
-        if (autoCommit) {
+        if (autoCommit && autoOffsetStore) {
             storeOffsetsForMessages(messages, consumer)
         }
     }
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
index 1e3b9ec315996..94cb62616589d 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -481,11 +481,15 @@ export class SessionRecordingIngester {
         // eachBatchWithContext, then commits offsets for the batch.
         // the batch consumer reads from the session replay kafka cluster
         const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config)
+
+        const autoCommit = this.config.SESSION_RECORDING_USE_OFFSET_STORE ? true : false
+
         this.batchConsumer = await startBatchConsumer({
             connectionConfig: replayClusterConnectionConfig,
             groupId: this.consumerGroupId,
             topic: this.topic,
-            autoCommit: false,
+            autoCommit,
+            autoOffsetStore: false, // We will use our own offset store logic
             sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
             maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
             // the largest size of a message that can be fetched by the consumer.
@@ -828,12 +832,21 @@ export class SessionRecordingIngester {
             return
         }
 
-        this.connectedBatchConsumer?.commit({
-            ...tp,
-            // see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html for example
-            // for some reason you commit the next offset you expect to read and not the one you actually have
-            offset: highestOffsetToCommit + 1,
-        })
+        if (this.config.SESSION_RECORDING_USE_OFFSET_STORE) {
+            this.connectedBatchConsumer?.offsetsStore([
+                {
+                    ...tp,
+                    offset: highestOffsetToCommit + 1,
+                },
+            ])
+        } else {
+            this.connectedBatchConsumer?.commit({
+                ...tp,
+                // see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html for example
+                // for some reason you commit the next offset you expect to read and not the one you actually have
+                offset: highestOffsetToCommit + 1,
+            })
+        }
 
         // Store the committed offset to the persistent store to avoid rebalance issues
         await this.persistentHighWaterMarker.add(tp, this.consumerGroupId, highestOffsetToCommit)
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 762a994d0c83b..800b6bca5397c 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -272,6 +272,9 @@ export interface PluginsServerConfig extends CdpConfig {
 
     // kafka debug stats interval
     SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: number
+
+    // Whether to use the offset store approach that we are testing to see if it helps rebalances
+    SESSION_RECORDING_USE_OFFSET_STORE: boolean
 }
 
 export interface Hub extends PluginsServerConfig {
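
Notes on the pattern under test: librdkafka distinguishes between storing an
offset (marking it as safe to commit, in an in-memory store) and committing it
(writing it to the broker). With 'enable.auto.commit' enabled and
'enable.auto.offset.store' disabled, the client commits in the background on
its own interval, and also when partitions are revoked, but it only ever
commits offsets the application has explicitly stored via offsetsStore(). The
sketch below shows that shape with node-rdkafka; the broker address, group id,
topic, and the markProcessed helper are illustrative stand-ins, not code from
this repo.

    import { KafkaConsumer, Message } from 'node-rdkafka'

    const consumer = new KafkaConsumer(
        {
            'group.id': 'example-group', // illustrative value
            'metadata.broker.list': 'localhost:9092', // illustrative value
            // Commit in the background on librdkafka's own interval...
            'enable.auto.commit': true,
            // ...but only commit offsets we explicitly store below.
            'enable.auto.offset.store': false,
        },
        {}
    )

    // Hypothetical helper: call once a message's side effects are durable.
    function markProcessed(message: Message): void {
        consumer.offsetsStore([
            {
                topic: message.topic,
                partition: message.partition,
                // Kafka convention: you commit the next offset you expect to
                // read, hence the same `+ 1` as `highestOffsetToCommit + 1`
                // in the hunk above.
                offset: message.offset + 1,
            },
        ])
    }

    consumer.connect()

    consumer.on('ready', () => {
        consumer.subscribe(['example-topic']) // illustrative topic
        consumer.consume() // flowing mode: emits a 'data' event per message
    })

    consumer.on('data', (message: Message) => {
        // ... process the message and flush its side effects ...
        markProcessed(message)
    })

The intended effect on rebalances is that commit timing stays inside
librdkafka, which can flush stored offsets as part of its revocation and
shutdown handling, while the application keeps deciding which offsets are safe
to commit; the SESSION_RECORDING_USE_OFFSET_STORE flag gates the rollout of
that behaviour.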