fix: Use dedicated watermark tracker for persistent values (#17599)
benjackwhite authored Sep 27, 2023
1 parent ef3bcb9 commit b118d4f
Showing 6 changed files with 385 additions and 271 deletions.
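At a glance: the blob consumer previously used a single `OffsetHighWaterMarker` both for per-session flush tracking and for recording how far ingestion as a whole had been committed. This commit splits that into a `sessionHighWaterMarker` (keyed by session id) and a dedicated `persistentHighWaterMarker` (stored under a separate `persistent/` Redis prefix and keyed by the Kafka consumer group id), and hands the persistent tracker to the `ReplayEventsIngester`. The real `OffsetHighWaterMarker` is Redis-backed and is not part of this diff; the sketch below is only an in-memory stand-in for the two methods the diff actually calls (`add`, `isBelowHighWaterMark`), to show how two independently prefixed trackers keep session flush marks and persistent commit marks separate.

```ts
// In-memory stand-in for the Redis-backed OffsetHighWaterMarker, NOT the real class.
// Only the methods visible in this diff are sketched; the Map replaces Redis and the
// prefixes below are illustrative, not the real config values.

type TopicPartition = { topic: string; partition: number }

class InMemoryHighWaterMarker {
    private marks = new Map<string, number>()

    constructor(private readonly keyPrefix: string) {}

    private key(tp: TopicPartition, id: string): string {
        return `${this.keyPrefix}${tp.topic}/${tp.partition}/${id}`
    }

    // Record that everything up to and including `offset` has been handled for `id`.
    async add(tp: TopicPartition, id: string, offset: number): Promise<void> {
        const k = this.key(tp, id)
        this.marks.set(k, Math.max(this.marks.get(k) ?? -1, offset))
    }

    // True if `offset` is at or below the recorded mark, i.e. assumed already handled.
    async isBelowHighWaterMark(tp: TopicPartition, id: string, offset: number): Promise<boolean> {
        return offset <= (this.marks.get(this.key(tp, id)) ?? -1)
    }
}

// The split introduced by this commit: one tracker per concern, separated by prefix.
const redisPrefix = 'session-recording/' // illustrative stand-in for SESSION_RECORDING_REDIS_PREFIX
const sessionHighWaterMarker = new InMemoryHighWaterMarker(redisPrefix)
const persistentHighWaterMarker = new InMemoryHighWaterMarker(redisPrefix + 'persistent/')
```

The first set of hunks below is from the `ReplayEventsIngester` service; the rest are from the blob consumer.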
@@ -28,7 +28,7 @@ export class ReplayEventsIngester {

constructor(
private readonly serverConfig: PluginsServerConfig,
private readonly offsetHighWaterMarker: OffsetHighWaterMarker
private readonly persistentHighWaterMarker: OffsetHighWaterMarker
) {}

public async consumeBatch(messages: IncomingRecordingMessage[]) {
@@ -75,7 +75,7 @@ export class ReplayEventsIngester {

const topicPartitionOffsets = findOffsetsToCommit(messages.map((message) => message.metadata))
await Promise.all(
topicPartitionOffsets.map((tpo) => this.offsetHighWaterMarker.add(tpo, HIGH_WATERMARK_KEY, tpo.offset))
topicPartitionOffsets.map((tpo) => this.persistentHighWaterMarker.add(tpo, HIGH_WATERMARK_KEY, tpo.offset))
)
}

@@ -106,7 +106,7 @@ }
}

if (
await this.offsetHighWaterMarker.isBelowHighWaterMark(
await this.persistentHighWaterMarker.isBelowHighWaterMark(
event.metadata,
HIGH_WATERMARK_KEY,
event.metadata.offset
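That is the end of the excerpt from `ReplayEventsIngester`; the remaining hunks are from the blob consumer (`SessionRecordingIngesterV2`). One helper worth noting is `findOffsetsToCommit`, which is imported from elsewhere in the repo and not shown in this diff. A plausible sketch, assuming it simply reduces a batch's per-message metadata to the highest offset seen for each topic-partition, which is what the call site above needs before advancing the persistent high-water mark:

```ts
// Hypothetical reimplementation of `findOffsetsToCommit` (the real one lives elsewhere in
// the repo): collapse per-message metadata down to the highest offset per topic-partition,
// so each partition's high-water mark is advanced exactly once per batch.

type TopicPartitionOffset = { topic: string; partition: number; offset: number }

function findOffsetsToCommit(metadata: TopicPartitionOffset[]): TopicPartitionOffset[] {
    const highest = new Map<string, TopicPartitionOffset>()
    for (const m of metadata) {
        const key = `${m.topic}/${m.partition}`
        const current = highest.get(key)
        if (!current || m.offset > current.offset) {
            highest.set(key, m)
        }
    }
    return Array.from(highest.values())
}
```

With that shape, the `consumeBatch` code above advances `persistentHighWaterMarker` once per partition rather than once per message.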
@@ -24,15 +24,15 @@ import { RealtimeManager } from './services/realtime-manager'
import { ReplayEventsIngester } from './services/replay-events-ingester'
import { SessionManager } from './services/session-manager'
import { IncomingRecordingMessage } from './types'
import { bufferFileDir, now } from './utils'
import { bufferFileDir, now, queryWatermarkOffsets } from './utils'

// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')

const groupId = 'session-recordings-blob'
const sessionTimeout = 30000
// WARNING: Do not change this - it will essentially reset the consumer
const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob'
const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000
const PARTITION_LOCK_INTERVAL_MS = 10000
const HIGH_WATERMARK_KEY = 'session_replay_blob_ingester'

// const flushIntervalTimeoutMs = 30000

@@ -97,7 +97,8 @@ type PartitionMetrics = {
export class SessionRecordingIngesterV2 {
redisPool: RedisPool
sessions: Record<string, SessionManager> = {}
offsetHighWaterMarker: OffsetHighWaterMarker
sessionHighWaterMarker: OffsetHighWaterMarker
persistentHighWaterMarker: OffsetHighWaterMarker
realtimeManager: RealtimeManager
replayEventsIngester: ReplayEventsIngester
partitionLocker: PartitionLocker
@@ -124,13 +125,18 @@ export class SessionRecordingIngesterV2 {
this.realtimeManager = new RealtimeManager(this.redisPool, this.config)
this.partitionLocker = new PartitionLocker(this.redisPool, this.config.SESSION_RECORDING_REDIS_PREFIX)

this.offsetHighWaterMarker = new OffsetHighWaterMarker(
this.sessionHighWaterMarker = new OffsetHighWaterMarker(
this.redisPool,
this.config.SESSION_RECORDING_REDIS_PREFIX
)

this.persistentHighWaterMarker = new OffsetHighWaterMarker(
this.redisPool,
this.config.SESSION_RECORDING_REDIS_PREFIX + 'persistent/'
)

// NOTE: This is the only place where we need to use the shared server config
this.replayEventsIngester = new ReplayEventsIngester(globalServerConfig, this.offsetHighWaterMarker)
this.replayEventsIngester = new ReplayEventsIngester(globalServerConfig, this.persistentHighWaterMarker)

this.teamsRefresher = new BackgroundRefresher(async () => {
try {
@@ -145,25 +151,9 @@

this.offsetsRefresher = new BackgroundRefresher(async () => {
const results = await Promise.all(
this.assignedTopicPartitions.map(async ({ partition }) => {
return new Promise<[number, number]>((resolve, reject) => {
if (!this.batchConsumer) {
return reject('Not connected')
}
this.batchConsumer.consumer.queryWatermarkOffsets(
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
partition,
(err, offsets) => {
if (err) {
status.error('🔥', 'Failed to query kafka watermark offsets', err)
return reject()
}

resolve([partition, offsets.highOffset])
}
)
})
})
this.assignedTopicPartitions.map(({ partition }) =>
queryWatermarkOffsets(this.batchConsumer, partition)
)
)

return results.reduce((acc, [partition, highOffset]) => {
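The inline Promise wrapper around `queryWatermarkOffsets` has been lifted into a helper in `./utils` (imported at the top of the file); the helper itself is not part of the shown hunks. A reconstruction of what it plausibly looks like, based on the inline code removed above, with a minimal stand-in for the batch consumer type and an assumed topic constant:

```ts
// Sketch of the extracted helper, reconstructed from the removed inline Promise wrapper.
// `BatchConsumer` is a minimal stand-in type and the topic constant's value is assumed,
// not taken from the repo.

type WatermarkOffsets = { lowOffset: number; highOffset: number }

interface BatchConsumer {
    consumer: {
        queryWatermarkOffsets(
            topic: string,
            partition: number,
            cb: (err: Error | null, offsets: WatermarkOffsets) => void
        ): void
    }
}

const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = 'session_recording_snapshot_item_events' // assumed value

export function queryWatermarkOffsets(
    batchConsumer: BatchConsumer | undefined,
    partition: number
): Promise<[number, number]> {
    return new Promise<[number, number]>((resolve, reject) => {
        if (!batchConsumer) {
            return reject('Not connected')
        }
        batchConsumer.consumer.queryWatermarkOffsets(
            KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
            partition,
            (err, offsets) => {
                if (err) {
                    return reject(err)
                }
                resolve([partition, offsets.highOffset])
            }
        )
    })
}
```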
@@ -206,7 +196,9 @@
})

// Check that we are not below the high water mark for this partition (another consumer may have flushed further than us when revoking)
if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, HIGH_WATERMARK_KEY, offset)) {
if (
await this.persistentHighWaterMarker.isBelowHighWaterMark(event.metadata, KAFKA_CONSUMER_GROUP_ID, offset)
) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
@@ -218,7 +210,7 @@
return
}

if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, session_id, offset)) {
if (await this.sessionHighWaterMarker.isBelowHighWaterMark(event.metadata, session_id, offset)) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
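The two drop checks above are the core behavioural change on the read path: the partition-level check now consults the dedicated persistent tracker, keyed by the Kafka consumer group id rather than the generic high-water-mark key, while the per-session check stays on the session tracker. A condensed sketch of the gating logic, under the assumption that `isBelowHighWaterMark` returns true when the offset was already flushed or committed; the types are minimal stand-ins and the drop labels are illustrative, not the real counter labels:

```ts
// Minimal stand-in for the tracker interface; only the method used by the gates is declared.
interface HighWaterMarker {
    isBelowHighWaterMark(tp: { topic: string; partition: number }, id: string, offset: number): Promise<boolean>
}

const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob'

async function shouldDrop(
    persistent: HighWaterMarker,
    perSession: HighWaterMarker,
    metadata: { topic: string; partition: number; offset: number },
    sessionId: string
): Promise<'partition_committed' | 'session_flushed' | null> {
    // Gate 1: the whole partition was already committed past this offset
    // (e.g. by another consumer that flushed further before a rebalance handed it to us).
    if (await persistent.isBelowHighWaterMark(metadata, KAFKA_CONSUMER_GROUP_ID, metadata.offset)) {
        return 'partition_committed'
    }
    // Gate 2: this particular session was already flushed past this offset.
    if (await perSession.isBelowHighWaterMark(metadata, sessionId, metadata.offset)) {
        return 'session_flushed'
    }
    return null
}
```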
@@ -237,7 +229,7 @@
this.config,
this.objectStorage.s3,
this.realtimeManager,
this.offsetHighWaterMarker,
this.sessionHighWaterMarker,
team_id,
session_id,
partition,
@@ -354,8 +346,9 @@

// For some reason timestamp can be null. If it isn't, update our ingestion metrics
metrics.lastMessageTimestamp = timestamp
// If we don't have a last known commit then set it to this offset as we can't commit lower than that
metrics.lastKnownCommit = metrics.lastKnownCommit ?? offset

// If we don't have a last known commit then set it to the offset before as that must be the last commit
metrics.lastKnownCommit = metrics.lastKnownCommit ?? offset - 1
metrics.lastMessageOffset = offset

counterKafkaMessageReceived.inc({ partition })
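The seeding of `lastKnownCommit` changes from `offset` to `offset - 1`. The reasoning, per the new comment: Kafka consumer-group commits store the next offset to be read, so if the first message seen on a partition is offset `N`, the last offset actually processed and committed can be at most `N - 1`. Seeding with `N` would make the later `lastKnownCommit >= highestOffsetToCommit` guard refuse to commit `N` itself. A tiny worked example with illustrative numbers:

```ts
// Worked example of the `offset - 1` seed above (numbers are illustrative only).
let lastKnownCommit: number | null = null
const firstSeenOffset = 100

// Seed with the offset before the first one we see: 99.
lastKnownCommit = lastKnownCommit ?? firstSeenOffset - 1

// Suppose the first message is immediately flushable, so we want to commit offset 100.
const highestOffsetToCommit = 100

console.log(lastKnownCommit >= highestOffsetToCommit) // false, so offset 100 does get committed
```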
@@ -399,22 +392,19 @@
},
})

for (const message of messages) {
// Now that we have consumed everything, attempt to commit all messages in this batch
const { partition, offset } = message
await this.commitOffset(message.topic, partition, offset)
}

await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`,
statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`,
func: async () => {
await this.replayEventsIngester.consumeBatch(recordingMessages)
await this.flushAllReadySessions()
},
})

await this.commitAllOffsets(this.partitionAssignments, Object.values(this.sessions))

await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`,
statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`,
func: async () => {
await this.flushAllReadySessions()
await this.replayEventsIngester.consumeBatch(recordingMessages)
},
})
},
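The per-message `commitOffset` loop is gone from the batch handler; the new ordering is flush-ready-sessions first, then a single `commitAllOffsets` pass over the current assignments, and only then the replay-events batch. A condensed sketch of that ordering with stand-in names (arguments and instrumentation omitted):

```ts
// Stand-in sketch of the reordered tail of handleEachBatch, not the real signatures.
// Flushing first lets completed sessions release the offsets that were blocking a commit,
// so the single commitAllOffsets call afterwards can move as far forward as possible.

interface IngesterLike {
    flushAllReadySessions(): Promise<void>
    commitAllOffsets(): Promise<void>
    consumeReplayEvents(): Promise<void>
}

async function handleBatchTail(ingester: IngesterLike): Promise<void> {
    await ingester.flushAllReadySessions()
    await ingester.commitAllOffsets()
    await ingester.consumeReplayEvents()
}
```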
@@ -444,7 +434,6 @@
await this.realtimeManager.subscribe()
// Load teams into memory
await this.teamsRefresher.refresh()

await this.replayEventsIngester.start()

if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
@@ -460,9 +449,9 @@

this.batchConsumer = await startBatchConsumer({
connectionConfig,
groupId,
groupId: KAFKA_CONSUMER_GROUP_ID,
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
sessionTimeout,
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
// the largest size of a message that can be fetched by the consumer.
// the largest size our MSK cluster allows is 20MB
// we only use 9 or 10MB but there's no reason to limit this 🤷️
@@ -571,6 +560,7 @@
}

const sessionsToDrop: SessionManager[] = []
const partitionsToDrop: Record<number, PartitionMetrics> = {}

// First we pull out all sessions that are being dropped. This way if we get reassigned and start consuming, we don't accidentally destroy them
Object.entries(this.sessions).forEach(([key, sessionManager]) => {
@@ -583,13 +573,14 @@
// Reset all metrics for the revoked partitions
topicPartitions.forEach((topicPartition: TopicPartition) => {
const partition = topicPartition.partition

partitionsToDrop[partition] = this.partitionAssignments[partition]
delete this.partitionAssignments[partition]
gaugeLag.remove({ partition })
gaugeLagMilliseconds.remove({ partition })
gaugeOffsetCommitted.remove({ partition })
gaugeOffsetCommitFailed.remove({ partition })
this.offsetHighWaterMarker.revoke(topicPartition)
this.sessionHighWaterMarker.revoke(topicPartition)
this.persistentHighWaterMarker.revoke(topicPartition)
})

gaugeSessionsRevoked.set(sessionsToDrop.length)
@@ -612,19 +603,20 @@
await runInstrumentedFunction({
statsKey: `recordingingester.onRevokePartitions.flushSessions`,
logExecutionTime: true,
func: async () => {
func: async () =>
await Promise.allSettled(
sessionsToDrop
.sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity)
.map((x) => x.flush('partition_shutdown'))
)
},
),
})

await this.commitAllOffsets(partitionsToDrop, sessionsToDrop)
await this.partitionLocker.release(topicPartitions)
}

await Promise.allSettled(sessionsToDrop.map((x) => x.destroy()))
// TODO: If the above works, all sessions are removed. Can we drop?
await this.offsetsRefresher.refresh()
},
})
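Revocation now also snapshots the metrics of the partitions being handed away (`partitionsToDrop`) so that, after flushing whatever can be flushed, `commitAllOffsets` can run one last time against exactly those partitions before the locks are released. A condensed sketch of the order of operations, with stand-in names and the optimization flag, instrumentation and error handling omitted:

```ts
// Condensed order of operations during partition revocation, based on the hunks above.
interface RevokeContext {
    snapshotRevoked(): { sessionsToDrop: string[]; partitionsToDrop: number[] }
    flushSessions(sessionIds: string[]): Promise<void>
    commitAllOffsets(partitions: number[], sessionIds: string[]): Promise<void>
    releasePartitionLocks(partitions: number[]): Promise<void>
    destroySessions(sessionIds: string[]): Promise<void>
}

async function onRevokePartitions(ctx: RevokeContext): Promise<void> {
    // 1. Snapshot the revoked sessions and partition metrics up front, so a quick
    //    re-assignment cannot race with the cleanup below.
    const { sessionsToDrop, partitionsToDrop } = ctx.snapshotRevoked()
    // 2. Flush what can be flushed, oldest first, so as little work as possible is redone elsewhere.
    await ctx.flushSessions(sessionsToDrop)
    // 3. Commit offsets for exactly the partitions being handed away...
    await ctx.commitAllOffsets(partitionsToDrop, sessionsToDrop)
    // 4. ...then release the partition locks and drop the local session state.
    await ctx.releasePartitionLocks(partitionsToDrop)
    await ctx.destroySessions(sessionsToDrop)
}
```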
@@ -673,55 +665,70 @@
)
}

// Given a topic and partition and a list of offsets, commit the highest offset
// that is no longer found across any of the existing sessions.
// This approach is fault-tolerant in that if anything goes wrong, the next commit on that partition will work
public async commitOffset(topic: string, partition: number, offset: number): Promise<void> {
const topicPartition = { topic, partition }
let potentiallyBlockingSession: SessionManager | undefined

for (const sessionManager of Object.values(this.sessions)) {
if (sessionManager.partition === partition && sessionManager.topic === topic) {
const lowestOffset = sessionManager.getLowestOffset()
if (
lowestOffset !== null &&
lowestOffset < (potentiallyBlockingSession?.getLowestOffset() || Infinity)
) {
potentiallyBlockingSession = sessionManager
public async commitAllOffsets(
partitions: Record<number, PartitionMetrics>,
blockingSessions: SessionManager[]
): Promise<void> {
await Promise.all(
Object.entries(partitions).map(async ([p, metrics]) => {
/**
* For each partition we want to commit either:
* The lowest blocking session (one we haven't flushed yet on that partition)
* OR the latest offset we have consumed for that partition
*/
const partition = parseInt(p)
const tp = {
topic: this.topic,
partition,
}
}
}

const potentiallyBlockingOffset = potentiallyBlockingSession?.getLowestOffset() ?? null
let potentiallyBlockingSession: SessionManager | undefined

// If we have any other session for this topic-partition then we can only commit offsets that are lower than it
const highestOffsetToCommit =
potentiallyBlockingOffset !== null && potentiallyBlockingOffset < offset
? potentiallyBlockingOffset
: offset
for (const sessionManager of blockingSessions) {
if (sessionManager.partition === partition) {
const lowestOffset = sessionManager.getLowestOffset()
if (
lowestOffset !== null &&
lowestOffset < (potentiallyBlockingSession?.getLowestOffset() || Infinity)
) {
potentiallyBlockingSession = sessionManager
}
}
}

const lastKnownCommit = this.partitionAssignments[partition]?.lastKnownCommit || 0
// TODO: Check how long we have been blocked by any individual session and if it is too long then we should
// capture an exception to figure out why
if (lastKnownCommit >= highestOffsetToCommit) {
// If we have already commited this offset then we don't need to do it again
return
}
const potentiallyBlockingOffset = potentiallyBlockingSession?.getLowestOffset() ?? null

if (this.partitionAssignments[partition]) {
this.partitionAssignments[partition].lastKnownCommit = highestOffsetToCommit
}
// We will either try to commit the lowest blocking offset OR whatever we know to be the latest offset we have consumed
const highestOffsetToCommit = potentiallyBlockingOffset
? potentiallyBlockingOffset - 1 // TRICKY: We want to commit the offset before the lowest blocking offset
: metrics.lastMessageOffset // Or the last message we have seen as it is no longer blocked
const lastKnownCommit = metrics.lastKnownCommit ?? -1

this.batchConsumer?.consumer.commit({
...topicPartition,
// see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html for example
// for some reason you commit the next offset you expect to read and not the one you actually have
offset: highestOffsetToCommit + 1,
})
if (!highestOffsetToCommit) {
return
}

// If the last known commit is more than or equal to the highest offset we want to commit then we don't need to do anything
if (lastKnownCommit >= highestOffsetToCommit) {
return
}

await this.offsetHighWaterMarker.add(topicPartition, HIGH_WATERMARK_KEY, highestOffsetToCommit)
await this.offsetHighWaterMarker.clear({ topic, partition }, highestOffsetToCommit)
gaugeOffsetCommitted.set({ partition }, highestOffsetToCommit)
this.batchConsumer?.consumer.commit({
...tp,
// see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html for example
// for some reason you commit the next offset you expect to read and not the one you actually have
offset: highestOffsetToCommit + 1,
})

metrics.lastKnownCommit = highestOffsetToCommit

// Store the committed offset to the persistent store to avoid rebalance issues
await this.persistentHighWaterMarker.add(tp, KAFKA_CONSUMER_GROUP_ID, highestOffsetToCommit)
// Clear all session offsets below the committed offset (as we know they have been flushed)
await this.sessionHighWaterMarker.clear(tp, highestOffsetToCommit)
gaugeOffsetCommitted.set({ partition }, highestOffsetToCommit)
})
)
}

public async destroySessions(sessionsToDestroy: [string, SessionManager][]): Promise<void> {
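To make the commit arithmetic in `commitAllOffsets` concrete, here is a worked example with illustrative numbers. The helper below is a stand-in that mirrors the decision above, not code from the repo, and it deliberately uses explicit null checks where the real code relies on truthiness:

```ts
// Per-partition commit decision: stop just before the lowest offset still held by an
// unflushed session, or, if nothing is blocking, commit everything consumed so far.

type Decision = { commitToKafka: number; highWaterMark: number } | null

function decideCommit(
    lowestBlockingSessionOffset: number | null, // lowest unflushed offset across sessions on this partition
    lastMessageOffset: number | null, // last offset consumed on this partition
    lastKnownCommit: number // defaults to -1 when unknown
): Decision {
    const highestOffsetToCommit =
        lowestBlockingSessionOffset !== null ? lowestBlockingSessionOffset - 1 : lastMessageOffset

    if (highestOffsetToCommit === null || lastKnownCommit >= highestOffsetToCommit) {
        return null // nothing new to commit
    }

    return {
        // Kafka stores the *next* offset to read, hence the + 1.
        commitToKafka: highestOffsetToCommit + 1,
        highWaterMark: highestOffsetToCommit,
    }
}

// A session on the partition still holds offsets from 120 upwards and we have consumed
// up to 150: commit 120 to Kafka (next to read) and record 119 as the high-water mark.
console.log(decideCommit(120, 150, 99)) // { commitToKafka: 120, highWaterMark: 119 }

// No blocking session: commit everything consumed so far.
console.log(decideCommit(null, 150, 119)) // { commitToKafka: 151, highWaterMark: 150 }
```

In the real method the chosen mark is then written to the persistent tracker under the consumer group id and used to clear per-session marks at or below it, as shown in the hunk above.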
(Diffs for the remaining changed files are not shown here.)