diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts
index 1c581451e44ec..a9381b88d8766 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts
@@ -3,11 +3,9 @@ import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-
 import { Counter } from 'prom-client'
 
 import { KAFKA_LOG_ENTRIES } from '../../../../config/kafka-topics'
-import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../../kafka/config'
 import { findOffsetsToCommit } from '../../../../kafka/consumer'
 import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
-import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
-import { PluginsServerConfig } from '../../../../types'
+import { flushProducer, produce } from '../../../../kafka/producer'
 import { status } from '../../../../utils/status'
 import { eventDroppedCounter } from '../../metrics'
 import { ConsoleLogEntry, gatherConsoleLogEvents, RRWebEventType } from '../process-event'
@@ -42,10 +40,8 @@ function deduplicateConsoleLogEvents(consoleLogEntries: ConsoleLogEntry[]): Cons
 // TODO this is an almost exact duplicate of the replay events ingester
 // am going to leave this duplication and then collapse it when/if we add a performance events ingester
 export class ConsoleLogsIngester {
-    producer?: RdKafkaProducer
-
     constructor(
-        private readonly serverConfig: PluginsServerConfig,
+        private readonly producer: RdKafkaProducer,
         private readonly persistentHighWaterMarker?: OffsetHighWaterMarker
     ) {}
 
@@ -175,22 +171,4 @@ export class ConsoleLogsIngester {
             })
         }
     }
-
-    public async start(): Promise<void> {
-        const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
-
-        const producerConfig = createRdProducerConfigFromEnvVars(this.serverConfig)
-
-        this.producer = await createKafkaProducer(connectionConfig, producerConfig)
-        this.producer.connect()
-    }
-
-    public async stop(): Promise<void> {
-        status.info('🔁', '[console-log-events-ingester] stopping')
-
-        if (this.producer && this.producer.isConnected()) {
-            status.info('🔁', '[console-log-events-ingester] disconnecting kafka producer in batchConsumer stop')
-            await disconnectProducer(this.producer)
-        }
-    }
 }
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts
index 029f28f20bb9a..669a8edc72a90 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts
@@ -5,11 +5,9 @@ import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-
 import { Counter } from 'prom-client'
 
 import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS } from '../../../../config/kafka-topics'
-import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../../kafka/config'
 import { findOffsetsToCommit } from '../../../../kafka/consumer'
 import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
-import { createKafkaProducer, disconnectProducer, flushProducer, produce } from 
'../../../../kafka/producer'
-import { PluginsServerConfig } from '../../../../types'
+import { flushProducer, produce } from '../../../../kafka/producer'
 import { KafkaProducerWrapper } from '../../../../utils/db/kafka-producer-wrapper'
 import { status } from '../../../../utils/status'
 import { captureIngestionWarning } from '../../../../worker/ingestion/utils'
@@ -26,10 +24,8 @@ const replayEventsCounter = new Counter({
 })
 
 export class ReplayEventsIngester {
-    producer?: RdKafkaProducer
-
     constructor(
-        private readonly serverConfig: PluginsServerConfig,
+        private readonly producer: RdKafkaProducer,
         private readonly persistentHighWaterMarker?: OffsetHighWaterMarker
     ) {}
 
@@ -180,19 +176,4 @@
         })
        }
    }
-    public async start(): Promise<void> {
-        const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
-        const producerConfig = createRdProducerConfigFromEnvVars(this.serverConfig)
-        this.producer = await createKafkaProducer(connectionConfig, producerConfig)
-        this.producer.connect()
-    }
-
-    public async stop(): Promise<void> {
-        status.info('🔁', '[replay-events] stopping')
-
-        if (this.producer && this.producer.isConnected()) {
-            status.info('🔁', '[replay-events] disconnecting kafka producer in batchConsumer stop')
-            await disconnectProducer(this.producer)
-        }
-    }
 }
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts
index b161a1990ec4b..e4ce7bd00d934 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts
@@ -8,9 +8,11 @@ import { Counter, Gauge, Histogram } from 'prom-client'
 import { sessionRecordingConsumerConfig } from '../../../config/config'
 import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
 import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
-import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
+import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config'
+import { createKafkaProducer } from '../../../kafka/producer'
 import { PluginsServerConfig, TeamId } from '../../../types'
 import { BackgroundRefresher } from '../../../utils/background-refresher'
+import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'
 import { PostgresRouter } from '../../../utils/db/postgres'
 import { status } from '../../../utils/status'
 import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
@@ -81,6 +83,7 @@ export class SessionRecordingIngesterV3 {
     // if ingestion is lagging on a single partition it is often hard to identify _why_,
     // this allows us to output more information for that partition
     private debugPartition: number | undefined = undefined
+    private sharedClusterProducerWrapper: KafkaProducerWrapper | undefined
 
     constructor(
         private globalServerConfig: PluginsServerConfig,
@@ -273,15 +276,22 @@
         // Load teams into memory
         await this.teamsRefresher.refresh()
 
+        // NOTE: This is the only place where we need to use the shared server config
+        const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.globalServerConfig)
+        const globalProducerConfig = createRdProducerConfigFromEnvVars(this.globalServerConfig)
+
+        
this.sharedClusterProducerWrapper = new KafkaProducerWrapper( + await createKafkaProducer(globalConnectionConfig, globalProducerConfig) + ) + this.sharedClusterProducerWrapper.producer.connect() + // NOTE: This is the only place where we need to use the shared server config if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) { - this.consoleLogsIngester = new ConsoleLogsIngester(this.globalServerConfig) - await this.consoleLogsIngester.start() + this.consoleLogsIngester = new ConsoleLogsIngester(this.sharedClusterProducerWrapper.producer) } if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) { - this.replayEventsIngester = new ReplayEventsIngester(this.globalServerConfig) - await this.replayEventsIngester.start() + this.replayEventsIngester = new ReplayEventsIngester(this.sharedClusterProducerWrapper.producer) } const connectionConfig = createRdConnectionConfigFromEnvVars(this.config) @@ -340,15 +350,12 @@ export class SessionRecordingIngesterV3 { ) ) - if (this.replayEventsIngester) { - void this.scheduleWork(this.replayEventsIngester.stop()) - } - if (this.consoleLogsIngester) { - void this.scheduleWork(this.consoleLogsIngester!.stop()) - } - const promiseResults = await Promise.allSettled(this.promises) + if (this.sharedClusterProducerWrapper) { + await this.sharedClusterProducerWrapper.disconnect() + } + status.info('👍', 'session-replay-ingestion - stopped!') return promiseResults diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 2e84d7826c002..8c37c1c19de08 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -8,9 +8,11 @@ import { Counter, Gauge, Histogram } from 'prom-client' import { sessionRecordingConsumerConfig } from '../../../config/config' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' -import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' +import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config' +import { createKafkaProducer } from '../../../kafka/producer' import { PluginsServerConfig, RedisPool, TeamId } from '../../../types' import { BackgroundRefresher } from '../../../utils/background-refresher' +import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper' import { PostgresRouter } from '../../../utils/db/postgres' import { status } from '../../../utils/status' import { createRedisPool } from '../../../utils/utils' @@ -148,6 +150,8 @@ export class SessionRecordingIngester { // this allows us to output more information for that partition private debugPartition: number | undefined = undefined + private sharedClusterProducerWrapper: KafkaProducerWrapper | undefined = undefined + constructor( private globalServerConfig: PluginsServerConfig, private postgres: PostgresRouter, @@ -447,17 +451,26 @@ export class SessionRecordingIngester { await this.teamsRefresher.refresh() // NOTE: This is the only place where we need to use the shared server config + const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.globalServerConfig) + const globalProducerConfig = createRdProducerConfigFromEnvVars(this.globalServerConfig) + + 
this.sharedClusterProducerWrapper = new KafkaProducerWrapper( + await createKafkaProducer(globalConnectionConfig, globalProducerConfig) + ) + this.sharedClusterProducerWrapper.producer.connect() + if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) { - this.consoleLogsIngester = new ConsoleLogsIngester(this.globalServerConfig, this.persistentHighWaterMarker) - await this.consoleLogsIngester.start() + this.consoleLogsIngester = new ConsoleLogsIngester( + this.sharedClusterProducerWrapper.producer, + this.persistentHighWaterMarker + ) } if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) { this.replayEventsIngester = new ReplayEventsIngester( - this.globalServerConfig, + this.sharedClusterProducerWrapper.producer, this.persistentHighWaterMarker ) - await this.replayEventsIngester.start() } const connectionConfig = createRdConnectionConfigFromEnvVars(this.config) @@ -541,15 +554,12 @@ export class SessionRecordingIngester { void this.scheduleWork(this.onRevokePartitions(assignedPartitions)) void this.scheduleWork(this.realtimeManager.unsubscribe()) - if (this.replayEventsIngester) { - void this.scheduleWork(this.replayEventsIngester.stop()) - } - if (this.consoleLogsIngester) { - void this.scheduleWork(this.consoleLogsIngester.stop()) - } - const promiseResults = await Promise.allSettled(this.promises) + if (this.sharedClusterProducerWrapper) { + await this.sharedClusterProducerWrapper.disconnect() + } + // Finally we clear up redis once we are sure everything else has been handled await this.redisPool.drain() await this.redisPool.clear() diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts index 6698b40a8ca6a..8a62f36cb6aa3 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts @@ -1,11 +1,9 @@ import { HighLevelProducer } from 'node-rdkafka' -import { defaultConfig } from '../../../../../src/config/config' -import { createKafkaProducer, produce } from '../../../../../src/kafka/producer' +import { produce } from '../../../../../src/kafka/producer' import { ConsoleLogsIngester } from '../../../../../src/main/ingestion-queues/session-recording/services/console-logs-ingester' import { OffsetHighWaterMarker } from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker' import { IncomingRecordingMessage } from '../../../../../src/main/ingestion-queues/session-recording/types' -import { PluginsServerConfig } from '../../../../../src/types' import { status } from '../../../../../src/utils/status' jest.mock('../../../../../src/utils/status') @@ -26,6 +24,7 @@ const makeIncomingMessage = ( topic: 'topic', timestamp: 0, consoleLogIngestionEnabled, + rawSize: 0, }, session_id: '', team_id: 0, @@ -37,17 +36,15 @@ describe('console log ingester', () => { let consoleLogIngester: ConsoleLogsIngester const mockProducer: jest.Mock = jest.fn() - beforeEach(async () => { + beforeEach(() => { mockProducer.mockClear() mockProducer['connect'] = jest.fn() - jest.mocked(createKafkaProducer).mockImplementation(() => - Promise.resolve(mockProducer as unknown as HighLevelProducer) - ) - const mockedHighWaterMarker = { isBelowHighWaterMark: jest.fn() } as unknown as OffsetHighWaterMarker - consoleLogIngester = new ConsoleLogsIngester({ 
...defaultConfig } as PluginsServerConfig, mockedHighWaterMarker) - await consoleLogIngester.start() + consoleLogIngester = new ConsoleLogsIngester( + mockProducer as unknown as HighLevelProducer, + mockedHighWaterMarker + ) }) describe('when enabled on team', () => { test('it truncates large console logs', async () => { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts index af798504406e6..21418a7088231 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts @@ -1,12 +1,11 @@ import { DateTime } from 'luxon' import { HighLevelProducer } from 'node-rdkafka' -import { defaultConfig } from '../../../../../src/config/config' -import { createKafkaProducer, produce } from '../../../../../src/kafka/producer' +import { produce } from '../../../../../src/kafka/producer' import { OffsetHighWaterMarker } from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker' import { ReplayEventsIngester } from '../../../../../src/main/ingestion-queues/session-recording/services/replay-events-ingester' import { IncomingRecordingMessage } from '../../../../../src/main/ingestion-queues/session-recording/types' -import { PluginsServerConfig, TimestampFormat } from '../../../../../src/types' +import { TimestampFormat } from '../../../../../src/types' import { status } from '../../../../../src/utils/status' import { castTimestampOrNow } from '../../../../../src/utils/utils' @@ -25,6 +24,7 @@ const makeIncomingMessage = (source: string | null, timestamp: number): Incoming topic: 'topic', timestamp: timestamp, consoleLogIngestionEnabled: true, + rawSize: 0, }, session_id: '', team_id: 0, @@ -36,17 +36,12 @@ describe('replay events ingester', () => { let ingester: ReplayEventsIngester const mockProducer: jest.Mock = jest.fn() - beforeEach(async () => { + beforeEach(() => { mockProducer.mockClear() mockProducer['connect'] = jest.fn() - jest.mocked(createKafkaProducer).mockImplementation(() => - Promise.resolve(mockProducer as unknown as HighLevelProducer) - ) - const mockedHighWaterMarker = { isBelowHighWaterMark: jest.fn() } as unknown as OffsetHighWaterMarker - ingester = new ReplayEventsIngester({ ...defaultConfig } as PluginsServerConfig, mockedHighWaterMarker) - await ingester.start() + ingester = new ReplayEventsIngester(mockProducer as unknown as HighLevelProducer, mockedHighWaterMarker) }) test('does not ingest messages from a month in the future', async () => { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts index d0c5d66ff8ec1..efa50efb4833c 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts @@ -126,7 +126,6 @@ describe('ingester', () => { afterEach(async () => { jest.setTimeout(10000) await deleteKeysWithPrefix(hub) - await ingester.stop() await closeHub() }) @@ -151,203 +150,210 @@ describe('ingester', () => { ) } - it('can parse debug partition config', () => { - const config = { - SESSION_RECORDING_DEBUG_PARTITION: 
'103', - KAFKA_HOSTS: 'localhost:9092', - } satisfies Partial as PluginsServerConfig + // disconnecting a producer is not safe to call multiple times + // in order to let us test stopping the ingester elsewhere + // in most tests we automatically stop the ingester during teardown + describe('when ingester.stop is called in teardown', () => { + afterEach(async () => { + await ingester.stop() + }) - const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) - expect(ingester['debugPartition']).toEqual(103) - }) + it('can parse debug partition config', () => { + const config = { + SESSION_RECORDING_DEBUG_PARTITION: '103', + KAFKA_HOSTS: 'localhost:9092', + } satisfies Partial as PluginsServerConfig - it('can parse absence of debug partition config', () => { - const config = { - KAFKA_HOSTS: 'localhost:9092', - } satisfies Partial as PluginsServerConfig + const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) + expect(ingester['debugPartition']).toEqual(103) + }) - const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) - expect(ingester['debugPartition']).toBeUndefined() - }) + it('can parse absence of debug partition config', () => { + const config = { + KAFKA_HOSTS: 'localhost:9092', + } satisfies Partial as PluginsServerConfig - it('creates a new session manager if needed', async () => { - const event = createIncomingRecordingMessage() - await ingester.consume(event) - await waitForExpect(() => { - expect(Object.keys(ingester.sessions).length).toBe(1) - expect(ingester.sessions['1__session_id_1']).toBeDefined() + const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) + expect(ingester['debugPartition']).toBeUndefined() }) - }) - it('handles multiple incoming sessions', async () => { - const event = createIncomingRecordingMessage() - const event2 = createIncomingRecordingMessage({ - session_id: 'session_id_2', + it('creates a new session manager if needed', async () => { + const event = createIncomingRecordingMessage() + await ingester.consume(event) + await waitForExpect(() => { + expect(Object.keys(ingester.sessions).length).toBe(1) + expect(ingester.sessions['1__session_id_1']).toBeDefined() + }) }) - await Promise.all([ingester.consume(event), ingester.consume(event2)]) - expect(Object.keys(ingester.sessions).length).toBe(2) - expect(ingester.sessions['1__session_id_1']).toBeDefined() - expect(ingester.sessions['1__session_id_2']).toBeDefined() - }) - it('handles parallel ingestion of the same session', async () => { - const event = createIncomingRecordingMessage() - const event2 = createIncomingRecordingMessage() - await Promise.all([ingester.consume(event), ingester.consume(event2)]) - expect(Object.keys(ingester.sessions).length).toBe(1) - expect(ingester.sessions['1__session_id_1']).toBeDefined() - }) + it('handles multiple incoming sessions', async () => { + const event = createIncomingRecordingMessage() + const event2 = createIncomingRecordingMessage({ + session_id: 'session_id_2', + }) + await Promise.all([ingester.consume(event), ingester.consume(event2)]) + expect(Object.keys(ingester.sessions).length).toBe(2) + expect(ingester.sessions['1__session_id_1']).toBeDefined() + expect(ingester.sessions['1__session_id_2']).toBeDefined() + }) - it('destroys a session manager if finished', async () => { - const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}` - const event = createIncomingRecordingMessage({ - session_id: sessionId, + 
it('handles parallel ingestion of the same session', async () => { + const event = createIncomingRecordingMessage() + const event2 = createIncomingRecordingMessage() + await Promise.all([ingester.consume(event), ingester.consume(event2)]) + expect(Object.keys(ingester.sessions).length).toBe(1) + expect(ingester.sessions['1__session_id_1']).toBeDefined() }) - await ingester.consume(event) - expect(ingester.sessions[`1__${sessionId}`]).toBeDefined() - ingester.sessions[`1__${sessionId}`].buffer!.createdAt = 0 - await ingester.flushAllReadySessions(() => undefined) + it('destroys a session manager if finished', async () => { + const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}` + const event = createIncomingRecordingMessage({ + session_id: sessionId, + }) + await ingester.consume(event) + expect(ingester.sessions[`1__${sessionId}`]).toBeDefined() + ingester.sessions[`1__${sessionId}`].buffer!.createdAt = 0 - await waitForExpect(() => { - expect(ingester.sessions[`1__${sessionId}`]).not.toBeDefined() - }, 10000) - }) + await ingester.flushAllReadySessions(() => undefined) - describe('batch event processing', () => { - it('should batch parse incoming events and batch them to reduce writes', async () => { - mockConsumer.assignments.mockImplementation(() => [createTP(1)]) - await ingester.handleEachBatch( - [ - createMessage('session_id_1', 1), - createMessage('session_id_1', 1), - createMessage('session_id_1', 1), - createMessage('session_id_2', 1), - ], - noop - ) - - expect(ingester.sessions[`${team.id}__session_id_1`].buffer?.count).toBe(1) - expect(ingester.sessions[`${team.id}__session_id_2`].buffer?.count).toBe(1) - - let fileContents = await fs.readFile( - path.join(ingester.sessions[`${team.id}__session_id_1`].context.dir, 'buffer.jsonl'), - 'utf-8' - ) - - expect(JSON.parse(fileContents).data).toHaveLength(3) - - fileContents = await fs.readFile( - path.join(ingester.sessions[`${team.id}__session_id_2`].context.dir, 'buffer.jsonl'), - 'utf-8' - ) - - expect(JSON.parse(fileContents).data).toHaveLength(1) + await waitForExpect(() => { + expect(ingester.sessions[`1__${sessionId}`]).not.toBeDefined() + }, 10000) }) - }) - - describe('simulated rebalancing', () => { - let otherIngester: SessionRecordingIngesterV3 - jest.setTimeout(5000) // Increased to cover lock delay - beforeEach(async () => { - otherIngester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) - await otherIngester.start() + describe('batch event processing', () => { + it('should batch parse incoming events and batch them to reduce writes', async () => { + mockConsumer.assignments.mockImplementation(() => [createTP(1)]) + await ingester.handleEachBatch( + [ + createMessage('session_id_1', 1), + createMessage('session_id_1', 1), + createMessage('session_id_1', 1), + createMessage('session_id_2', 1), + ], + noop + ) + + expect(ingester.sessions[`${team.id}__session_id_1`].buffer?.count).toBe(1) + expect(ingester.sessions[`${team.id}__session_id_2`].buffer?.count).toBe(1) + + let fileContents = await fs.readFile( + path.join(ingester.sessions[`${team.id}__session_id_1`].context.dir, 'buffer.jsonl'), + 'utf-8' + ) + + expect(JSON.parse(fileContents).data).toHaveLength(3) + + fileContents = await fs.readFile( + path.join(ingester.sessions[`${team.id}__session_id_2`].context.dir, 'buffer.jsonl'), + 'utf-8' + ) + + expect(JSON.parse(fileContents).data).toHaveLength(1) + }) }) - afterEach(async () => { - await otherIngester.stop() + describe('simulated rebalancing', () => { + let 
otherIngester: SessionRecordingIngesterV3 + jest.setTimeout(5000) // Increased to cover lock delay + + beforeEach(async () => { + otherIngester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) + await otherIngester.start() + }) + + afterEach(async () => { + await otherIngester.stop() + }) + + const getSessions = ( + ingester: SessionRecordingIngesterV3 + ): (SessionManagerContext & SessionManagerBufferContext)[] => + Object.values(ingester.sessions).map((x) => ({ ...x.context, ...x.buffer! })) + + /** + * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs + * Simulates the rebalance and tests that the handled sessions are successfully dropped and picked up + */ + it('rebalances new consumers', async () => { + const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] + + mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) + await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) + + expect(getSessions(ingester)).toMatchObject([ + { sessionId: 'session_id_1', partition: 1, count: 1 }, + { sessionId: 'session_id_2', partition: 1, count: 1 }, + { sessionId: 'session_id_3', partition: 2, count: 1 }, + { sessionId: 'session_id_4', partition: 2, count: 1 }, + ]) + + // Call handleEachBatch with both consumers - we simulate the assignments which + // is what is responsible for the actual syncing of the sessions + mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)]) + await otherIngester.handleEachBatch( + [createMessage('session_id_4', 2), createMessage('session_id_5', 2)], + noop + ) + mockConsumer.assignments.mockImplementation(() => [createTP(1)]) + await ingester.handleEachBatch([createMessage('session_id_1', 1)], noop) + + // Should still have the partition 1 sessions that didnt move with added events + expect(getSessions(ingester)).toMatchObject([ + { sessionId: 'session_id_1', partition: 1, count: 2 }, + { sessionId: 'session_id_2', partition: 1, count: 1 }, + ]) + expect(getSessions(otherIngester)).toMatchObject([ + { sessionId: 'session_id_3', partition: 2, count: 1 }, + { sessionId: 'session_id_4', partition: 2, count: 2 }, + { sessionId: 'session_id_5', partition: 2, count: 1 }, + ]) + }) }) - const getSessions = ( - ingester: SessionRecordingIngesterV3 - ): (SessionManagerContext & SessionManagerBufferContext)[] => - Object.values(ingester.sessions).map((x) => ({ ...x.context, ...x.buffer! 
})) - - /** - * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs - * Simulates the rebalance and tests that the handled sessions are successfully dropped and picked up - */ - it('rebalances new consumers', async () => { - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] - - mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) - await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) - - expect(getSessions(ingester)).toMatchObject([ - { sessionId: 'session_id_1', partition: 1, count: 1 }, - { sessionId: 'session_id_2', partition: 1, count: 1 }, - { sessionId: 'session_id_3', partition: 2, count: 1 }, - { sessionId: 'session_id_4', partition: 2, count: 1 }, - ]) - - // Call handleEachBatch with both consumers - we simulate the assignments which - // is what is responsible for the actual syncing of the sessions - mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)]) - await otherIngester.handleEachBatch( - [createMessage('session_id_4', 2), createMessage('session_id_5', 2)], - noop - ) - mockConsumer.assignments.mockImplementation(() => [createTP(1)]) - await ingester.handleEachBatch([createMessage('session_id_1', 1)], noop) - - // Should still have the partition 1 sessions that didnt move with added events - expect(getSessions(ingester)).toMatchObject([ - { sessionId: 'session_id_1', partition: 1, count: 2 }, - { sessionId: 'session_id_2', partition: 1, count: 1 }, - ]) - expect(getSessions(otherIngester)).toMatchObject([ - { sessionId: 'session_id_3', partition: 2, count: 1 }, - { sessionId: 'session_id_4', partition: 2, count: 2 }, - { sessionId: 'session_id_5', partition: 2, count: 1 }, - ]) + describe('when a team is disabled', () => { + it('ignores invalid teams', async () => { + // non-zero offset because the code can't commit offset 0 + await ingester.handleEachBatch( + [ + createKafkaMessage('invalid_token', { offset: 12 }), + createKafkaMessage('invalid_token', { offset: 13 }), + ], + noop + ) + + expect(ingester.sessions).toEqual({}) + }) }) - }) - describe('stop()', () => { - const setup = async (): Promise => { - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - await ingester.handleEachBatch(partitionMsgs1, noop) - } + describe('heartbeats', () => { + it('it should send them whilst processing', async () => { + const heartbeat = jest.fn() + // non-zero offset because the code can't commit offset 0 + const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + await ingester.handleEachBatch(partitionMsgs1, heartbeat) - it('shuts down without error', async () => { - await setup() - - await expect(ingester.stop()).resolves.toMatchObject([ - // destroy sessions, - { status: 'fulfilled' }, - // stop replay ingester - { status: 'fulfilled' }, - // stop console ingester - { status: 'fulfilled' }, - ]) + expect(heartbeat).toBeCalledTimes(5) + }) }) }) - describe('when a team is disabled', () => { - it('ignores invalid teams', async () => { - // non-zero offset because the code can't commit offset 0 - await ingester.handleEachBatch( - [ - createKafkaMessage('invalid_token', { offset: 12 }), - createKafkaMessage('invalid_token', { offset: 13 }), - ], - noop - ) - - expect(ingester.sessions).toEqual({}) - 
}) - }) + describe('when ingester.stop is not called in teardown', () => { + describe('stop()', () => { + const setup = async (): Promise => { + const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + await ingester.handleEachBatch(partitionMsgs1, noop) + } - describe('heartbeats', () => { - it('it should send them whilst processing', async () => { - const heartbeat = jest.fn() - // non-zero offset because the code can't commit offset 0 - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - await ingester.handleEachBatch(partitionMsgs1, heartbeat) + it('shuts down without error', async () => { + await setup() - expect(heartbeat).toBeCalledTimes(5) + await expect(ingester.stop()).resolves.toMatchObject([ + // destroy sessions, + { status: 'fulfilled' }, + ]) + }) }) }) }) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index 730fe28f481ac..5c805078b2495 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -125,7 +125,6 @@ describe('ingester', () => { await redisConn.del(CAPTURE_OVERFLOW_REDIS_KEY) await hub.redisPool.release(redisConn) await deleteKeysWithPrefix(hub) - await ingester.stop() await closeHub() }) @@ -156,503 +155,510 @@ describe('ingester', () => { ) } - it('can parse debug partition config', () => { - const config = { - SESSION_RECORDING_DEBUG_PARTITION: '103', - KAFKA_HOSTS: 'localhost:9092', - } satisfies Partial as PluginsServerConfig - - const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined) - expect(ingester['debugPartition']).toEqual(103) - }) - - it('can parse absence of debug partition config', () => { - const config = { - KAFKA_HOSTS: 'localhost:9092', - } satisfies Partial as PluginsServerConfig + // disconnecting a producer is not safe to call multiple times + // in order to let us test stopping the ingester elsewhere + // in most tests we automatically stop the ingester during teardown + describe('when ingester.stop is called in teardown', () => { + afterEach(async () => { + await ingester.stop() + }) + it('can parse debug partition config', () => { + const config = { + SESSION_RECORDING_DEBUG_PARTITION: '103', + KAFKA_HOSTS: 'localhost:9092', + } satisfies Partial as PluginsServerConfig + + const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined) + expect(ingester['debugPartition']).toEqual(103) + }) - const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined) - expect(ingester['debugPartition']).toBeUndefined() - }) + it('can parse absence of debug partition config', () => { + const config = { + KAFKA_HOSTS: 'localhost:9092', + } satisfies Partial as PluginsServerConfig - it('creates a new session manager if needed', async () => { - const event = createIncomingRecordingMessage() - await ingester.consume(event) - await waitForExpect(() => { - expect(Object.keys(ingester.sessions).length).toBe(1) - expect(ingester.sessions['1-session_id_1']).toBeDefined() + const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined) + expect(ingester['debugPartition']).toBeUndefined() }) - }) - it('removes sessions on destroy', 
async () => { - await ingester.consume(createIncomingRecordingMessage({ team_id: 2, session_id: 'session_id_1' })) - await ingester.consume(createIncomingRecordingMessage({ team_id: 2, session_id: 'session_id_2' })) + it('creates a new session manager if needed', async () => { + const event = createIncomingRecordingMessage() + await ingester.consume(event) + await waitForExpect(() => { + expect(Object.keys(ingester.sessions).length).toBe(1) + expect(ingester.sessions['1-session_id_1']).toBeDefined() + }) + }) - expect(Object.keys(ingester.sessions).length).toBe(2) - expect(ingester.sessions['2-session_id_1']).toBeDefined() - expect(ingester.sessions['2-session_id_2']).toBeDefined() + it('removes sessions on destroy', async () => { + await ingester.consume(createIncomingRecordingMessage({ team_id: 2, session_id: 'session_id_1' })) + await ingester.consume(createIncomingRecordingMessage({ team_id: 2, session_id: 'session_id_2' })) - await ingester.destroySessions([['2-session_id_1', ingester.sessions['2-session_id_1']]]) + expect(Object.keys(ingester.sessions).length).toBe(2) + expect(ingester.sessions['2-session_id_1']).toBeDefined() + expect(ingester.sessions['2-session_id_2']).toBeDefined() - expect(Object.keys(ingester.sessions).length).toBe(1) - expect(ingester.sessions['2-session_id_2']).toBeDefined() - }) + await ingester.destroySessions([['2-session_id_1', ingester.sessions['2-session_id_1']]]) - it('handles multiple incoming sessions', async () => { - const event = createIncomingRecordingMessage() - const event2 = createIncomingRecordingMessage({ - session_id: 'session_id_2', + expect(Object.keys(ingester.sessions).length).toBe(1) + expect(ingester.sessions['2-session_id_2']).toBeDefined() }) - await Promise.all([ingester.consume(event), ingester.consume(event2)]) - expect(Object.keys(ingester.sessions).length).toBe(2) - expect(ingester.sessions['1-session_id_1']).toBeDefined() - expect(ingester.sessions['1-session_id_2']).toBeDefined() - }) - // This test is flaky and no-one has time to look into it https://posthog.slack.com/archives/C0460HY55M0/p1696437876690329 - it.skip('destroys a session manager if finished', async () => { - const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}` - const event = createIncomingRecordingMessage({ - session_id: sessionId, + it('handles multiple incoming sessions', async () => { + const event = createIncomingRecordingMessage() + const event2 = createIncomingRecordingMessage({ + session_id: 'session_id_2', + }) + await Promise.all([ingester.consume(event), ingester.consume(event2)]) + expect(Object.keys(ingester.sessions).length).toBe(2) + expect(ingester.sessions['1-session_id_1']).toBeDefined() + expect(ingester.sessions['1-session_id_2']).toBeDefined() }) - await ingester.consume(event) - expect(ingester.sessions[`1-${sessionId}`]).toBeDefined() - // Force the flush - ingester.partitionMetrics[event.metadata.partition] = { - lastMessageTimestamp: Date.now() + defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS, - } - - await ingester.flushAllReadySessions(noop) - - await waitForExpect(() => { - expect(ingester.sessions[`1-${sessionId}`]).not.toBeDefined() - }, 10000) - }) - describe('offset committing', () => { - it('should commit offsets in simple cases', async () => { - await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid1')], noop) - expect(ingester.partitionMetrics[1]).toMatchObject({ - lastMessageOffset: 2, + // This test is flaky and no-one has time to look into it 
https://posthog.slack.com/archives/C0460HY55M0/p1696437876690329 + it.skip('destroys a session manager if finished', async () => { + const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}` + const event = createIncomingRecordingMessage({ + session_id: sessionId, }) + await ingester.consume(event) + expect(ingester.sessions[`1-${sessionId}`]).toBeDefined() + // Force the flush + ingester.partitionMetrics[event.metadata.partition] = { + lastMessageTimestamp: Date.now() + defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS, + } - await commitAllOffsets() - // Doesn't flush if we have a blocking session - expect(mockConsumer.commit).toHaveBeenCalledTimes(0) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await commitAllOffsets() + await ingester.flushAllReadySessions(noop) - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - offset: 2 + 1, - partition: 1, - }) - ) + await waitForExpect(() => { + expect(ingester.sessions[`1-${sessionId}`]).not.toBeDefined() + }, 10000) }) - it.skip('should commit higher values but not lower', async () => { - await ingester.handleEachBatch([createMessage('sid1')], noop) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - expect(ingester.partitionMetrics[1].lastMessageOffset).toBe(1) - await commitAllOffsets() - - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - partition: 1, - offset: 2, + describe('offset committing', () => { + it('should commit offsets in simple cases', async () => { + await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid1')], noop) + expect(ingester.partitionMetrics[1]).toMatchObject({ + lastMessageOffset: 2, }) - ) - // Repeat commit doesn't do anything - await commitAllOffsets() - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + await commitAllOffsets() + // Doesn't flush if we have a blocking session + expect(mockConsumer.commit).toHaveBeenCalledTimes(0) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() + + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + offset: 2 + 1, + partition: 1, + }) + ) + }) - await ingester.handleEachBatch([createMessage('sid1')], noop) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await commitAllOffsets() + it.skip('should commit higher values but not lower', async () => { + await ingester.handleEachBatch([createMessage('sid1')], noop) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + expect(ingester.partitionMetrics[1].lastMessageOffset).toBe(1) + await commitAllOffsets() + + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 2, + }) + ) - expect(mockConsumer.commit).toHaveBeenCalledTimes(2) - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - partition: 1, - offset: 2 + 1, - }) - ) - }) + // Repeat commit doesn't do anything + await commitAllOffsets() + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - it('should commit the lowest known offset if there is a blocking session', async () => { - await ingester.handleEachBatch( - [createMessage('sid1'), createMessage('sid2'), createMessage('sid2'), createMessage('sid2')], - noop - ) - await 
ingester.sessions[`${team.id}-sid2`].flush('buffer_age') - await commitAllOffsets() + await ingester.handleEachBatch([createMessage('sid1')], noop) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() - expect(ingester.partitionMetrics[1]).toMatchObject({ - lastMessageOffset: 4, + expect(mockConsumer.commit).toHaveBeenCalledTimes(2) + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 2 + 1, + }) + ) }) - // No offsets are below the blocking one - expect(mockConsumer.commit).not.toHaveBeenCalled() - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + it('should commit the lowest known offset if there is a blocking session', async () => { + await ingester.handleEachBatch( + [createMessage('sid1'), createMessage('sid2'), createMessage('sid2'), createMessage('sid2')], + noop + ) + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() - // Subsequent commit will commit the last known offset - await commitAllOffsets() - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - partition: 1, - offset: 4 + 1, + expect(ingester.partitionMetrics[1]).toMatchObject({ + lastMessageOffset: 4, }) - ) - }) - it('should commit one lower than the blocking session if that is the highest', async () => { - await ingester.handleEachBatch( - [createMessage('sid1'), createMessage('sid2'), createMessage('sid2'), createMessage('sid2')], - noop - ) - // Flush the second session so the first one is still blocking - await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') - await commitAllOffsets() - - // No offsets are below the blocking one - expect(mockConsumer.commit).not.toHaveBeenCalled() - - // Add a new message and session and flush the old one - await ingester.handleEachBatch([createMessage('sid2')], noop) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await commitAllOffsets() - - // We should commit the offset of the blocking session - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - partition: 1, - offset: ingester.sessions[`${team.id}-sid2`].getLowestOffset(), - }) - ) + // No offsets are below the blocking one + expect(mockConsumer.commit).not.toHaveBeenCalled() + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + + // Subsequent commit will commit the last known offset + await commitAllOffsets() + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 4 + 1, + }) + ) + }) + + it('should commit one lower than the blocking session if that is the highest', async () => { + await ingester.handleEachBatch( + [createMessage('sid1'), createMessage('sid2'), createMessage('sid2'), createMessage('sid2')], + noop + ) + // Flush the second session so the first one is still blocking + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() + + // No offsets are below the blocking one + expect(mockConsumer.commit).not.toHaveBeenCalled() + + // Add a new message and session and flush the old one + await ingester.handleEachBatch([createMessage('sid2')], noop) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() + + // We should commit the offset of the blocking session + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: ingester.sessions[`${team.id}-sid2`].getLowestOffset(), + }) + ) + }) + + 
it.skip('should not be affected by other partitions ', async () => { + await ingester.handleEachBatch( + [createMessage('sid1', 1), createMessage('sid2', 2), createMessage('sid2', 2)], + noop + ) + + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await ingester.handleEachBatch([createMessage('sid1', 1)], noop) + + // We should now have a blocking session on partition 1 and 2 with partition 1 being committable + await commitAllOffsets() + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 2, + }) + ) + + mockConsumer.commit.mockReset() + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() + expect(mockConsumer.commit).toHaveBeenCalledTimes(2) + expect(mockConsumer.commit).toHaveBeenCalledWith( + expect.objectContaining({ + partition: 1, + offset: 3, + }) + ) + expect(mockConsumer.commit).toHaveBeenCalledWith( + expect.objectContaining({ + partition: 2, + offset: 3, + }) + ) + }) }) - it.skip('should not be affected by other partitions ', async () => { - await ingester.handleEachBatch( - [createMessage('sid1', 1), createMessage('sid2', 2), createMessage('sid2', 2)], - noop - ) + describe('watermarkers', () => { + const getSessionWaterMarks = (partition = 1) => + ingester.sessionHighWaterMarker.getWaterMarks(createTP(partition)) + const getPersistentWaterMarks = (partition = 1) => + ingester.persistentHighWaterMarker.getWaterMarks(createTP(partition)) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await ingester.handleEachBatch([createMessage('sid1', 1)], noop) + it('should update session watermarkers with flushing', async () => { + await ingester.handleEachBatch( + [createMessage('sid1'), createMessage('sid2'), createMessage('sid3')], + noop + ) + await expect(getSessionWaterMarks()).resolves.toEqual({}) - // We should now have a blocking session on partition 1 and 2 with partition 1 being committable - await commitAllOffsets() - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - partition: 1, - offset: 2, + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 1 }) + await ingester.sessions[`${team.id}-sid3`].flush('buffer_age') + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 1, sid2: 2, sid3: 3 }) + }) + + it('should update partition watermarkers when committing', async () => { + await ingester.handleEachBatch( + [createMessage('sid1'), createMessage('sid2'), createMessage('sid1')], + noop + ) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + + // all replay events should be watermarked up until the 3rd message as they HAVE been processed + // whereas the commited kafka offset should be the 1st message as the 2nd message HAS not been processed + await expect(getPersistentWaterMarks()).resolves.toEqual({ + 'session-recordings-blob': 1, + session_replay_console_logs_events_ingester: 3, + session_replay_events_ingester: 3, }) - ) - - mockConsumer.commit.mockReset() - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') - await 
commitAllOffsets() - expect(mockConsumer.commit).toHaveBeenCalledTimes(2) - expect(mockConsumer.commit).toHaveBeenCalledWith( - expect.objectContaining({ - partition: 1, - offset: 3, + // sid1 should be watermarked up until the 3rd message as it HAS been processed + await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 3 }) + }) + + it('should drop events that are higher than the watermarks', async () => { + const events = [createMessage('sid1'), createMessage('sid2'), createMessage('sid2')] + + await expect(getPersistentWaterMarks()).resolves.toEqual({}) + await ingester.handleEachBatch([events[0], events[1]], noop) + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() + expect(mockConsumer.commit).not.toHaveBeenCalled() + await expect(getPersistentWaterMarks()).resolves.toEqual({ + session_replay_console_logs_events_ingester: 2, + session_replay_events_ingester: 2, }) - ) - expect(mockConsumer.commit).toHaveBeenCalledWith( - expect.objectContaining({ - partition: 2, - offset: 3, + await expect(getSessionWaterMarks()).resolves.toEqual({ + sid2: 2, // only processed the second message so far }) - ) - }) - }) - - describe('watermarkers', () => { - const getSessionWaterMarks = (partition = 1) => - ingester.sessionHighWaterMarker.getWaterMarks(createTP(partition)) - const getPersistentWaterMarks = (partition = 1) => - ingester.persistentHighWaterMarker.getWaterMarks(createTP(partition)) - - it('should update session watermarkers with flushing', async () => { - await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid2'), createMessage('sid3')], noop) - await expect(getSessionWaterMarks()).resolves.toEqual({}) - - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 1 }) - await ingester.sessions[`${team.id}-sid3`].flush('buffer_age') - await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') - await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 1, sid2: 2, sid3: 3 }) - }) - it('should update partition watermarkers when committing', async () => { - await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid2'), createMessage('sid1')], noop) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await commitAllOffsets() - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - - // all replay events should be watermarked up until the 3rd message as they HAVE been processed - // whereas the commited kafka offset should be the 1st message as the 2nd message HAS not been processed - await expect(getPersistentWaterMarks()).resolves.toEqual({ - 'session-recordings-blob': 1, - session_replay_console_logs_events_ingester: 3, - session_replay_events_ingester: 3, + // Simulate a re-processing + await ingester.destroySessions(Object.entries(ingester.sessions)) + await ingester.handleEachBatch(events, noop) + expect(ingester.sessions[`${team.id}-sid2`].buffer.count).toBe(1) + expect(ingester.sessions[`${team.id}-sid1`].buffer.count).toBe(1) }) - // sid1 should be watermarked up until the 3rd message as it HAS been processed - await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 3 }) }) - it('should drop events that are higher than the watermarks', async () => { - const events = [createMessage('sid1'), createMessage('sid2'), createMessage('sid2')] - - await expect(getPersistentWaterMarks()).resolves.toEqual({}) - await ingester.handleEachBatch([events[0], events[1]], noop) - await 
ingester.sessions[`${team.id}-sid2`].flush('buffer_age') - await commitAllOffsets() - expect(mockConsumer.commit).not.toHaveBeenCalled() - await expect(getPersistentWaterMarks()).resolves.toEqual({ - session_replay_console_logs_events_ingester: 2, - session_replay_events_ingester: 2, - }) - await expect(getSessionWaterMarks()).resolves.toEqual({ - sid2: 2, // only processed the second message so far - }) + describe('simulated rebalancing', () => { + let otherIngester: SessionRecordingIngester + jest.setTimeout(5000) // Increased to cover lock delay - // Simulate a re-processing - await ingester.destroySessions(Object.entries(ingester.sessions)) - await ingester.handleEachBatch(events, noop) - expect(ingester.sessions[`${team.id}-sid2`].buffer.count).toBe(1) - expect(ingester.sessions[`${team.id}-sid1`].buffer.count).toBe(1) - }) - }) + beforeEach(async () => { + otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined) + await otherIngester.start() + }) - describe('simulated rebalancing', () => { - let otherIngester: SessionRecordingIngester - jest.setTimeout(5000) // Increased to cover lock delay + afterEach(async () => { + await otherIngester.stop() + }) - beforeEach(async () => { - otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined) - await otherIngester.start() - }) + /** + * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs + */ + it('rebalances new consumers', async () => { + const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] + + mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) + await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) + + expect( + Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) + ).toEqual(['1:session_id_1:1', '1:session_id_2:1', '2:session_id_3:1', '2:session_id_4:1']) + + const rebalancePromises = [ingester.onRevokePartitions([createTP(2), createTP(3)])] + + // Should immediately be removed from the tracked sessions + expect( + Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) + ).toEqual(['1:session_id_1:1', '1:session_id_2:1']) + + // Call the second ingester to receive the messages. 
The revocation should still be in progress meaning they are "paused" for a bit + // Once the revocation is complete the second ingester should receive the messages but drop most of them as they got flushes by the revoke + mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)]) + await otherIngester.handleEachBatch([...partitionMsgs2, createMessage('session_id_4', 2)], noop) + await Promise.all(rebalancePromises) + + // Should still have the partition 1 sessions that didnt move + expect( + Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) + ).toEqual(['1:session_id_1:1', '1:session_id_2:1']) + + // Should have session_id_4 but not session_id_3 as it was flushed + expect( + Object.values(otherIngester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) + ).toEqual(['2:session_id_3:1', '2:session_id_4:1']) + }) - afterEach(async () => { - await otherIngester.stop() - }) + it("flushes and commits as it's revoked", async () => { + await ingester.handleEachBatch( + [createMessage('sid1'), createMessage('sid2'), createMessage('sid3', 2)], + noop + ) - /** - * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs - */ - it('rebalances new consumers', async () => { - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] - - mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) - await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) - - expect( - Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) - ).toEqual(['1:session_id_1:1', '1:session_id_2:1', '2:session_id_3:1', '2:session_id_4:1']) - - const rebalancePromises = [ingester.onRevokePartitions([createTP(2), createTP(3)])] - - // Should immediately be removed from the tracked sessions - expect( - Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) - ).toEqual(['1:session_id_1:1', '1:session_id_2:1']) - - // Call the second ingester to receive the messages. 
-            // Once the revocation is complete the second ingester should receive the messages but drop most of them as they got flushes by the revoke
-            mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)])
-            await otherIngester.handleEachBatch([...partitionMsgs2, createMessage('session_id_4', 2)], noop)
-            await Promise.all(rebalancePromises)
-
-            // Should still have the partition 1 sessions that didnt move
-            expect(
-                Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`)
-            ).toEqual(['1:session_id_1:1', '1:session_id_2:1'])
-
-            // Should have session_id_4 but not session_id_3 as it was flushed
-            expect(
-                Object.values(otherIngester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`)
-            ).toEqual(['2:session_id_3:1', '2:session_id_4:1'])
+            expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([
+                expect.stringContaining(`${team.id}.sid1.`), // gz
+                expect.stringContaining(`${team.id}.sid1.`), // json
+                expect.stringContaining(`${team.id}.sid2.`), // gz
+                expect.stringContaining(`${team.id}.sid2.`), // json
+                expect.stringContaining(`${team.id}.sid3.`), // gz
+                expect.stringContaining(`${team.id}.sid3.`), // json
+            ])
+
+            const revokePromise = ingester.onRevokePartitions([createTP(1)])
+
+            expect(Object.keys(ingester.sessions)).toEqual([`${team.id}-sid3`])
+
+            await revokePromise
+
+            // Only files left on the system should be the sid3 ones
+            expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([
+                expect.stringContaining(`${team.id}.sid3.`), // gz
+                expect.stringContaining(`${team.id}.sid3.`), // json
+            ])
+
+            expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
+            expect(mockConsumer.commit).toHaveBeenLastCalledWith(
+                expect.objectContaining({
+                    offset: 2 + 1,
+                    partition: 1,
+                })
+            )
+        })
    })
-        it("flushes and commits as it's revoked", async () => {
-            await ingester.handleEachBatch(
-                [createMessage('sid1'), createMessage('sid2'), createMessage('sid3', 2)],
-                noop
-            )
-
-            expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([
-                expect.stringContaining(`${team.id}.sid1.`), // gz
-                expect.stringContaining(`${team.id}.sid1.`), // json
-                expect.stringContaining(`${team.id}.sid2.`), // gz
-                expect.stringContaining(`${team.id}.sid2.`), // json
-                expect.stringContaining(`${team.id}.sid3.`), // gz
-                expect.stringContaining(`${team.id}.sid3.`), // json
-            ])
-
-            const revokePromise = ingester.onRevokePartitions([createTP(1)])
-
-            expect(Object.keys(ingester.sessions)).toEqual([`${team.id}-sid3`])
-
-            await revokePromise
-
-            // Only files left on the system should be the sid3 ones
-            expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([
-                expect.stringContaining(`${team.id}.sid3.`), // gz
-                expect.stringContaining(`${team.id}.sid3.`), // json
-            ])
-
-            expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
-            expect(mockConsumer.commit).toHaveBeenLastCalledWith(
-                expect.objectContaining({
-                    offset: 2 + 1,
+    describe('when a team is disabled', () => {
+        it('can commit even if an entire batch is disabled', async () => {
+            // non-zero offset because the code can't commit offset 0
+            await ingester.handleEachBatch(
+                [
+                    createKafkaMessage('invalid_token', { offset: 12 }),
+                    createKafkaMessage('invalid_token', { offset: 13 }),
+                ],
+                noop
+            )
+            expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
+            expect(mockConsumer.commit).toHaveBeenCalledWith({
+                offset: 14,
                partition: 1,
+                topic: 'session_recording_snapshot_item_events_test',
            })
-            )
+        })
    })
-    })
-    describe('stop()', () => {
-        const setup = async (): Promise<void> => {
-            const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
-            await ingester.handleEachBatch(partitionMsgs1, noop)
-        }
-
-        // NOTE: This test is a sanity check for the follow up test. It demonstrates what happens if we shutdown in the wrong order
-        // It doesn't reliably work though as the onRevoke is called via the kafka lib ending up with dangling promises so rather it is here as a reminder
-        // demonstation for when we need it
-        it.skip('shuts down with error if redis forcefully shutdown', async () => {
-            await setup()
-
-            await ingester.redisPool.drain()
-            await ingester.redisPool.clear()
-
-            // revoke, realtime unsub, replay stop
-            await expect(ingester.stop()).resolves.toMatchObject([
-                { status: 'rejected' },
-                { status: 'fulfilled' },
-                { status: 'fulfilled' },
-            ])
-        })
-        it('shuts down without error', async () => {
-            await setup()
-
-            // revoke, realtime unsub, replay stop, console ingestion stop
-            await expect(ingester.stop()).resolves.toMatchObject([
-                { status: 'fulfilled' },
-                { status: 'fulfilled' },
-                { status: 'fulfilled' },
-                { status: 'fulfilled' },
-            ])
-        })
-    })
+    describe('overflow detection', () => {
+        const ingestBurst = async (count: number, size_bytes: number, timestamp_delta: number) => {
+            const first_timestamp = Date.now() - 2 * timestamp_delta * count
+
+            // Because messages from the same batch are reduced into a single one, we call handleEachBatch
+            // with individual messages to have better control on the message timestamp
+            for (let n = 0; n < count; n++) {
+                const message = createMessage('sid1', 1, {
+                    size: size_bytes,
+                    timestamp: first_timestamp + n * timestamp_delta,
+                })
+                await ingester.handleEachBatch([message], noop)
+            }
+        }
-    describe('when a team is disabled', () => {
-        it('can commit even if an entire batch is disabled', async () => {
-            // non-zero offset because the code can't commit offset 0
-            await ingester.handleEachBatch(
-                [
-                    createKafkaMessage('invalid_token', { offset: 12 }),
-                    createKafkaMessage('invalid_token', { offset: 13 }),
-                ],
-                noop
-            )
-            expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
-            expect(mockConsumer.commit).toHaveBeenCalledWith({
-                offset: 14,
-                partition: 1,
-                topic: 'session_recording_snapshot_item_events_test',
+        it('should not trigger overflow if under threshold', async () => {
+            await ingestBurst(10, 100, 10)
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
        })
-        })
-    })
-    describe('overflow detection', () => {
-        const ingestBurst = async (count: number, size_bytes: number, timestamp_delta: number) => {
-            const first_timestamp = Date.now() - 2 * timestamp_delta * count
+        it('should trigger overflow during bursts', async () => {
+            const expected_expiration = Math.floor(Date.now() / 1000) + 24 * 3600 // 24 hours from now, in seconds
+            await ingestBurst(10, 150_000, 10)
+
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
+            expect(
+                await redisConn.zrangebyscore(
+                    CAPTURE_OVERFLOW_REDIS_KEY,
+                    expected_expiration - 10,
+                    expected_expiration + 10
+                )
+            ).toEqual([`${team.id}:sid1`])
+        })
-            // Because messages from the same batch are reduced into a single one, we call handleEachBatch
-            // with individual messages to have better control on the message timestamp
-            for (let n = 0; n < count; n++) {
-                const message = createMessage('sid1', 1, {
-                    size: size_bytes,
-                    timestamp: first_timestamp + n * timestamp_delta,
-                })
-                await ingester.handleEachBatch([message], noop)
-            }
-        }
+        it('should not trigger overflow during backfills', async () => {
+            await ingestBurst(10, 150_000, 150_000)
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+        })
-        it('should not trigger overflow if under threshold', async () => {
-            await ingestBurst(10, 100, 10)
-            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+        it('should cleanup older entries when triggering', async () => {
+            await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 7000, 'expired:session')
+            await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 1000, 'not_expired:session')
+            expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([
+                'expired:session',
+                'not_expired:session',
+            ])
+
+            await ingestBurst(10, 150_000, 10)
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
+            expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([
+                'not_expired:session',
+                `${team.id}:sid1`,
+            ])
+        })
    })
-        it('should trigger overflow during bursts', async () => {
-            const expected_expiration = Math.floor(Date.now() / 1000) + 24 * 3600 // 24 hours from now, in seconds
-            await ingestBurst(10, 150_000, 10)
+    describe('lag reporting', () => {
+        it('should return the latest offsets', async () => {
+            mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {
+                cb(null, { highOffset: 1000 + partition, lowOffset: 0 })
+            })
-            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
-            expect(
-                await redisConn.zrangebyscore(
-                    CAPTURE_OVERFLOW_REDIS_KEY,
-                    expected_expiration - 10,
-                    expected_expiration + 10
-                )
-            ).toEqual([`${team.id}:sid1`])
-        })
+            const results = await ingester.latestOffsetsRefresher.get()
-        it('should not trigger overflow during backfills', async () => {
-            await ingestBurst(10, 150_000, 150_000)
-            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+            expect(results).toEqual({
+                0: 1000,
+                1: 1001,
+            })
+        })
    })
-        it('should cleanup older entries when triggering', async () => {
-            await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 7000, 'expired:session')
-            await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 1000, 'not_expired:session')
-            expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([
-                'expired:session',
-                'not_expired:session',
-            ])
-
-            await ingestBurst(10, 150_000, 10)
-            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
-            expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([
-                'not_expired:session',
-                `${team.id}:sid1`,
-            ])
+    describe('heartbeats', () => {
+        it('it should send them whilst processing', async () => {
+            const heartbeat = jest.fn()
+            // non-zero offset because the code can't commit offset 0
+            const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
+            await ingester.handleEachBatch(partitionMsgs1, heartbeat)
+
+            // NOTE: the number here can change as we change the code. What matters is that it is called a number of times
+            expect(heartbeat).toBeCalledTimes(7)
+        })
    })
})
-    describe('lag reporting', () => {
-        it('should return the latest offsets', async () => {
-            mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {
-                cb(null, { highOffset: 1000 + partition, lowOffset: 0 })
-            })
+    describe('when ingester.stop is called in teardown', () => {
+        describe('stop()', () => {
+            const setup = async (): Promise<void> => {
+                const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
+                await ingester.handleEachBatch(partitionMsgs1, noop)
+            }
-            const results = await ingester.latestOffsetsRefresher.get()
+            // NOTE: This test is a sanity check for the follow up test. It demonstrates what happens if we shutdown in the wrong order
+            // It doesn't reliably work though as the onRevoke is called via the kafka lib ending up with dangling promises so rather it is here as a reminder
+            // demonstration for when we need it
+            it.skip('shuts down with error if redis forcefully shutdown', async () => {
+                await setup()
-            expect(results).toEqual({
-                0: 1000,
-                1: 1001,
-            })
-        })
-    })
+                await ingester.redisPool.drain()
+                await ingester.redisPool.clear()
-    describe('heartbeats', () => {
-        it('it should send them whilst processing', async () => {
-            const heartbeat = jest.fn()
-            // non-zero offset because the code can't commit offset 0
-            const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
-            await ingester.handleEachBatch(partitionMsgs1, heartbeat)
+                // revoke, realtime unsub, replay stop
+                await expect(ingester.stop()).resolves.toMatchObject([{ status: 'rejected' }, { status: 'fulfilled' }])
+            })
+            it('shuts down without error', async () => {
+                await setup()
-            // NOTE: the number here can change as we change the code. Important is that it is called a number of times
-            expect(heartbeat).toBeCalledTimes(7)
+                // revoke, realtime unsub, replay stop, console ingestion stop
+                await expect(ingester.stop()).resolves.toMatchObject([{ status: 'fulfilled' }, { status: 'fulfilled' }])
+            })
        })
    })
})
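Editor's note on the overflow-detection tests above: they exercise a Redis sorted set whose members are `${team.id}:${sessionId}` and whose scores are expiry timestamps in seconds (roughly 24 hours out for a freshly flagged session), with stale members pruned when a new session is flagged. The sketch below is a minimal illustration of that pattern only; the key value, function name, and prune cutoff are assumptions for illustration, not the ingester's actual implementation.

```ts
import Redis from 'ioredis'

// Hypothetical key value for illustration; the tests import the real constant from the consumer module.
const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'

// Hypothetical helper: flag a session as overflowing, mirroring the zadd/zremrangebyscore pattern in the tests.
export async function flagOverflowingSession(
    redis: Redis,
    teamId: number,
    sessionId: string,
    retentionSeconds = 24 * 3600, // the 'bursts' test expects an expiry roughly 24h from now
    pruneCutoffSeconds = 3600 // assumed cutoff: entries scored well in the past are cleaned up on trigger
): Promise<void> {
    const nowSeconds = Math.floor(Date.now() / 1000)
    // Remove stale members before adding, as the 'cleanup older entries' test expects
    await redis.zremrangebyscore(CAPTURE_OVERFLOW_REDIS_KEY, '-inf', nowSeconds - pruneCutoffSeconds)
    // NX keeps the existing score if the session is already flagged
    await redis.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', nowSeconds + retentionSeconds, `${teamId}:${sessionId}`)
}
```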