Skip to content

Commit

Permalink
fix: Use offset store for blobby commits (#23814)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Jul 18, 2024
1 parent 0336955 commit eb8307f
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 8 deletions.
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ export function getDefaultConfig(): PluginsServerConfig {
SESSION_RECORDING_OVERFLOW_MIN_PER_BATCH: 1_000_000, // All sessions consume at least 1MB/batch, to penalise poor batching
SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: 0, // 0 disables stats collection
SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: 1_048_576, // 1MB
SESSION_RECORDING_USE_OFFSET_STORE: false,
// CDP
CDP_WATCHER_OBSERVATION_PERIOD: 10000,
CDP_WATCHER_DISABLED_PERIOD: 1000 * 60 * 10,
Expand Down
4 changes: 3 additions & 1 deletion plugin-server/src/kafka/batch-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ export const startBatchConsumer = async ({
kafkaStatisticIntervalMs = 0,
fetchMinBytes,
maxHealthHeartbeatIntervalMs = 60_000,
autoOffsetStore = true,
}: {
connectionConfig: GlobalConfig
groupId: string
topic: string
autoCommit: boolean
autoOffsetStore?: boolean
sessionTimeout: number
maxPollIntervalMs: number
consumerMaxBytesPerPartition: number
Expand Down Expand Up @@ -311,7 +313,7 @@ export const startBatchConsumer = async ({
status.debug('⌛️', logSummary, batchSummary)
}

if (autoCommit) {
if (autoCommit && autoOffsetStore) {
storeOffsetsForMessages(messages, consumer)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,11 +481,15 @@ export class SessionRecordingIngester {
// eachBatchWithContext, then commits offsets for the batch.
// the batch consumer reads from the session replay kafka cluster
const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config)

const autoCommit = this.config.SESSION_RECORDING_USE_OFFSET_STORE ? true : false

this.batchConsumer = await startBatchConsumer({
connectionConfig: replayClusterConnectionConfig,
groupId: this.consumerGroupId,
topic: this.topic,
autoCommit: false,
autoCommit,
autoOffsetStore: false, // We will use our own offset store logic
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
// the largest size of a message that can be fetched by the consumer.
Expand Down Expand Up @@ -828,12 +832,21 @@ export class SessionRecordingIngester {
return
}

this.connectedBatchConsumer?.commit({
...tp,
// see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html for example
// for some reason you commit the next offset you expect to read and not the one you actually have
offset: highestOffsetToCommit + 1,
})
if (this.config.SESSION_RECORDING_USE_OFFSET_STORE) {
this.connectedBatchConsumer?.offsetsStore([
{
...tp,
offset: highestOffsetToCommit + 1,
},
])
} else {
this.connectedBatchConsumer?.commit({
...tp,
// see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html for example
// for some reason you commit the next offset you expect to read and not the one you actually have
offset: highestOffsetToCommit + 1,
})
}

// Store the committed offset to the persistent store to avoid rebalance issues
await this.persistentHighWaterMarker.add(tp, this.consumerGroupId, highestOffsetToCommit)
Expand Down
3 changes: 3 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ export interface PluginsServerConfig extends CdpConfig {

// kafka debug stats interval
SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: number

// Whether to use the offset store approach that we are testing to see if it helps rebalances
SESSION_RECORDING_USE_OFFSET_STORE: boolean
}

export interface Hub extends PluginsServerConfig {
Expand Down

0 comments on commit eb8307f

Please sign in to comment.