diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index b7285d1b1ebee..caa5b8f576a11 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin processAsyncOnEventHandlers: true, processAsyncWebhooksHandlers: true, sessionRecordingBlobIngestion: true, + sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED, personOverrides: true, appManagementSingleton: true, preflightSchedules: true, @@ -55,7 +56,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin sessionRecordingBlobIngestion: true, ...sharedCapabilities, } - + case PluginServerMode.recordings_blob_ingestion_overflow: + return { + sessionRecordingBlobOverflowIngestion: true, + ...sharedCapabilities, + } case PluginServerMode.recordings_ingestion_v3: return { sessionRecordingV3Ingestion: true, diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts index 4fd3e54b043b5..71f9bd8ee79da 100644 --- a/plugin-server/src/config/kafka-topics.ts +++ b/plugin-server/src/config/kafka-topics.ts @@ -29,6 +29,8 @@ export const KAFKA_PERSON_OVERRIDE = `${prefix}clickhouse_person_override${suffi // read session recording snapshot items export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = `${prefix}session_recording_snapshot_item_events${suffix}` +export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW = `${prefix}session_recording_snapshot_item_overflow${suffix}` + // write session recording and replay events to ClickHouse export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}` export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}` diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index c8961d1e1696b..009109e62ab28 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -6,7 +6,10 @@ import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartit import { Counter, Gauge, Histogram } from 'prom-client' import { sessionRecordingConsumerConfig } from '../../../config/config' -import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' +import { + KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW, +} from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config' import { createKafkaProducer } from '../../../kafka/producer' @@ -43,6 +46,7 @@ require('@sentry/tracing') // WARNING: Do not change this - it will essentially reset the consumer const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob' +const KAFKA_CONSUMER_GROUP_ID_OVERFLOW = 'session-recordings-blob-overflow' const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000 const SHUTDOWN_FLUSH_TIMEOUT_MS = 30000 const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay' @@ -141,7 +145,8 @@ export class SessionRecordingIngester { teamsRefresher: BackgroundRefresher> latestOffsetsRefresher: 
BackgroundRefresher> config: PluginsServerConfig - topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS + topic: string + consumerGroupId: string totalNumPartitions = 0 isStopping = false @@ -156,11 +161,16 @@ export class SessionRecordingIngester { private globalServerConfig: PluginsServerConfig, private postgres: PostgresRouter, private objectStorage: ObjectStorage, + private consumeOverflow: boolean, captureRedis: Redis | undefined ) { this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION ? parseInt(globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION) : undefined + this.topic = consumeOverflow + ? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW + : KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS + this.consumerGroupId = this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID // NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis // We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka. @@ -169,7 +179,7 @@ export class SessionRecordingIngester { this.realtimeManager = new RealtimeManager(this.redisPool, this.config) - if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis) { + if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis && !consumeOverflow) { this.overflowDetection = new OverflowManager( globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY, globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE, @@ -207,7 +217,7 @@ export class SessionRecordingIngester { this.latestOffsetsRefresher = new BackgroundRefresher(async () => { const results = await Promise.all( this.assignedTopicPartitions.map(({ partition }) => - queryWatermarkOffsets(this.connectedBatchConsumer, partition).catch((err) => { + queryWatermarkOffsets(this.connectedBatchConsumer, this.topic, partition).catch((err) => { // NOTE: This can error due to a timeout or the consumer being disconnected, not stop the process // as it is currently only used for reporting lag. 
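// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the diff): a minimal, self-contained model of
// how this change routes a consumer instance. The constant values mirror the
// names added in this PR but omit the env-derived `${prefix}`/`${suffix}`;
// the `ConsumerRouting` type and `resolveConsumerRouting` helper are
// hypothetical names used only for illustration.
// ---------------------------------------------------------------------------
const MAIN_TOPIC = 'session_recording_snapshot_item_events'
const OVERFLOW_TOPIC = 'session_recording_snapshot_item_overflow'
const MAIN_GROUP_ID = 'session-recordings-blob'
const OVERFLOW_GROUP_ID = 'session-recordings-blob-overflow'

interface ConsumerRouting {
    topic: string
    consumerGroupId: string
    // Overflow *detection* only runs on the main consumer; the overflow
    // consumer never re-routes sessions, so it must not write to Redis.
    overflowDetectionEnabled: boolean
}

function resolveConsumerRouting(consumeOverflow: boolean, overflowEnabled: boolean): ConsumerRouting {
    return {
        topic: consumeOverflow ? OVERFLOW_TOPIC : MAIN_TOPIC,
        consumerGroupId: consumeOverflow ? OVERFLOW_GROUP_ID : MAIN_GROUP_ID,
        overflowDetectionEnabled: overflowEnabled && !consumeOverflow,
    }
}

// Example: the new `recordings-blob-ingestion-overflow` mode consumes the
// overflow topic under its own consumer group with detection disabled.
console.log(resolveConsumerRouting(true, true))
// => { topic: 'session_recording_snapshot_item_overflow',
//      consumerGroupId: 'session-recordings-blob-overflow',
//      overflowDetectionEnabled: false }
// ---------------------------------------------------------------------------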
captureException(err) @@ -259,8 +269,6 @@ export class SessionRecordingIngester { const { team_id, session_id } = event const key = `${team_id}-${session_id}` - // TODO: use this for session key too if it's safe to do so - const overflowKey = `${team_id}:${session_id}` const { partition, highOffset } = event.metadata const isDebug = this.debugPartition === partition @@ -285,11 +293,7 @@ export class SessionRecordingIngester { // Check that we are not below the high-water mark for this partition (another consumer may have flushed further than us when revoking) if ( - await this.persistentHighWaterMarker.isBelowHighWaterMark( - event.metadata, - KAFKA_CONSUMER_GROUP_ID, - highOffset - ) + await this.persistentHighWaterMarker.isBelowHighWaterMark(event.metadata, this.consumerGroupId, highOffset) ) { dropEvent('high_water_mark_partition') return @@ -318,7 +322,7 @@ export class SessionRecordingIngester { await Promise.allSettled([ this.sessions[key]?.add(event), - this.overflowDetection?.observe(overflowKey, event.metadata.rawSize, event.metadata.timestamp), + this.overflowDetection?.observe(session_id, event.metadata.rawSize, event.metadata.timestamp), ]) } @@ -486,8 +490,8 @@ export class SessionRecordingIngester { const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config) this.batchConsumer = await startBatchConsumer({ connectionConfig: replayClusterConnectionConfig, - groupId: KAFKA_CONSUMER_GROUP_ID, - topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + groupId: this.consumerGroupId, + topic: this.topic, autoCommit: false, sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS, maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, @@ -510,7 +514,7 @@ export class SessionRecordingIngester { debug: this.config.SESSION_RECORDING_KAFKA_DEBUG, }) - this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer)).length + this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer, this.topic)).length addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) @@ -820,7 +824,7 @@ export class SessionRecordingIngester { }) // Store the committed offset to the persistent store to avoid rebalance issues - await this.persistentHighWaterMarker.add(tp, KAFKA_CONSUMER_GROUP_ID, highestOffsetToCommit) + await this.persistentHighWaterMarker.add(tp, this.consumerGroupId, highestOffsetToCommit) // Clear all session offsets below the committed offset (as we know they have been flushed) await this.sessionHighWaterMarker.clear(tp, highestOffsetToCommit) gaugeOffsetCommitted.set({ partition }, highestOffsetToCommit) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 896c824ff8c8f..5774d233d95f0 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -4,7 +4,6 @@ import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartitio import path from 'path' import { Counter } from 'prom-client' -import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types' import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper' import { status } from '../../../utils/status' @@ -36,6 +35,7 @@ export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-f export const 
queryWatermarkOffsets = ( kafkaConsumer: KafkaConsumer | undefined, + topic: string, partition: number, timeout = 10000 ): Promise<[number, number]> => { @@ -44,20 +44,15 @@ export const queryWatermarkOffsets = ( return reject('Not connected') } - kafkaConsumer.queryWatermarkOffsets( - KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, - partition, - timeout, - (err, offsets) => { - if (err) { - captureException(err) - status.error('🔥', 'Failed to query kafka watermark offsets', err) - return reject(err) - } - - resolve([partition, offsets.highOffset]) + kafkaConsumer.queryWatermarkOffsets(topic, partition, timeout, (err, offsets) => { + if (err) { + captureException(err) + status.error('🔥', 'Failed to query kafka watermark offsets', err) + return reject(err) } - ) + + resolve([partition, offsets.highOffset]) + }) }) } @@ -89,7 +84,7 @@ export const queryCommittedOffsets = ( export const getPartitionsForTopic = ( kafkaConsumer: KafkaConsumer | undefined, - topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS + topic: string ): Promise => { return new Promise((resolve, reject) => { if (!kafkaConsumer) { diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 8c910e1857b06..bbb153b2ee631 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -109,6 +109,7 @@ export async function startPluginsServer( // meantime. let bufferConsumer: Consumer | undefined let stopSessionRecordingBlobConsumer: (() => void) | undefined + let stopSessionRecordingBlobOverflowConsumer: (() => void) | undefined let jobsConsumer: Consumer | undefined let schedulerTasksConsumer: Consumer | undefined @@ -151,6 +152,7 @@ export async function startPluginsServer( bufferConsumer?.disconnect(), jobsConsumer?.disconnect(), stopSessionRecordingBlobConsumer?.(), + stopSessionRecordingBlobOverflowConsumer?.(), schedulerTasksConsumer?.disconnect(), personOverridesPeriodicTask?.stop(), ]) @@ -446,7 +448,7 @@ export async function startPluginsServer( throw new Error("Can't start session recording blob ingestion without object storage") } // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas - const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, captureRedis) + const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, false, captureRedis) await ingester.start() const batchConsumer = ingester.batchConsumer @@ -458,6 +460,28 @@ export async function startPluginsServer( } } + if (capabilities.sessionRecordingBlobOverflowIngestion) { + const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig) + const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) + const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig) + + if (!s3) { + throw new Error("Can't start session recording blob ingestion without object storage") + } + // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas + // NOTE: We don't pass captureRedis to disable overflow computation on the overflow topic + const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, true, undefined) + await ingester.start() + + const batchConsumer = ingester.batchConsumer + + if (batchConsumer) { + stopSessionRecordingBlobOverflowConsumer = () => ingester.stop() + shutdownOnConsumerExit(batchConsumer) + healthChecks['session-recordings-blob-overflow'] = () => ingester.isHealthy() ?? 
false + } + } + if (capabilities.sessionRecordingV3Ingestion) { const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig) const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index db9350490bd70..82ff40eaaca9d 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -77,6 +77,7 @@ export enum PluginServerMode { scheduler = 'scheduler', analytics_ingestion = 'analytics-ingestion', recordings_blob_ingestion = 'recordings-blob-ingestion', + recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow', recordings_ingestion_v3 = 'recordings-ingestion-v3', person_overrides = 'person-overrides', } @@ -306,6 +307,7 @@ export interface PluginServerCapabilities { processAsyncOnEventHandlers?: boolean processAsyncWebhooksHandlers?: boolean sessionRecordingBlobIngestion?: boolean + sessionRecordingBlobOverflowIngestion?: boolean sessionRecordingV3Ingestion?: boolean personOverrides?: boolean appManagementSingleton?: boolean diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts index 79602befd50f3..94e10a98aca7d 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts @@ -1,6 +1,5 @@ import { Message } from 'node-rdkafka' -import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../../src/config/kafka-topics' import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/types' import jsonFullSnapshot from './data/snapshot-full.json' @@ -41,13 +40,14 @@ export function createIncomingRecordingMessage( } export function createKafkaMessage( + topic: string, token: number | string, messageOverrides: Partial = {}, eventProperties: Record = {} ): Message { return { partition: 1, - topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + topic, offset: 0, timestamp: messageOverrides.timestamp ?? 
Date.now(), size: 1, @@ -72,6 +72,6 @@ export function createKafkaMessage( } } -export function createTP(partition: number, topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS) { +export function createTP(partition: number, topic: string) { return { topic, partition } } diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts index efa50efb4833c..26a3af8896226 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts @@ -6,6 +6,7 @@ import path from 'path' import { waitForExpect } from '../../../../functional_tests/expectations' import { defaultConfig } from '../../../../src/config/config' +import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../../src/config/kafka-topics' import { SessionManagerBufferContext, SessionManagerContext, @@ -76,6 +77,7 @@ describe('ingester', () => { let teamToken = '' let mockOffsets: Record = {} let mockCommittedOffsets: Record = {} + const consumedTopic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS beforeAll(async () => { mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true }) @@ -120,7 +122,7 @@ describe('ingester', () => { ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) await ingester.start() - mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)]) + mockConsumer.assignments.mockImplementation(() => [createTP(0, consumedTopic), createTP(1, consumedTopic)]) }) afterEach(async () => { @@ -139,6 +141,7 @@ describe('ingester', () => { mockOffsets[partition]++ return createKafkaMessage( + consumedTopic, teamToken, { partition, @@ -223,7 +226,7 @@ describe('ingester', () => { describe('batch event processing', () => { it('should batch parse incoming events and batch them to reduce writes', async () => { - mockConsumer.assignments.mockImplementation(() => [createTP(1)]) + mockConsumer.assignments.mockImplementation(() => [createTP(1, consumedTopic)]) await ingester.handleEachBatch( [ createMessage('session_id_1', 1), @@ -279,7 +282,11 @@ describe('ingester', () => { const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] - mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) + mockConsumer.assignments.mockImplementation(() => [ + createTP(1, consumedTopic), + createTP(2, consumedTopic), + createTP(3, consumedTopic), + ]) await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) expect(getSessions(ingester)).toMatchObject([ @@ -291,12 +298,15 @@ describe('ingester', () => { // Call handleEachBatch with both consumers - we simulate the assignments which // is what is responsible for the actual syncing of the sessions - mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)]) + mockConsumer.assignments.mockImplementation(() => [ + createTP(2, consumedTopic), + createTP(3, consumedTopic), + ]) await otherIngester.handleEachBatch( [createMessage('session_id_4', 2), createMessage('session_id_5', 2)], noop ) - mockConsumer.assignments.mockImplementation(() => [createTP(1)]) + mockConsumer.assignments.mockImplementation(() => 
[createTP(1, consumedTopic)]) await ingester.handleEachBatch([createMessage('session_id_1', 1)], noop) // Should still have the partition 1 sessions that didnt move with added events @@ -317,8 +327,8 @@ describe('ingester', () => { // non-zero offset because the code can't commit offset 0 await ingester.handleEachBatch( [ - createKafkaMessage('invalid_token', { offset: 12 }), - createKafkaMessage('invalid_token', { offset: 13 }), + createKafkaMessage(consumedTopic, 'invalid_token', { offset: 12 }), + createKafkaMessage(consumedTopic, 'invalid_token', { offset: 13 }), ], noop ) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index 5c805078b2495..85de272e7ae34 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -64,7 +64,7 @@ jest.mock('../../../../src/kafka/batch-consumer', () => { jest.setTimeout(1000) -describe('ingester', () => { +describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOverflow) => { let ingester: SessionRecordingIngester let hub: Hub @@ -74,6 +74,9 @@ describe('ingester', () => { let mockOffsets: Record = {} let mockCommittedOffsets: Record = {} let redisConn: Redis + const consumedTopic = consumeOverflow + ? 'session_recording_snapshot_item_overflow_test' + : 'session_recording_snapshot_item_events_test' beforeAll(async () => { mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true }) @@ -114,10 +117,10 @@ describe('ingester', () => { await deleteKeysWithPrefix(hub) - ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, redisConn) + ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, consumeOverflow, redisConn) await ingester.start() - mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)]) + mockConsumer.assignments.mockImplementation(() => [createTP(0, consumedTopic), createTP(1, consumedTopic)]) }) afterEach(async () => { @@ -132,7 +135,6 @@ describe('ingester', () => { rmSync(config.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true }) jest.useRealTimers() }) - const commitAllOffsets = async () => { // Simulate a background refresh for testing await ingester.commitAllOffsets(ingester.partitionMetrics, Object.values(ingester.sessions)) @@ -143,6 +145,7 @@ describe('ingester', () => { mockOffsets[partition]++ return createKafkaMessage( + consumedTopic, teamToken, { partition, @@ -168,7 +171,13 @@ describe('ingester', () => { KAFKA_HOSTS: 'localhost:9092', } satisfies Partial as PluginsServerConfig - const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined) + const ingester = new SessionRecordingIngester( + config, + hub.postgres, + hub.objectStorage, + consumeOverflow, + undefined + ) expect(ingester['debugPartition']).toEqual(103) }) @@ -177,7 +186,13 @@ describe('ingester', () => { KAFKA_HOSTS: 'localhost:9092', } satisfies Partial as PluginsServerConfig - const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined) + const ingester = new SessionRecordingIngester( + config, + hub.postgres, + hub.objectStorage, + consumeOverflow, + undefined + ) expect(ingester['debugPartition']).toBeUndefined() }) @@ -251,6 
+266,7 @@ describe('ingester', () => { expect(mockConsumer.commit).toHaveBeenCalledTimes(1) expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, offset: 2 + 1, partition: 1, }) @@ -266,6 +282,7 @@ describe('ingester', () => { expect(mockConsumer.commit).toHaveBeenCalledTimes(1) expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, partition: 1, offset: 2, }) @@ -282,6 +299,7 @@ describe('ingester', () => { expect(mockConsumer.commit).toHaveBeenCalledTimes(2) expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, partition: 1, offset: 2 + 1, }) @@ -308,6 +326,7 @@ describe('ingester', () => { await commitAllOffsets() expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, partition: 1, offset: 4 + 1, }) @@ -334,6 +353,7 @@ describe('ingester', () => { // We should commit the offset of the blocking session expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, partition: 1, offset: ingester.sessions[`${team.id}-sid2`].getLowestOffset(), }) @@ -354,6 +374,7 @@ describe('ingester', () => { expect(mockConsumer.commit).toHaveBeenCalledTimes(1) expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, partition: 1, offset: 2, }) @@ -366,6 +387,7 @@ describe('ingester', () => { expect(mockConsumer.commit).toHaveBeenCalledTimes(2) expect(mockConsumer.commit).toHaveBeenCalledWith( expect.objectContaining({ + topic: consumedTopic, partition: 1, offset: 3, }) @@ -381,9 +403,9 @@ describe('ingester', () => { describe('watermarkers', () => { const getSessionWaterMarks = (partition = 1) => - ingester.sessionHighWaterMarker.getWaterMarks(createTP(partition)) + ingester.sessionHighWaterMarker.getWaterMarks(createTP(partition, consumedTopic)) const getPersistentWaterMarks = (partition = 1) => - ingester.persistentHighWaterMarker.getWaterMarks(createTP(partition)) + ingester.persistentHighWaterMarker.getWaterMarks(createTP(partition, consumedTopic)) it('should update session watermarkers with flushing', async () => { await ingester.handleEachBatch( @@ -410,11 +432,17 @@ describe('ingester', () => { // all replay events should be watermarked up until the 3rd message as they HAVE been processed // whereas the commited kafka offset should be the 1st message as the 2nd message HAS not been processed - await expect(getPersistentWaterMarks()).resolves.toEqual({ - 'session-recordings-blob': 1, + const expectedWaterMarks = { session_replay_console_logs_events_ingester: 3, session_replay_events_ingester: 3, - }) + } + if (consumeOverflow) { + expectedWaterMarks['session-recordings-blob-overflow'] = 1 + } else { + expectedWaterMarks['session-recordings-blob'] = 1 + } + await expect(getPersistentWaterMarks()).resolves.toEqual(expectedWaterMarks) + // sid1 should be watermarked up until the 3rd message as it HAS been processed await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 3 }) }) @@ -448,7 +476,13 @@ describe('ingester', () => { jest.setTimeout(5000) // Increased to cover lock delay beforeEach(async () => { - otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined) + otherIngester = new SessionRecordingIngester( + config, + hub.postgres, + hub.objectStorage, + consumeOverflow, + undefined + ) await otherIngester.start() }) @@ -463,14 +497,20 @@ describe('ingester', () => { 
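// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the diff): how the `describe.each` pattern used
// in this test file runs the same assertions against both topics. The
// `topicFor` helper and the literal "_test" topic names are illustrative; the
// real suite derives them from the config prefix/suffix.
// ---------------------------------------------------------------------------
const topicFor = (consumeOverflow: boolean): string =>
    consumeOverflow
        ? 'session_recording_snapshot_item_overflow_test'
        : 'session_recording_snapshot_item_events_test'

describe.each([[true], [false]])('topic routing with consumeOverflow=%p', (consumeOverflow) => {
    const consumedTopic = topicFor(consumeOverflow)

    it('builds topic-partitions against the consumed topic', () => {
        // Mirrors the updated createTP(partition, topic) fixture signature.
        const createTP = (partition: number, topic: string) => ({ topic, partition })
        expect(createTP(1, consumedTopic)).toEqual({ topic: consumedTopic, partition: 1 })
    })
})
// ---------------------------------------------------------------------------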
const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] - mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) + mockConsumer.assignments.mockImplementation(() => [ + createTP(1, consumedTopic), + createTP(2, consumedTopic), + createTP(3, consumedTopic), + ]) await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) expect( Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) ).toEqual(['1:session_id_1:1', '1:session_id_2:1', '2:session_id_3:1', '2:session_id_4:1']) - const rebalancePromises = [ingester.onRevokePartitions([createTP(2), createTP(3)])] + const rebalancePromises = [ + ingester.onRevokePartitions([createTP(2, consumedTopic), createTP(3, consumedTopic)]), + ] // Should immediately be removed from the tracked sessions expect( @@ -479,7 +519,10 @@ describe('ingester', () => { // Call the second ingester to receive the messages. The revocation should still be in progress meaning they are "paused" for a bit // Once the revocation is complete the second ingester should receive the messages but drop most of them as they got flushes by the revoke - mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)]) + mockConsumer.assignments.mockImplementation(() => [ + createTP(2, consumedTopic), + createTP(3, consumedTopic), + ]) await otherIngester.handleEachBatch([...partitionMsgs2, createMessage('session_id_4', 2)], noop) await Promise.all(rebalancePromises) @@ -509,7 +552,7 @@ describe('ingester', () => { expect.stringContaining(`${team.id}.sid3.`), // json ]) - const revokePromise = ingester.onRevokePartitions([createTP(1)]) + const revokePromise = ingester.onRevokePartitions([createTP(1, consumedTopic)]) expect(Object.keys(ingester.sessions)).toEqual([`${team.id}-sid3`]) @@ -524,6 +567,7 @@ describe('ingester', () => { expect(mockConsumer.commit).toHaveBeenCalledTimes(1) expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, offset: 2 + 1, partition: 1, }) @@ -536,75 +580,90 @@ describe('ingester', () => { // non-zero offset because the code can't commit offset 0 await ingester.handleEachBatch( [ - createKafkaMessage('invalid_token', { offset: 12 }), - createKafkaMessage('invalid_token', { offset: 13 }), + createKafkaMessage(consumedTopic, 'invalid_token', { offset: 12 }), + createKafkaMessage(consumedTopic, 'invalid_token', { offset: 13 }), ], noop ) expect(mockConsumer.commit).toHaveBeenCalledTimes(1) expect(mockConsumer.commit).toHaveBeenCalledWith({ + topic: consumedTopic, offset: 14, partition: 1, - topic: 'session_recording_snapshot_item_events_test', }) }) }) - describe('overflow detection', () => { - const ingestBurst = async (count: number, size_bytes: number, timestamp_delta: number) => { - const first_timestamp = Date.now() - 2 * timestamp_delta * count - - // Because messages from the same batch are reduced into a single one, we call handleEachBatch - // with individual messages to have better control on the message timestamp - for (let n = 0; n < count; n++) { - const message = createMessage('sid1', 1, { - size: size_bytes, - timestamp: first_timestamp + n * timestamp_delta, - }) - await ingester.handleEachBatch([message], noop) - } - } - - it('should not trigger overflow if under threshold', async () => { - await ingestBurst(10, 100, 10) - expect(await 
redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0) - }) - - it('should trigger overflow during bursts', async () => { - const expected_expiration = Math.floor(Date.now() / 1000) + 24 * 3600 // 24 hours from now, in seconds - await ingestBurst(10, 150_000, 10) - - expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1) - expect( - await redisConn.zrangebyscore( - CAPTURE_OVERFLOW_REDIS_KEY, - expected_expiration - 10, - expected_expiration + 10 - ) - ).toEqual([`${team.id}:sid1`]) - }) - - it('should not trigger overflow during backfills', async () => { - await ingestBurst(10, 150_000, 150_000) - expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0) - }) - - it('should cleanup older entries when triggering', async () => { - await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 7000, 'expired:session') - await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 1000, 'not_expired:session') - expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([ - 'expired:session', - 'not_expired:session', - ]) - - await ingestBurst(10, 150_000, 10) - expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1) - expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([ - 'not_expired:session', - `${team.id}:sid1`, - ]) - }) - }) + describe( + 'overflow detection', + consumeOverflow + ? () => {} // Skip these tests when running with consumeOverflow (it's disabled) + : () => { + const ingestBurst = async (count: number, size_bytes: number, timestamp_delta: number) => { + const first_timestamp = Date.now() - 2 * timestamp_delta * count + + // Because messages from the same batch are reduced into a single one, we call handleEachBatch + // with individual messages to have better control on the message timestamp + for (let n = 0; n < count; n++) { + const message = createMessage('sid1', 1, { + size: size_bytes, + timestamp: first_timestamp + n * timestamp_delta, + }) + await ingester.handleEachBatch([message], noop) + } + } + + it('should not trigger overflow if under threshold', async () => { + await ingestBurst(10, 100, 10) + expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0) + }) + + it('should trigger overflow during bursts', async () => { + const expected_expiration = Math.floor(Date.now() / 1000) + 24 * 3600 // 24 hours from now, in seconds + await ingestBurst(10, 150_000, 10) + + expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1) + expect( + await redisConn.zrangebyscore( + CAPTURE_OVERFLOW_REDIS_KEY, + expected_expiration - 10, + expected_expiration + 10 + ) + ).toEqual([`sid1`]) + }) + + it('should not trigger overflow during backfills', async () => { + await ingestBurst(10, 150_000, 150_000) + expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0) + }) + + it('should cleanup older entries when triggering', async () => { + await redisConn.zadd( + CAPTURE_OVERFLOW_REDIS_KEY, + 'NX', + Date.now() / 1000 - 7000, + 'expired:session' + ) + await redisConn.zadd( + CAPTURE_OVERFLOW_REDIS_KEY, + 'NX', + Date.now() / 1000 - 1000, + 'not_expired:session' + ) + expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([ + 'expired:session', + 'not_expired:session', + ]) + + await ingestBurst(10, 150_000, 10) + expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1) + expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([ + 'not_expired:session', + `sid1`, + ]) + }) + } + ) describe('lag 
reporting', () => { it('should return the latest offsets', async () => {
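// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the diff): what the overflow-detection tests
// above assert about the Redis sorted set, written as a standalone ioredis
// snippet. The key name matches CAPTURE_OVERFLOW_REDIS_KEY in the consumer;
// the `markOverflowed`/`currentlyOverflowedSessions` helpers and the 24 h TTL
// constant are illustrative assumptions, not code from this PR.
// ---------------------------------------------------------------------------
import Redis from 'ioredis'

const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'
const OVERFLOW_TTL_SECONDS = 24 * 3600

async function markOverflowed(redis: Redis, sessionId: string): Promise<void> {
    const nowSeconds = Math.floor(Date.now() / 1000)
    // Entries are scored by their expiry time, so cleanup is a range delete
    // and readers can ignore anything whose score is already in the past.
    await redis.zremrangebyscore(CAPTURE_OVERFLOW_REDIS_KEY, '-inf', nowSeconds)
    // After this PR the member is the bare session_id (no `${teamId}:` prefix).
    await redis.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', nowSeconds + OVERFLOW_TTL_SECONDS, sessionId)
}

async function currentlyOverflowedSessions(redis: Redis): Promise<string[]> {
    const nowSeconds = Math.floor(Date.now() / 1000)
    return redis.zrangebyscore(CAPTURE_OVERFLOW_REDIS_KEY, nowSeconds, '+inf')
}
// ---------------------------------------------------------------------------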