From b118d4f21e3e122e8e9710b3e10ac7101cd162a9 Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 27 Sep 2023 14:00:43 +0200 Subject: [PATCH] fix: Use dedicated watermark tracker for persistent values (#17599) --- .../services/replay-events-ingester.ts | 6 +- .../session-recordings-consumer-v2.ts | 189 ++++---- .../session-recording/utils.ts | 30 +- .../session-recording/fixtures.ts | 5 +- .../session-recordings-consumer-v2.test.ts | 409 ++++++++++-------- .../session-recording/utils.test.ts | 17 + 6 files changed, 385 insertions(+), 271 deletions(-) create mode 100644 plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts index c9dacf1fabdef..78d15f10406fa 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts @@ -28,7 +28,7 @@ export class ReplayEventsIngester { constructor( private readonly serverConfig: PluginsServerConfig, - private readonly offsetHighWaterMarker: OffsetHighWaterMarker + private readonly persistentHighWaterMarker: OffsetHighWaterMarker ) {} public async consumeBatch(messages: IncomingRecordingMessage[]) { @@ -75,7 +75,7 @@ export class ReplayEventsIngester { const topicPartitionOffsets = findOffsetsToCommit(messages.map((message) => message.metadata)) await Promise.all( - topicPartitionOffsets.map((tpo) => this.offsetHighWaterMarker.add(tpo, HIGH_WATERMARK_KEY, tpo.offset)) + topicPartitionOffsets.map((tpo) => this.persistentHighWaterMarker.add(tpo, HIGH_WATERMARK_KEY, tpo.offset)) ) } @@ -106,7 +106,7 @@ export class ReplayEventsIngester { } if ( - await this.offsetHighWaterMarker.isBelowHighWaterMark( + await this.persistentHighWaterMarker.isBelowHighWaterMark( event.metadata, HIGH_WATERMARK_KEY, event.metadata.offset diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts index f783b7390bc7e..d42f756b7d49c 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts @@ -24,15 +24,15 @@ import { RealtimeManager } from './services/realtime-manager' import { ReplayEventsIngester } from './services/replay-events-ingester' import { SessionManager } from './services/session-manager' import { IncomingRecordingMessage } from './types' -import { bufferFileDir, now } from './utils' +import { bufferFileDir, now, queryWatermarkOffsets } from './utils' // Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals require('@sentry/tracing') -const groupId = 'session-recordings-blob' -const sessionTimeout = 30000 +// WARNING: Do not change this - it will essentially reset the consumer +const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob' +const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000 const PARTITION_LOCK_INTERVAL_MS = 10000 -const HIGH_WATERMARK_KEY = 'session_replay_blob_ingester' // const flushIntervalTimeoutMs = 30000 @@ -97,7 +97,8 @@ type PartitionMetrics = { export class SessionRecordingIngesterV2 { redisPool: RedisPool sessions: Record = {} - offsetHighWaterMarker: 
OffsetHighWaterMarker + sessionHighWaterMarker: OffsetHighWaterMarker + persistentHighWaterMarker: OffsetHighWaterMarker realtimeManager: RealtimeManager replayEventsIngester: ReplayEventsIngester partitionLocker: PartitionLocker @@ -124,13 +125,18 @@ export class SessionRecordingIngesterV2 { this.realtimeManager = new RealtimeManager(this.redisPool, this.config) this.partitionLocker = new PartitionLocker(this.redisPool, this.config.SESSION_RECORDING_REDIS_PREFIX) - this.offsetHighWaterMarker = new OffsetHighWaterMarker( + this.sessionHighWaterMarker = new OffsetHighWaterMarker( this.redisPool, this.config.SESSION_RECORDING_REDIS_PREFIX ) + this.persistentHighWaterMarker = new OffsetHighWaterMarker( + this.redisPool, + this.config.SESSION_RECORDING_REDIS_PREFIX + 'persistent/' + ) + // NOTE: This is the only place where we need to use the shared server config - this.replayEventsIngester = new ReplayEventsIngester(globalServerConfig, this.offsetHighWaterMarker) + this.replayEventsIngester = new ReplayEventsIngester(globalServerConfig, this.persistentHighWaterMarker) this.teamsRefresher = new BackgroundRefresher(async () => { try { @@ -145,25 +151,9 @@ export class SessionRecordingIngesterV2 { this.offsetsRefresher = new BackgroundRefresher(async () => { const results = await Promise.all( - this.assignedTopicPartitions.map(async ({ partition }) => { - return new Promise<[number, number]>((resolve, reject) => { - if (!this.batchConsumer) { - return reject('Not connected') - } - this.batchConsumer.consumer.queryWatermarkOffsets( - KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, - partition, - (err, offsets) => { - if (err) { - status.error('🔥', 'Failed to query kafka watermark offsets', err) - return reject() - } - - resolve([partition, offsets.highOffset]) - } - ) - }) - }) + this.assignedTopicPartitions.map(({ partition }) => + queryWatermarkOffsets(this.batchConsumer, partition) + ) ) return results.reduce((acc, [partition, highOffset]) => { @@ -206,7 +196,9 @@ export class SessionRecordingIngesterV2 { }) // Check that we are not below the high water mark for this partition (another consumer may have flushed further than us when revoking) - if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, HIGH_WATERMARK_KEY, offset)) { + if ( + await this.persistentHighWaterMarker.isBelowHighWaterMark(event.metadata, KAFKA_CONSUMER_GROUP_ID, offset) + ) { eventDroppedCounter .labels({ event_type: 'session_recordings_blob_ingestion', @@ -218,7 +210,7 @@ export class SessionRecordingIngesterV2 { return } - if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, session_id, offset)) { + if (await this.sessionHighWaterMarker.isBelowHighWaterMark(event.metadata, session_id, offset)) { eventDroppedCounter .labels({ event_type: 'session_recordings_blob_ingestion', @@ -237,7 +229,7 @@ export class SessionRecordingIngesterV2 { this.config, this.objectStorage.s3, this.realtimeManager, - this.offsetHighWaterMarker, + this.sessionHighWaterMarker, team_id, session_id, partition, @@ -354,8 +346,9 @@ export class SessionRecordingIngesterV2 { // For some reason timestamp can be null. If it isn't, update our ingestion metrics metrics.lastMessageTimestamp = timestamp - // If we don't have a last known commit then set it to this offset as we can't commit lower than that - metrics.lastKnownCommit = metrics.lastKnownCommit ?? 
offset + + // If we don't have a last known commit then set it to the offset before as that must be the last commit + metrics.lastKnownCommit = metrics.lastKnownCommit ?? offset - 1 metrics.lastMessageOffset = offset counterKafkaMessageReceived.inc({ partition }) @@ -399,22 +392,19 @@ export class SessionRecordingIngesterV2 { }, }) - for (const message of messages) { - // Now that we have consumed everything, attempt to commit all messages in this batch - const { partition, offset } = message - await this.commitOffset(message.topic, partition, offset) - } - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`, + statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`, func: async () => { - await this.replayEventsIngester.consumeBatch(recordingMessages) + await this.flushAllReadySessions() }, }) + + await this.commitAllOffsets(this.partitionAssignments, Object.values(this.sessions)) + await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`, + statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`, func: async () => { - await this.flushAllReadySessions() + await this.replayEventsIngester.consumeBatch(recordingMessages) }, }) }, @@ -444,7 +434,6 @@ export class SessionRecordingIngesterV2 { await this.realtimeManager.subscribe() // Load teams into memory await this.teamsRefresher.refresh() - await this.replayEventsIngester.start() if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) { @@ -460,9 +449,9 @@ export class SessionRecordingIngesterV2 { this.batchConsumer = await startBatchConsumer({ connectionConfig, - groupId, + groupId: KAFKA_CONSUMER_GROUP_ID, topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, - sessionTimeout, + sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS, // the largest size of a message that can be fetched by the consumer. // the largest size our MSK cluster allows is 20MB // we only use 9 or 10MB but there's no reason to limit this 🤷️ @@ -571,6 +560,7 @@ export class SessionRecordingIngesterV2 { } const sessionsToDrop: SessionManager[] = [] + const partitionsToDrop: Record = {} // First we pull out all sessions that are being dropped. This way if we get reassigned and start consuming, we don't accidentally destroy them Object.entries(this.sessions).forEach(([key, sessionManager]) => { @@ -583,13 +573,14 @@ export class SessionRecordingIngesterV2 { // Reset all metrics for the revoked partitions topicPartitions.forEach((topicPartition: TopicPartition) => { const partition = topicPartition.partition - + partitionsToDrop[partition] = this.partitionAssignments[partition] delete this.partitionAssignments[partition] gaugeLag.remove({ partition }) gaugeLagMilliseconds.remove({ partition }) gaugeOffsetCommitted.remove({ partition }) gaugeOffsetCommitFailed.remove({ partition }) - this.offsetHighWaterMarker.revoke(topicPartition) + this.sessionHighWaterMarker.revoke(topicPartition) + this.persistentHighWaterMarker.revoke(topicPartition) }) gaugeSessionsRevoked.set(sessionsToDrop.length) @@ -612,19 +603,20 @@ export class SessionRecordingIngesterV2 { await runInstrumentedFunction({ statsKey: `recordingingester.onRevokePartitions.flushSessions`, logExecutionTime: true, - func: async () => { + func: async () => await Promise.allSettled( sessionsToDrop .sort((x) => x.buffer.oldestKafkaTimestamp ?? 
Infinity) .map((x) => x.flush('partition_shutdown')) - ) - }, + ), }) + await this.commitAllOffsets(partitionsToDrop, sessionsToDrop) await this.partitionLocker.release(topicPartitions) } await Promise.allSettled(sessionsToDrop.map((x) => x.destroy())) + // TODO: If the above works, all sessions are removed. Can we drop? await this.offsetsRefresher.refresh() }, }) @@ -673,55 +665,70 @@ export class SessionRecordingIngesterV2 { ) } - // Given a topic and partition and a list of offsets, commit the highest offset - // that is no longer found across any of the existing sessions. - // This approach is fault-tolerant in that if anything goes wrong, the next commit on that partition will work - public async commitOffset(topic: string, partition: number, offset: number): Promise { - const topicPartition = { topic, partition } - let potentiallyBlockingSession: SessionManager | undefined - - for (const sessionManager of Object.values(this.sessions)) { - if (sessionManager.partition === partition && sessionManager.topic === topic) { - const lowestOffset = sessionManager.getLowestOffset() - if ( - lowestOffset !== null && - lowestOffset < (potentiallyBlockingSession?.getLowestOffset() || Infinity) - ) { - potentiallyBlockingSession = sessionManager + public async commitAllOffsets( + partitions: Record, + blockingSessions: SessionManager[] + ): Promise { + await Promise.all( + Object.entries(partitions).map(async ([p, metrics]) => { + /** + * For each partition we want to commit either: + * The lowest blocking session (one we haven't flushed yet on that partition) + * OR the latest offset we have consumed for that partition + */ + const partition = parseInt(p) + const tp = { + topic: this.topic, + partition, } - } - } - const potentiallyBlockingOffset = potentiallyBlockingSession?.getLowestOffset() ?? null + let potentiallyBlockingSession: SessionManager | undefined - // If we have any other session for this topic-partition then we can only commit offsets that are lower than it - const highestOffsetToCommit = - potentiallyBlockingOffset !== null && potentiallyBlockingOffset < offset - ? potentiallyBlockingOffset - : offset + for (const sessionManager of blockingSessions) { + if (sessionManager.partition === partition) { + const lowestOffset = sessionManager.getLowestOffset() + if ( + lowestOffset !== null && + lowestOffset < (potentiallyBlockingSession?.getLowestOffset() || Infinity) + ) { + potentiallyBlockingSession = sessionManager + } + } + } - const lastKnownCommit = this.partitionAssignments[partition]?.lastKnownCommit || 0 - // TODO: Check how long we have been blocked by any individual session and if it is too long then we should - // capture an exception to figure out why - if (lastKnownCommit >= highestOffsetToCommit) { - // If we have already commited this offset then we don't need to do it again - return - } + const potentiallyBlockingOffset = potentiallyBlockingSession?.getLowestOffset() ?? null - if (this.partitionAssignments[partition]) { - this.partitionAssignments[partition].lastKnownCommit = highestOffsetToCommit - } + // We will either try to commit the lowest blocking offset OR whatever we know to be the latest offset we have consumed + const highestOffsetToCommit = potentiallyBlockingOffset + ? potentiallyBlockingOffset - 1 // TRICKY: We want to commit the offset before the lowest blocking offset + : metrics.lastMessageOffset // Or the last message we have seen as it is no longer blocked + const lastKnownCommit = metrics.lastKnownCommit ?? 
-1 - this.batchConsumer?.consumer.commit({ - ...topicPartition, - // see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html for example - // for some reason you commit the next offset you expect to read and not the one you actually have - offset: highestOffsetToCommit + 1, - }) + if (!highestOffsetToCommit) { + return + } + + // If the last known commit is more than or equal to the highest offset we want to commit then we don't need to do anything + if (lastKnownCommit >= highestOffsetToCommit) { + return + } - await this.offsetHighWaterMarker.add(topicPartition, HIGH_WATERMARK_KEY, highestOffsetToCommit) - await this.offsetHighWaterMarker.clear({ topic, partition }, highestOffsetToCommit) - gaugeOffsetCommitted.set({ partition }, highestOffsetToCommit) + this.batchConsumer?.consumer.commit({ + ...tp, + // see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html for example + // for some reason you commit the next offset you expect to read and not the one you actually have + offset: highestOffsetToCommit + 1, + }) + + metrics.lastKnownCommit = highestOffsetToCommit + + // Store the committed offset to the persistent store to avoid rebalance issues + await this.persistentHighWaterMarker.add(tp, KAFKA_CONSUMER_GROUP_ID, highestOffsetToCommit) + // Clear all session offsets below the committed offset (as we know they have been flushed) + await this.sessionHighWaterMarker.clear(tp, highestOffsetToCommit) + gaugeOffsetCommitted.set({ partition }, highestOffsetToCommit) + }) + ) } public async destroySessions(sessionsToDestroy: [string, SessionManager][]): Promise { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 5ef9dace47c87..203c4ecd738a8 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -1,6 +1,9 @@ import { DateTime } from 'luxon' import path from 'path' +import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' +import { BatchConsumer } from '../../../kafka/batch-consumer' +import { status } from '../../../utils/status' import { IncomingRecordingMessage, PersistedRecordingMessage } from './types' export const convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => { @@ -15,12 +18,35 @@ export const now = () => DateTime.now().toMillis() export const minDefined = (...args: (number | undefined)[]): number | undefined => { const definedArgs = args.filter((arg) => arg !== undefined) as number[] - return Math.min(...definedArgs) + return definedArgs.length ? Math.min(...definedArgs) : undefined } export const maxDefined = (...args: (number | undefined)[]): number | undefined => { const definedArgs = args.filter((arg) => arg !== undefined) as number[] - return Math.max(...definedArgs) + return definedArgs.length ? 
Math.max(...definedArgs) : undefined } export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-files') + +export const queryWatermarkOffsets = ( + batchConsumer: BatchConsumer | undefined, + partition: number +): Promise<[number, number]> => { + return new Promise<[number, number]>((resolve, reject) => { + if (!batchConsumer) { + return reject('Not connected') + } + batchConsumer.consumer.queryWatermarkOffsets( + KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + partition, + (err, offsets) => { + if (err) { + status.error('🔥', 'Failed to query kafka watermark offsets', err) + return reject() + } + + resolve([partition, offsets.highOffset]) + } + ) + }) +} diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts index 0ad3b2c5d46fc..2f82463b156bf 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts @@ -1,5 +1,6 @@ import { Message } from 'node-rdkafka-acosom' +import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../../src/config/kafka-topics' import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/types' import jsonFullSnapshot from './data/snapshot-full.json' @@ -40,7 +41,7 @@ export function createKafkaMessage( ): Message { const message: Message = { partition: 1, - topic: 'topic', + topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, offset: 0, timestamp: messageOverrides.timestamp ?? Date.now(), size: 1, @@ -66,6 +67,6 @@ export function createKafkaMessage( return message } -export function createTP(partition: number, topic = 'topic') { +export function createTP(partition: number, topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS) { return { topic, partition } } diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts index 53cc8f019d861..a2562eedcaa27 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts @@ -1,11 +1,11 @@ -import { mkdirSync, rmSync } from 'node:fs' +import { mkdirSync, readdirSync, rmSync } from 'node:fs' import { Message } from 'node-rdkafka-acosom' import path from 'path' import { waitForExpect } from '../../../../functional_tests/expectations' import { defaultConfig } from '../../../../src/config/config' import { SessionRecordingIngesterV2 } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v2' -import { Hub, PluginsServerConfig } from '../../../../src/types' +import { Hub, PluginsServerConfig, Team } from '../../../../src/types' import { createHub } from '../../../../src/utils/db/hub' import { getFirstTeam, resetTestDatabase } from '../../../helpers/sql' import { createIncomingRecordingMessage, createKafkaMessage, createTP } from './fixtures' @@ -60,7 +60,9 @@ describe('ingester', () => { let hub: Hub let closeHub: () => Promise + let team: Team let teamToken = '' + let nextOffset: number beforeAll(async () => { mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true }) @@ -69,12 +71,20 @@ describe('ingester', () => { beforeEach(async () => { ;[hub, closeHub] = await 
createHub() - const team = await getFirstTeam(hub) + team = await getFirstTeam(hub) teamToken = team.api_token await deleteKeysWithPrefix(hub) ingester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage) await ingester.start() + nextOffset = 1 + + // Our tests will use multiple partitions so we assign them to begin with + await ingester.onAssignPartitions([createTP(1), createTP(2)]) + expect(ingester.partitionAssignments).toMatchObject({ + '1': {}, + '2': {}, + }) }) afterEach(async () => { @@ -89,6 +99,23 @@ describe('ingester', () => { jest.useRealTimers() }) + const commitAllOffsets = async () => { + await ingester.commitAllOffsets(ingester.partitionAssignments, Object.values(ingester.sessions)) + } + + const createMessage = (session_id: string, partition = 1) => { + return createKafkaMessage( + teamToken, + { + partition, + offset: nextOffset++, + }, + { + $session_id: session_id, + } + ) + } + it('creates a new session manager if needed', async () => { const event = createIncomingRecordingMessage() await ingester.consume(event) @@ -208,126 +235,210 @@ describe('ingester', () => { }) }) - // NOTE: Committing happens by the parent describe('offset committing', () => { - const metadata = { - partition: 1, - topic: 'session_recording_events', - } - let _offset = 0 - const offset = () => _offset++ - - const addMessage = (session_id: string) => - createIncomingRecordingMessage({ session_id }, { ...metadata, offset: offset() }) - - beforeEach(() => { - _offset = 0 - }) - - const tryToCommitLatestOffset = async () => { - await ingester.commitOffset(metadata.topic, metadata.partition, _offset) - } - it('should commit offsets in simple cases', async () => { - await ingester.consume(addMessage('sid1')) - await ingester.consume(addMessage('sid1')) - expect(_offset).toBe(2) - await tryToCommitLatestOffset() + await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid1')]) + expect(ingester.partitionAssignments[1]).toMatchObject({ + lastMessageOffset: 2, + lastKnownCommit: 0, + }) + + await commitAllOffsets() // Doesn't flush if we have a blocking session expect(mockCommit).toHaveBeenCalledTimes(0) - await ingester.sessions['1-sid1']?.flush('buffer_age') - await tryToCommitLatestOffset() + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() expect(mockCommit).toHaveBeenCalledTimes(1) - expect(mockCommit).toHaveBeenLastCalledWith({ - ...metadata, - offset: 3, - }) + expect(mockCommit).toHaveBeenLastCalledWith( + expect.objectContaining({ + offset: 2 + 1, + partition: 1, + }) + ) }) it('should commit higher values but not lower', async () => { - // We need to simulate the paritition assignent logic here - ingester.partitionAssignments[1] = {} - await ingester.consume(addMessage('sid1')) - await ingester.sessions['1-sid1']?.flush('buffer_age') - await tryToCommitLatestOffset() + await ingester.handleEachBatch([createMessage('sid1')]) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + expect(ingester.partitionAssignments[1].lastMessageOffset).toBe(1) + await commitAllOffsets() expect(mockCommit).toHaveBeenCalledTimes(1) - expect(mockCommit).toHaveBeenLastCalledWith({ - ...metadata, - offset: 2, - }) - - const olderOffsetSomehow = addMessage('sid1') - olderOffsetSomehow.metadata.offset = 1 + expect(mockCommit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 2, + }) + ) - await ingester.consume(olderOffsetSomehow) - await ingester.sessions['1-sid1']?.flush('buffer_age') - await 
ingester.commitOffset(metadata.topic, metadata.partition, 1) + // Repeat commit doesn't do anything + await commitAllOffsets() expect(mockCommit).toHaveBeenCalledTimes(1) - await ingester.consume(addMessage('sid1')) - await ingester.sessions['1-sid1']?.flush('buffer_age') - await tryToCommitLatestOffset() + await ingester.handleEachBatch([createMessage('sid1')]) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() expect(mockCommit).toHaveBeenCalledTimes(2) - expect(mockCommit).toHaveBeenLastCalledWith({ - ...metadata, - offset: 4, - }) + expect(mockCommit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 2 + 1, + }) + ) }) it('should commit the lowest known offset if there is a blocking session', async () => { - await ingester.consume(addMessage('sid1')) // 1 - await ingester.consume(addMessage('sid2')) // 2 - await ingester.consume(addMessage('sid2')) // 3 - await ingester.consume(addMessage('sid2')) // 4 - await ingester.sessions['1-sid2']?.flush('buffer_age') - await tryToCommitLatestOffset() + await ingester.handleEachBatch([ + createMessage('sid1'), + createMessage('sid2'), + createMessage('sid2'), + createMessage('sid2'), + ]) + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() + + expect(ingester.partitionAssignments[1]).toMatchObject({ + lastMessageOffset: 4, + lastKnownCommit: 0, + }) // No offsets are below the blocking one expect(mockCommit).not.toHaveBeenCalled() - await ingester.sessions['1-sid1']?.flush('buffer_age') + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - // Simulating the next incoming message triggers a commit for sure - await tryToCommitLatestOffset() - expect(mockCommit).toHaveBeenLastCalledWith({ - ...metadata, - offset: 5, - }) + // Subsequent commit will commit the last known offset + await commitAllOffsets() + expect(mockCommit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 4 + 1, + }) + ) }) it('should commit one lower than the blocking session if that is the highest', async () => { - await ingester.consume(addMessage('sid1')) // 1 - await ingester.consume(addMessage('sid2')) // 2 - await ingester.consume(addMessage('sid2')) // 3 - await ingester.consume(addMessage('sid2')) // 4 - await ingester.sessions['1-sid2']?.flush('buffer_age') - await tryToCommitLatestOffset() + await ingester.handleEachBatch([ + createMessage('sid1'), + createMessage('sid2'), + createMessage('sid2'), + createMessage('sid2'), + ]) + // Flush the second session so the first one is still blocking + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() // No offsets are below the blocking one expect(mockCommit).not.toHaveBeenCalled() - await ingester.consume(addMessage('sid2')) // 5 - await ingester.sessions['1-sid1']?.flush('buffer_age') - await tryToCommitLatestOffset() - expect(mockCommit).toHaveBeenLastCalledWith({ - ...metadata, - offset: 5, // Same as the blocking session and more than the highest commitable for sid1 (1) - }) + // Add a new message and session and flush the old one + await ingester.handleEachBatch([createMessage('sid2')]) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() + + // We should commit the offset of the blocking session + expect(mockCommit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: ingester.sessions[`${team.id}-sid2`].getLowestOffset(), + }) + ) }) it('should not be affected by 
other partitions ', async () => { - createIncomingRecordingMessage({ session_id: 'sid1' }, { ...metadata, partition: 2, offset: offset() }) - await ingester.consume(addMessage('sid2')) // 2 - await ingester.consume(addMessage('sid2')) // 3 - await ingester.sessions['1-sid2']?.flush('buffer_age') - await tryToCommitLatestOffset() - - expect(mockCommit).toHaveBeenLastCalledWith({ - ...metadata, - offset: 4, + await ingester.handleEachBatch([ + createMessage('sid1', 1), // offset 1 + createMessage('sid2', 2), // offset 2 + createMessage('sid2', 2), // offset 3 + ]) + + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await ingester.handleEachBatch([createMessage('sid1', 1)]) // offset 4 + + // We should now have a blocking session on partition 1 and 2 with partition 1 being commitable + await commitAllOffsets() + expect(mockCommit).toHaveBeenCalledTimes(1) + expect(mockCommit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 4, + }) + ) + + mockCommit.mockReset() + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() + expect(mockCommit).toHaveBeenCalledTimes(2) + expect(mockCommit).toHaveBeenCalledWith( + expect.objectContaining({ + partition: 1, + offset: 5, + }) + ) + expect(mockCommit).toHaveBeenCalledWith( + expect.objectContaining({ + partition: 2, + offset: 4, + }) + ) + }) + }) + + describe('watermarkers', () => { + const getSessionWaterMarks = (partition = 1) => + ingester.sessionHighWaterMarker.getWaterMarks(createTP(partition)) + const getPersistentWaterMarks = (partition = 1) => + ingester.persistentHighWaterMarker.getWaterMarks(createTP(partition)) + + it('should update session watermarkers with flushing', async () => { + await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid2'), createMessage('sid3')]) + await expect(getSessionWaterMarks()).resolves.toEqual({}) + + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 1 }) + await ingester.sessions[`${team.id}-sid3`].flush('buffer_age') + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 1, sid2: 2, sid3: 3 }) + }) + + it('should update partition watermarkers when committing', async () => { + await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid2'), createMessage('sid1')]) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() + expect(mockCommit).toHaveBeenCalledTimes(1) + + // sid1 should be watermarked up until the 3rd message as it HAS been processed + await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 3 }) + // all replay events should be watermarked up until the 3rd message as they HAVE been processed + // whereas the commited kafka offset should be the 1st message as the 2nd message HAS not been processed + await expect(getPersistentWaterMarks()).resolves.toEqual({ + 'session-recordings-blob': 1, + session_replay_events_ingester: 3, + }) + }) + + it('should drop events that are higher than the watermarks', async () => { + const events = [createMessage('sid1'), createMessage('sid2'), createMessage('sid2')] + + await expect(getPersistentWaterMarks()).resolves.toEqual({}) + await ingester.handleEachBatch([events[0], events[1]]) + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() + 
expect(mockCommit).not.toHaveBeenCalled() + await expect(getPersistentWaterMarks()).resolves.toEqual({ + session_replay_events_ingester: 2, + }) + await expect(getSessionWaterMarks()).resolves.toEqual({ + sid2: 2, // only processed the second message so far }) + + // Simulate a re-processing + await ingester.destroySessions(Object.entries(ingester.sessions)) + await ingester.handleEachBatch(events) + expect(ingester.sessions[`${team.id}-sid2`].buffer.count).toBe(1) + expect(ingester.sessions[`${team.id}-sid1`].buffer.count).toBe(1) }) }) @@ -347,52 +458,8 @@ describe('ingester', () => { * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs */ it('rebalances new consumers', async () => { - const partitionMsgs1 = [ - createKafkaMessage( - teamToken, - { - partition: 1, - offset: 1, - }, - { - $session_id: 'session_id_1', - } - ), - - createKafkaMessage( - teamToken, - { - partition: 1, - offset: 2, - }, - { - $session_id: 'session_id_2', - } - ), - ] - - const partitionMsgs2 = [ - createKafkaMessage( - teamToken, - { - partition: 2, - offset: 1, - }, - { - $session_id: 'session_id_3', - } - ), - createKafkaMessage( - teamToken, - { - partition: 2, - offset: 2, - }, - { - $session_id: 'session_id_4', - } - ), - ] + const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] await ingester.onAssignPartitions([createTP(1), createTP(2), createTP(3)]) await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2]) @@ -413,20 +480,7 @@ describe('ingester', () => { // Call the second ingester to receive the messages. The revocation should still be in progress meaning they are "paused" for a bit // Once the revocation is complete the second ingester should receive the messages but drop most of them as they got flushes by the revoke - await otherIngester.handleEachBatch([ - ...partitionMsgs2, - createKafkaMessage( - teamToken, - { - partition: 2, - offset: 3, - }, - { - $session_id: 'session_id_4', - } - ), - ]) - + await otherIngester.handleEachBatch([...partitionMsgs2, createMessage('session_id_4', 2)]) await Promise.all(rebalancePromises) // Should still have the partition 1 sessions that didnt move @@ -439,34 +493,43 @@ describe('ingester', () => { Object.values(otherIngester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) ).toEqual(['2:session_id_4:1']) }) + + it("flushes and commits as it's revoked", async () => { + await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid2'), createMessage('sid3', 2)]) + const revokePromise = ingester.onRevokePartitions([createTP(1)]) + + expect(Object.keys(ingester.sessions)).toEqual([`${team.id}-sid3`]) + + expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([ + expect.stringContaining(`${team.id}.sid1.`), // gz + expect.stringContaining(`${team.id}.sid1.`), // json + expect.stringContaining(`${team.id}.sid2.`), // gz + expect.stringContaining(`${team.id}.sid2.`), // json + expect.stringContaining(`${team.id}.sid3.`), // gz + expect.stringContaining(`${team.id}.sid3.`), // json + ]) + + await revokePromise + + // Only files left on the system should be the sid3 ones + expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([ + expect.stringContaining(`${team.id}.sid3.`), // gz + 
                expect.stringContaining(`${team.id}.sid3.`), // json
+            ])
+
+            expect(mockCommit).toHaveBeenCalledTimes(1)
+            expect(mockCommit).toHaveBeenLastCalledWith(
+                expect.objectContaining({
+                    offset: 2 + 1,
+                    partition: 1,
+                })
+            )
+        })
     })
 
     describe('stop()', () => {
         const setup = async (): Promise<void> => {
-            const partitionMsgs1 = [
-                createKafkaMessage(
-                    teamToken,
-                    {
-                        partition: 1,
-                        offset: 1,
-                    },
-                    {
-                        $session_id: 'session_id_1',
-                    }
-                ),
-
-                createKafkaMessage(
-                    teamToken,
-                    {
-                        partition: 1,
-                        offset: 2,
-                    },
-                    {
-                        $session_id: 'session_id_2',
-                    }
-                ),
-            ]
-
+            const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
             await ingester.onAssignPartitions([createTP(1)])
             await ingester.handleEachBatch(partitionMsgs1)
         }
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts
new file mode 100644
index 0000000000000..228ad183395b3
--- /dev/null
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts
@@ -0,0 +1,17 @@
+import { maxDefined, minDefined } from '../../../../src/main/ingestion-queues/session-recording/utils'
+
+describe('session-recording utils', () => {
+    it('minDefined', () => {
+        expect(minDefined(1, 2, 3)).toEqual(1)
+        expect(minDefined(1, undefined, 3)).toEqual(1)
+        expect(minDefined(undefined, undefined, undefined)).toEqual(undefined)
+        expect(minDefined()).toEqual(undefined)
+    })
+
+    it('maxDefined', () => {
+        expect(maxDefined(1, 2, 3)).toEqual(3)
+        expect(maxDefined(1, undefined, 3)).toEqual(3)
+        expect(maxDefined(undefined, undefined, undefined)).toEqual(undefined)
+        expect(maxDefined()).toEqual(undefined)
+    })
+})