From 993eba9a8859f43a5a0e9babf84bd674c45e5e80 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 21:12:16 +0000 Subject: [PATCH 01/21] feat: ingestion warning on old version --- .../session-recordings-consumer.ts | 32 +++++++++---- .../session-recording/utils.ts | 48 ++++++++++++++++++- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 30aaab4a023d5..d702bc437729c 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -1,13 +1,22 @@ import { captureException } from '@sentry/node' import crypto from 'crypto' import { mkdirSync, rmSync } from 'node:fs' -import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka' +import { + CODES, + features, + HighLevelProducer, + KafkaConsumer, + librdkafkaVersion, + Message, + TopicPartition, +} from 'node-rdkafka' import { Counter, Gauge, Histogram } from 'prom-client' import { sessionRecordingConsumerConfig } from '../../../config/config' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' -import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' +import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config' +import { createKafkaProducer } from '../../../kafka/producer' import { PluginsServerConfig, RedisPool, TeamId } from '../../../types' import { BackgroundRefresher } from '../../../utils/background-refresher' import { PostgresRouter } from '../../../utils/db/postgres' @@ -143,6 +152,7 @@ export class SessionRecordingIngester { // if ingestion is lagging on a single partition it is often hard to identify _why_, // this allows us to output more information for that partition private debugPartition: number | undefined = undefined + private ingestionWarningProducer?: HighLevelProducer constructor( private globalServerConfig: PluginsServerConfig, @@ -227,7 +237,6 @@ export class SessionRecordingIngester { */ this.promises.add(promise) - // eslint-disable-next-line @typescript-eslint/no-floating-promises promise.finally(() => this.promises.delete(promise)) return promise @@ -326,11 +335,14 @@ export class SessionRecordingIngester { counterKafkaMessageReceived.inc({ partition }) - const recordingMessage = await parseKafkaMessage(message, (token) => - this.teamsRefresher.get().then((teams) => ({ - teamId: teams[token]?.teamId || null, - consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true, - })) + const recordingMessage = await parseKafkaMessage( + message, + (token) => + this.teamsRefresher.get().then((teams) => ({ + teamId: teams[token]?.teamId || null, + consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? 
true, + })), + this.ingestionWarningProducer ) if (recordingMessage) { @@ -505,6 +517,10 @@ export class SessionRecordingIngester { status.info('🔁', 'blob_ingester_consumer batch consumer disconnected, cleaning up', { err }) await this.stop() }) + + const producerConfig = createRdProducerConfigFromEnvVars(this.config) + this.ingestionWarningProducer = await createKafkaProducer(connectionConfig, producerConfig) + this.ingestionWarningProducer.connect() } public async stop(): Promise[]> { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 4b4345d43b48d..e1bb3a458e572 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -1,12 +1,21 @@ import { captureException } from '@sentry/node' import { DateTime } from 'luxon' -import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartition } from 'node-rdkafka' +import { + HighLevelProducer, + KafkaConsumer, + Message, + MessageHeader, + PartitionMetadata, + TopicPartition, +} from 'node-rdkafka' import path from 'path' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types' +import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper' import { status } from '../../../utils/status' import { cloneObject } from '../../../utils/utils' +import { captureIngestionWarning } from '../../../worker/ingestion/utils' import { eventDroppedCounter } from '../metrics' import { TeamIDWithConfig } from './session-recordings-consumer' import { IncomingRecordingMessage, PersistedRecordingMessage } from './types' @@ -128,9 +137,27 @@ export async function readTokenFromHeaders( return { token, teamIdWithConfig } } +function readLibVersionFromHeaders(headers: MessageHeader[] | undefined): string | undefined { + const libVersionHeader = headers?.find((header: MessageHeader) => { + return header['lib_version'] + })?.['lib_version'] + return typeof libVersionHeader === 'string' ? libVersionHeader : libVersionHeader?.toString() +} + +function minorVersionFrom(libVersion: string | undefined): number | undefined { + try { + const minorString = libVersion && libVersion.includes('.') ? libVersion.split('.')[1] : undefined + return minorString ? 
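// A quick illustration of how 'lib_version' header values flow through this check,
// using cases exercised by the tests added later in this series:
//   '1.74.0'     -> minor 74      -> 'replay_lib_version_too_old' warning is captured
//   '1.75.0'     -> minor 75      -> no warning
//   '1.twenty.2' -> parseInt NaN  -> no warning
//   'unknown'    -> no '.'        -> undefined -> no warning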
parseInt(minorString) : undefined + } catch (e) { + status.warn('⚠️', 'could_not_read_minor_lib_version', { libVersion }) + return undefined + } +} + export const parseKafkaMessage = async ( message: Message, - getTeamFn: (s: string) => Promise + getTeamFn: (s: string) => Promise, + ingestionWarningProducer: HighLevelProducer | undefined ): Promise => { const dropMessage = (reason: string, extra?: Record) => { eventDroppedCounter @@ -166,6 +193,23 @@ export const parseKafkaMessage = async ( }) } + if (ingestionWarningProducer && teamIdWithConfig?.teamId) { + const libVersion = readLibVersionFromHeaders(message.headers) + const minorVersion = minorVersionFrom(libVersion) + if (minorVersion && minorVersion <= 74) { + await captureIngestionWarning( + new KafkaProducerWrapper(ingestionWarningProducer), + teamIdWithConfig.teamId, + 'replay_lib_version_too_old', + { + libVersion, + minorVersion, + }, + { key: libVersion || minorVersion.toString() } + ) + } + } + let messagePayload: RawEventMessage let event: PipelineEvent From bcd58342f28d3fa994704835374285ac38f55fbe Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 22:33:43 +0000 Subject: [PATCH 02/21] and some tests --- .../session-recordings-consumer.ts | 18 +-- .../session-recording/utils.ts | 54 ++++----- .../session-recording/utils.test.ts | 106 ++++++++++++++++-- 3 files changed, 130 insertions(+), 48 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index d702bc437729c..04f9fa5823aeb 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -1,15 +1,7 @@ import { captureException } from '@sentry/node' import crypto from 'crypto' import { mkdirSync, rmSync } from 'node:fs' -import { - CODES, - features, - HighLevelProducer, - KafkaConsumer, - librdkafkaVersion, - Message, - TopicPartition, -} from 'node-rdkafka' +import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka' import { Counter, Gauge, Histogram } from 'prom-client' import { sessionRecordingConsumerConfig } from '../../../config/config' @@ -19,6 +11,7 @@ import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars import { createKafkaProducer } from '../../../kafka/producer' import { PluginsServerConfig, RedisPool, TeamId } from '../../../types' import { BackgroundRefresher } from '../../../utils/background-refresher' +import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper' import { PostgresRouter } from '../../../utils/db/postgres' import { status } from '../../../utils/status' import { createRedisPool } from '../../../utils/utils' @@ -152,7 +145,7 @@ export class SessionRecordingIngester { // if ingestion is lagging on a single partition it is often hard to identify _why_, // this allows us to output more information for that partition private debugPartition: number | undefined = undefined - private ingestionWarningProducer?: HighLevelProducer + private ingestionWarningProducer?: KafkaProducerWrapper constructor( private globalServerConfig: PluginsServerConfig, @@ -519,8 +512,9 @@ export class SessionRecordingIngester { }) const producerConfig = createRdProducerConfigFromEnvVars(this.config) - this.ingestionWarningProducer = await createKafkaProducer(connectionConfig, 
producerConfig) - this.ingestionWarningProducer.connect() + const producer = await createKafkaProducer(connectionConfig, producerConfig) + producer.connect() + this.ingestionWarningProducer = new KafkaProducerWrapper(producer) } public async stop(): Promise[]> { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index e1bb3a458e572..d5ff1c0ae4e86 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -1,13 +1,6 @@ import { captureException } from '@sentry/node' import { DateTime } from 'luxon' -import { - HighLevelProducer, - KafkaConsumer, - Message, - MessageHeader, - PartitionMetadata, - TopicPartition, -} from 'node-rdkafka' +import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartition } from 'node-rdkafka' import path from 'path' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' @@ -146,7 +139,14 @@ function readLibVersionFromHeaders(headers: MessageHeader[] | undefined): string function minorVersionFrom(libVersion: string | undefined): number | undefined { try { - const minorString = libVersion && libVersion.includes('.') ? libVersion.split('.')[1] : undefined + let minorString: string | undefined = undefined + if (libVersion && libVersion.includes('.')) { + const splat = libVersion.split('.') + // very loose check for three part semantic version number + if (splat.length === 3) { + minorString = splat[1] + } + } return minorString ? parseInt(minorString) : undefined } catch (e) { status.warn('⚠️', 'could_not_read_minor_lib_version', { libVersion }) @@ -157,7 +157,7 @@ function minorVersionFrom(libVersion: string | undefined): number | undefined { export const parseKafkaMessage = async ( message: Message, getTeamFn: (s: string) => Promise, - ingestionWarningProducer: HighLevelProducer | undefined + ingestionWarningProducer: KafkaProducerWrapper | undefined ): Promise => { const dropMessage = (reason: string, extra?: Record) => { eventDroppedCounter @@ -193,23 +193,6 @@ export const parseKafkaMessage = async ( }) } - if (ingestionWarningProducer && teamIdWithConfig?.teamId) { - const libVersion = readLibVersionFromHeaders(message.headers) - const minorVersion = minorVersionFrom(libVersion) - if (minorVersion && minorVersion <= 74) { - await captureIngestionWarning( - new KafkaProducerWrapper(ingestionWarningProducer), - teamIdWithConfig.teamId, - 'replay_lib_version_too_old', - { - libVersion, - minorVersion, - }, - { key: libVersion || minorVersion.toString() } - ) - } - } - let messagePayload: RawEventMessage let event: PipelineEvent @@ -252,6 +235,23 @@ export const parseKafkaMessage = async ( } // end of deprecated mechanism + if (!!ingestionWarningProducer && !!teamIdWithConfig.teamId) { + const libVersion = readLibVersionFromHeaders(message.headers) + const minorVersion = minorVersionFrom(libVersion) + if (minorVersion && minorVersion <= 74) { + await captureIngestionWarning( + ingestionWarningProducer, + teamIdWithConfig.teamId, + 'replay_lib_version_too_old', + { + libVersion, + minorVersion, + }, + { key: libVersion || minorVersion.toString() } + ) + } + } + const events: RRWebEvent[] = $snapshot_items.filter((event: any) => { // we sometimes see events that are null // there will always be some unexpected data but, we should try to filter out the worst of it diff --git 
a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts index b8e6dc59284e7..7e21c6dd3e0d5 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts @@ -1,3 +1,4 @@ +import { Settings } from 'luxon' import { Message, MessageHeader } from 'node-rdkafka' import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/types' @@ -9,6 +10,7 @@ import { parseKafkaMessage, reduceRecordingMessages, } from '../../../../src/main/ingestion-queues/session-recording/utils' +import { KafkaProducerWrapper } from '../../../../src/utils/db/kafka-producer-wrapper' describe('session-recording utils', () => { const validMessage = (distinctId: number | string, headers?: MessageHeader[], value?: Record) => @@ -64,16 +66,26 @@ describe('session-recording utils', () => { } satisfies Message) describe('parsing the message', () => { + let fakeProducer: KafkaProducerWrapper + beforeEach(() => { + Settings.now = () => new Date('2023-08-30T19:15:54.887316+00:00').getTime() + fakeProducer = { queueMessage: jest.fn() } as unknown as KafkaProducerWrapper + }) + it('can parse a message correctly', async () => { - const parsedMessage = await parseKafkaMessage(validMessage('my-distinct-id'), () => - Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) + const parsedMessage = await parseKafkaMessage( + validMessage('my-distinct-id'), + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }), + fakeProducer ) expect(parsedMessage).toMatchSnapshot() }) it('can handle numeric distinct_ids', async () => { const numericId = 12345 - const parsedMessage = await parseKafkaMessage(validMessage(numericId), () => - Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) + const parsedMessage = await parseKafkaMessage( + validMessage(numericId), + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }), + fakeProducer ) expect(parsedMessage).toMatchObject({ distinct_id: String(numericId), @@ -126,7 +138,8 @@ describe('session-recording utils', () => { timestamp: null, }, ]), - () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }) + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }), + fakeProducer ) expect(parsedMessage).toEqual(undefined) @@ -143,7 +156,8 @@ describe('session-recording utils', () => { timestamp: 123, }, ]), - () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }) + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }), + fakeProducer ) expect(parsedMessage2).toMatchObject({ eventsByWindowId: { @@ -157,12 +171,85 @@ describe('session-recording utils', () => { }, }) - const parsedMessage3 = await parseKafkaMessage(createMessage([null]), () => - Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) + const parsedMessage3 = await parseKafkaMessage( + createMessage([null]), + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }), + fakeProducer ) expect(parsedMessage3).toEqual(undefined) }) + function expectedIngestionWarningMessage(details: Record): Record { + return { + value: JSON.stringify({ + team_id: 1, + type: 'replay_lib_version_too_old', + source: 'plugin-server', + details: JSON.stringify(details), + timestamp: '2023-08-30 19:15:54.887', + }), + } + } + + test.each([ + ['absent lib version means no call to capture ingestion 
warning', [], []], + ['unknown lib version means no call to capture ingestion warning', [{ lib_version: 'unknown' }], []], + ['not-three-part lib version means no call to capture ingestion warning', [{ lib_version: '1.25' }], []], + [ + 'three-part non-numeric lib version means no call to capture ingestion warning', + [{ lib_version: '1.twenty.2' }], + [], + ], + [ + 'three-part lib version that is recent enough means no call to capture ingestion warning', + [{ lib_version: '1.75.0' }], + [], + ], + [ + 'three-part lib version that is too old means call to capture ingestion warning', + [{ lib_version: '1.74.0' }], + [ + [ + { + messages: [ + expectedIngestionWarningMessage({ + libVersion: '1.74.0', + minorVersion: 74, + }), + ], + topic: 'clickhouse_ingestion_warnings_test', + }, + ], + ], + ], + [ + 'another three-part lib version that is too old means call to capture ingestion warning', + [{ lib_version: '1.32.0' }], + [ + [ + { + messages: [ + expectedIngestionWarningMessage({ + libVersion: '1.32.0', + minorVersion: 32, + }), + ], + topic: 'clickhouse_ingestion_warnings_test', + }, + ], + ], + ], + ])('lib_version - captureIngestionWarning - %s', async (_name, headers, expectedCalls) => { + await parseKafkaMessage( + validMessage(12345, [{ token: 'q123' } as MessageHeader].concat(headers), { + $snapshot_consumer: 'v2', + }), + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }), + fakeProducer + ) + expect(jest.mocked(fakeProducer.queueMessage).mock.calls).toEqual(expectedCalls) + }) + describe('team token can be in header or body', () => { const mockTeamResolver = jest.fn() @@ -201,7 +288,8 @@ describe('session-recording utils', () => { validMessage(12345, headerToken ? [{ token: Buffer.from(headerToken) }] : undefined, { token: payloadToken, }), - mockTeamResolver + mockTeamResolver, + fakeProducer ) expect(mockTeamResolver.mock.calls).toEqual([expectedCalls]) }) From c13bc9a52926516a5f47db36aa17ee5a721c2184 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 22:46:33 +0000 Subject: [PATCH 03/21] deprecate old body parsing method for reading the token --- .../session-recording/utils.ts | 69 +++++++------------ .../session-recording/utils.test.ts | 15 ++-- 2 files changed, 33 insertions(+), 51 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index d5ff1c0ae4e86..b7d5a32e9c289 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -182,59 +182,23 @@ export const parseKafkaMessage = async ( const headerResult = await readTokenFromHeaders(message.headers, getTeamFn) const token: string | undefined = headerResult.token - let teamIdWithConfig: null | TeamIDWithConfig = headerResult.teamIdWithConfig - // NB `==` so we're comparing undefined and null - // if token was in the headers but, we could not load team config - // then, we can return early - if (!!token && (teamIdWithConfig == null || teamIdWithConfig.teamId == null)) { - return dropMessage('header_token_present_team_missing_or_disabled', { - token: token, - }) - } - - let messagePayload: RawEventMessage - let event: PipelineEvent - - try { - messagePayload = JSON.parse(message.value.toString()) - event = JSON.parse(messagePayload.data) - } catch (error) { - return dropMessage('invalid_json', { error }) + if (!token) { + return dropMessage('no_token_in_header') } - const { $snapshot_items, 
$session_id, $window_id, $snapshot_source } = event.properties || {} - - // NOTE: This is simple validation - ideally we should do proper schema based validation - if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) { - return dropMessage('received_non_snapshot_message') - } - - // TODO this mechanism is deprecated for blobby ingestion, we should remove it - // once we're happy that the new mechanism is working - // if there was not a token in the header then we try to load one from the message payload - if (teamIdWithConfig == null && messagePayload.team_id == null && !messagePayload.token) { - return dropMessage('no_token_in_header_or_payload') - } - - if (teamIdWithConfig == null) { - const token = messagePayload.token - - if (token) { - teamIdWithConfig = await getTeamFn(token) - } - } + const teamIdWithConfig: null | TeamIDWithConfig = headerResult.teamIdWithConfig // NB `==` so we're comparing undefined and null + // if token was in the headers but, we could not load team config + // then, we can return early if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) { - return dropMessage('token_fallback_team_missing_or_disabled', { - token: messagePayload.token, - teamId: messagePayload.team_id, - payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown', + return dropMessage('header_token_present_team_missing_or_disabled', { + token: token, }) } - // end of deprecated mechanism + // this has to be ahead of the payload parsing in case we start dropping traffic from older versions if (!!ingestionWarningProducer && !!teamIdWithConfig.teamId) { const libVersion = readLibVersionFromHeaders(message.headers) const minorVersion = minorVersionFrom(libVersion) @@ -252,6 +216,23 @@ export const parseKafkaMessage = async ( } } + let messagePayload: RawEventMessage + let event: PipelineEvent + + try { + messagePayload = JSON.parse(message.value.toString()) + event = JSON.parse(messagePayload.data) + } catch (error) { + return dropMessage('invalid_json', { error }) + } + + const { $snapshot_items, $session_id, $window_id, $snapshot_source } = event.properties || {} + + // NOTE: This is simple validation - ideally we should do proper schema based validation + if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) { + return dropMessage('received_non_snapshot_message') + } + const events: RRWebEvent[] = $snapshot_items.filter((event: any) => { // we sometimes see events that are null // there will always be some unexpected data but, we should try to filter out the worst of it diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts index 7e21c6dd3e0d5..be2e3b9a88a82 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts @@ -15,7 +15,7 @@ import { KafkaProducerWrapper } from '../../../../src/utils/db/kafka-producer-wr describe('session-recording utils', () => { const validMessage = (distinctId: number | string, headers?: MessageHeader[], value?: Record) => ({ - headers, + headers: headers || [{ token: 'the_token' }], value: Buffer.from( JSON.stringify({ uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', @@ -103,6 +103,7 @@ describe('session-recording utils', () => { const createMessage = ($snapshot_items: unknown[]) => { return { + headers: [{ token: Buffer.from('the_token') }], value: Buffer.from( 
JSON.stringify({ uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', @@ -250,7 +251,7 @@ describe('session-recording utils', () => { expect(jest.mocked(fakeProducer.queueMessage).mock.calls).toEqual(expectedCalls) }) - describe('team token can be in header or body', () => { + describe('team token must be in header *not* body', () => { const mockTeamResolver = jest.fn() beforeEach(() => { @@ -260,13 +261,13 @@ describe('session-recording utils', () => { test.each([ [ - 'calls the team id resolver once when token is in header, not in the body', + 'calls the team id resolver once when token is in header, even if not in the body', 'the_token', undefined, ['the_token'], ], [ - 'calls the team id resolver once when token is in header, and in the body', + 'calls the team id resolver once when token is in header, even if it is in the body', 'the_token', 'the body token', ['the_token'], @@ -278,14 +279,14 @@ describe('session-recording utils', () => { undefined, ], [ - 'calls the team id resolver twice when token is not in header, and is in body', + 'calls the team id resolver once when token is not in header, even though it is in the body', undefined, 'the body token', - ['the body token'], + undefined, ], ])('%s', async (_name, headerToken, payloadToken, expectedCalls) => { await parseKafkaMessage( - validMessage(12345, headerToken ? [{ token: Buffer.from(headerToken) }] : undefined, { + validMessage(12345, headerToken ? [{ token: Buffer.from(headerToken) }] : [], { token: payloadToken, }), mockTeamResolver, From a0baa69e3803b6d4a239629283a48ba2cc46ba12 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 22:55:04 +0000 Subject: [PATCH 04/21] add a counter too so we can see this in grafana dashboard --- .../src/main/ingestion-queues/session-recording/utils.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index b7d5a32e9c289..3e28f150b34d0 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -2,6 +2,7 @@ import { captureException } from '@sentry/node' import { DateTime } from 'luxon' import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartition } from 'node-rdkafka' import path from 'path' +import { Counter } from 'prom-client' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types' @@ -13,6 +14,11 @@ import { eventDroppedCounter } from '../metrics' import { TeamIDWithConfig } from './session-recordings-consumer' import { IncomingRecordingMessage, PersistedRecordingMessage } from './types' +const counterLibVersionWarning = new Counter({ + name: 'lib_version_warning_counter', + help: 'the number of times we have seen aa message with a lib version that is too old, each _might_ cause an ingestion warning if not debounced', +}) + // Helper to return now as a milliseconds timestamp export const now = () => DateTime.now().toMillis() @@ -203,6 +209,8 @@ export const parseKafkaMessage = async ( const libVersion = readLibVersionFromHeaders(message.headers) const minorVersion = minorVersionFrom(libVersion) if (minorVersion && minorVersion <= 74) { + counterLibVersionWarning.inc() + await captureIngestionWarning( ingestionWarningProducer, teamIdWithConfig.teamId, From 6db56b8973db32a3b9416dc9d41b2f5faaf0e58b Mon Sep 17 
00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 23:00:46 +0000 Subject: [PATCH 05/21] typo --- .../src/main/ingestion-queues/session-recording/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 3e28f150b34d0..d3280ca4a53bb 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -16,7 +16,7 @@ import { IncomingRecordingMessage, PersistedRecordingMessage } from './types' const counterLibVersionWarning = new Counter({ name: 'lib_version_warning_counter', - help: 'the number of times we have seen aa message with a lib version that is too old, each _might_ cause an ingestion warning if not debounced', + help: 'the number of times we have seen a message with a lib version that is too old, each _might_ cause an ingestion warning if not debounced', }) // Helper to return now as a milliseconds timestamp From 9970346c2c8c252dc009ec4c02ea561fcd0c8b73 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 23:14:02 +0000 Subject: [PATCH 06/21] fix v3 consumer to ignore ingestion warning --- .../session-recordings-consumer-v3.ts | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts index b161a1990ec4b..156d23aa0a67c 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts @@ -136,7 +136,6 @@ export class SessionRecordingIngesterV3 { */ this.promises.add(promise) - // eslint-disable-next-line @typescript-eslint/no-floating-promises promise.finally(() => this.promises.delete(promise)) return promise @@ -191,11 +190,15 @@ export class SessionRecordingIngesterV3 { for (const message of messages) { counterKafkaMessageReceived.inc({ partition: message.partition }) - const recordingMessage = await parseKafkaMessage(message, (token) => - this.teamsRefresher.get().then((teams) => ({ - teamId: teams[token]?.teamId || null, - consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true, - })) + const recordingMessage = await parseKafkaMessage( + message, + (token) => + this.teamsRefresher.get().then((teams) => ({ + teamId: teams[token]?.teamId || null, + consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? 
true, + })), + // v3 consumer does not emit ingestion warnings + undefined ) if (recordingMessage) { @@ -456,7 +459,7 @@ export class SessionRecordingIngesterV3 { } private setupHttpRoutes() { - // Mimic the app sever's endpoint + // Mimic the app server's endpoint expressApp.get('/api/projects/:projectId/session_recordings/:sessionId/snapshots', async (req, res) => { await runInstrumentedFunction({ statsKey: `recordingingester.http.getSnapshots`, From 84ec78aacabecfb1b1dda4b9e6631ab93a1d909f Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 23:21:56 +0000 Subject: [PATCH 07/21] fix --- .../session-recordings-consumer-v3.ts | 85 ++++++++++--------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts index 156d23aa0a67c..85c95435b2065 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts @@ -460,54 +460,57 @@ export class SessionRecordingIngesterV3 { private setupHttpRoutes() { // Mimic the app server's endpoint - expressApp.get('/api/projects/:projectId/session_recordings/:sessionId/snapshots', async (req, res) => { - await runInstrumentedFunction({ - statsKey: `recordingingester.http.getSnapshots`, - func: async () => { - try { - const startTime = Date.now() - res.on('finish', function () { - status.info('⚡️', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`) - }) - - // validate that projectId is a number and sessionId is UUID like - const projectId = parseInt(req.params.projectId) - if (isNaN(projectId)) { - res.sendStatus(404) - return - } + expressApp.get( + '/api/projects/:projectId/session_recordings/:sessionId/snapshots', + async (req: any, res: any) => { + await runInstrumentedFunction({ + statsKey: `recordingingester.http.getSnapshots`, + func: async () => { + try { + const startTime = Date.now() + res.on('finish', function () { + status.info('⚡️', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`) + }) + + // validate that projectId is a number and sessionId is UUID like + const projectId = parseInt(req.params.projectId) + if (isNaN(projectId)) { + res.sendStatus(404) + return + } - const sessionId = req.params.sessionId - if (!/^[0-9a-f-]+$/.test(sessionId)) { - res.sendStatus(404) - return - } + const sessionId = req.params.sessionId + if (!/^[0-9a-f-]+$/.test(sessionId)) { + res.sendStatus(404) + return + } + + status.info('🔁', 'session-replay-ingestion - fetching session', { projectId, sessionId }) - status.info('🔁', 'session-replay-ingestion - fetching session', { projectId, sessionId }) + // We don't know the partition upfront so we have to recursively check all partitions + const partitions = await readdir(this.rootDir).catch(() => []) - // We don't know the partition upfront so we have to recursively check all partitions - const partitions = await readdir(this.rootDir).catch(() => []) + for (const partition of partitions) { + const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId) + const exists = await stat(sessionDir).catch(() => null) - for (const partition of partitions) { - const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId) - const exists = await stat(sessionDir).catch(() => null) + if (!exists) { + continue + } - if 
(!exists) { - continue + const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME)) + fileStream.pipe(res) + return } - const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME)) - fileStream.pipe(res) - return + res.sendStatus(404) + } catch (e) { + status.error('🔥', 'session-replay-ingestion - failed to fetch session', e) + res.sendStatus(500) } - - res.sendStatus(404) - } catch (e) { - status.error('🔥', 'session-replay-ingestion - failed to fetch session', e) - res.sendStatus(500) - } - }, - }) - }) + }, + }) + } + ) } } From 1aabc20db4f23c0541fe54f2915c1fab36c08e54 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 23:40:59 +0000 Subject: [PATCH 08/21] fix --- .../session-recording/session-recordings-consumer-v3.ts | 1 + .../session-recording/session-recordings-consumer.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts index 85c95435b2065..0bc877d6f7496 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts @@ -136,6 +136,7 @@ export class SessionRecordingIngesterV3 { */ this.promises.add(promise) + // eslint-disable-next-line @typescript-eslint/no-floating-promises promise.finally(() => this.promises.delete(promise)) return promise diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 04f9fa5823aeb..cb98e4353381a 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -230,6 +230,7 @@ export class SessionRecordingIngester { */ this.promises.add(promise) + // eslint-disable-next-line @typescript-eslint/no-floating-promises promise.finally(() => this.promises.delete(promise)) return promise From f7365df064132dd449e293dd39966e2185d041c2 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 23:43:27 +0000 Subject: [PATCH 09/21] fix --- .../main/ingestion-queues/session-recording/fixtures.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts index 59a4b2e250dea..008128da24375 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts @@ -46,12 +46,13 @@ export function createKafkaMessage( messageOverrides: Partial = {}, eventProperties: Record = {} ): Message { - const message: Message = { + return { partition: 1, topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, offset: 0, timestamp: messageOverrides.timestamp ?? 
Date.now(), size: 1, + headers: [{ token: token.toString() }], ...messageOverrides, value: Buffer.from( @@ -70,8 +71,6 @@ export function createKafkaMessage( }) ), } - - return message } export function createTP(partition: number, topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS) { From 55c8ef5cc8f3f2e2b2da3f32e74d335ef8ae5480 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Wed, 20 Mar 2024 09:56:51 +0000 Subject: [PATCH 10/21] add comment --- .../src/main/ingestion-queues/session-recording/utils.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index d3280ca4a53bb..3970456ef73c8 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -208,6 +208,12 @@ export const parseKafkaMessage = async ( if (!!ingestionWarningProducer && !!teamIdWithConfig.teamId) { const libVersion = readLibVersionFromHeaders(message.headers) const minorVersion = minorVersionFrom(libVersion) + /** + * We introduced SVG mutation throttling in version 1.74.0 fix: Recording throttling for SVG-like things (#758) + * and improvements like jitter on retry and better batching in session recording in earlier versions + * So, versions older than 1.75.0 can cause ingestion pressure or incidents + * because they send much more information and more messages for the same recording + */ if (minorVersion && minorVersion <= 74) { counterLibVersionWarning.inc() From b22cdb888358f8d4f31fcf80f0afa5e4b3b50121 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Wed, 20 Mar 2024 12:48:37 +0000 Subject: [PATCH 11/21] disconnect producer --- .../session-recording/session-recordings-consumer.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index cb98e4353381a..c0bf69703dce8 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -230,7 +230,6 @@ export class SessionRecordingIngester { */ this.promises.add(promise) - // eslint-disable-next-line @typescript-eslint/no-floating-promises promise.finally(() => this.promises.delete(promise)) return promise @@ -545,6 +544,8 @@ export class SessionRecordingIngester { await this.redisPool.drain() await this.redisPool.clear() + await this.ingestionWarningProducer?.disconnect() + status.info('👍', 'blob_ingester_consumer - stopped!') return promiseResults From c7ee67881c3ee3f2e047bc2de78e3e765be84054 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Thu, 21 Mar 2024 13:36:49 +0000 Subject: [PATCH 12/21] inject main cluster produer as a dependency from the recording consumer into its children --- .../services/console-logs-ingester.ts | 26 ++++-------- .../services/replay-events-ingester.ts | 24 ++++------- .../session-recordings-consumer-v3.ts | 37 ++++++++++------ .../session-recordings-consumer.ts | 42 +++++++++---------- .../src/utils/db/kafka-producer-wrapper.ts | 6 ++- .../services/console-log-ingester.test.ts | 18 ++++---- .../services/replay-events-ingester.test.ts | 16 +++---- .../session-recordings-consumer-v3.test.ts | 3 ++ 8 files changed, 81 insertions(+), 91 deletions(-) diff --git 
a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts index 5729da5cb373e..b29a2e16901d0 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts @@ -3,11 +3,9 @@ import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node- import { Counter } from 'prom-client' import { KAFKA_LOG_ENTRIES } from '../../../../config/kafka-topics' -import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../../kafka/config' import { findOffsetsToCommit } from '../../../../kafka/consumer' import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling' -import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer' -import { PluginsServerConfig } from '../../../../types' +import { flushProducer, produce } from '../../../../kafka/producer' import { status } from '../../../../utils/status' import { eventDroppedCounter } from '../../metrics' import { ConsoleLogEntry, gatherConsoleLogEvents, RRWebEventType } from '../process-event' @@ -42,10 +40,8 @@ function deduplicateConsoleLogEvents(consoleLogEntries: ConsoleLogEntry[]): Cons // TODO this is an almost exact duplicate of the replay events ingester // am going to leave this duplication and then collapse it when/if we add a performance events ingester export class ConsoleLogsIngester { - producer?: RdKafkaProducer - constructor( - private readonly serverConfig: PluginsServerConfig, + private readonly producer: RdKafkaProducer, private readonly persistentHighWaterMarker?: OffsetHighWaterMarker ) {} @@ -175,21 +171,13 @@ export class ConsoleLogsIngester { } } - public async start(): Promise { - const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig) - - const producerConfig = createRdProducerConfigFromEnvVars(this.serverConfig) - - this.producer = await createKafkaProducer(connectionConfig, producerConfig) - this.producer.connect() + public start(): void { + if (!this.producer.isConnected()) { + status.error('🔁', '[console-log-events-ingester] kakfa producer should have been connected by parent') + } } - public async stop(): Promise { + public stop(): void { status.info('🔁', '[console-log-events-ingester] stopping') - - if (this.producer && this.producer.isConnected()) { - status.info('🔁', '[console-log-events-ingester] disconnecting kafka producer in batchConsumer stop') - await disconnectProducer(this.producer) - } } } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts index 632f695a158f5..f7d270a79efc8 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts @@ -5,11 +5,9 @@ import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node- import { Counter } from 'prom-client' import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS } from '../../../../config/kafka-topics' -import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../../kafka/config' import { findOffsetsToCommit } from 
'../../../../kafka/consumer' import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling' -import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer' -import { PluginsServerConfig } from '../../../../types' +import { flushProducer, produce } from '../../../../kafka/producer' import { KafkaProducerWrapper } from '../../../../utils/db/kafka-producer-wrapper' import { status } from '../../../../utils/status' import { captureIngestionWarning } from '../../../../worker/ingestion/utils' @@ -26,10 +24,8 @@ const replayEventsCounter = new Counter({ }) export class ReplayEventsIngester { - producer?: RdKafkaProducer - constructor( - private readonly serverConfig: PluginsServerConfig, + private readonly producer: RdKafkaProducer, private readonly persistentHighWaterMarker?: OffsetHighWaterMarker ) {} @@ -179,19 +175,13 @@ export class ReplayEventsIngester { }) } } - public async start(): Promise { - const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig) - const producerConfig = createRdProducerConfigFromEnvVars(this.serverConfig) - this.producer = await createKafkaProducer(connectionConfig, producerConfig) - this.producer.connect() + public start(): void { + if (!this.producer.isConnected()) { + status.error('🔁', '[replay-events] kakfa producer should have been connected by parent') + } } - public async stop(): Promise { + public stop(): void { status.info('🔁', '[replay-events] stopping') - - if (this.producer && this.producer.isConnected()) { - status.info('🔁', '[replay-events] disconnecting kafka producer in batchConsumer stop') - await disconnectProducer(this.producer) - } } } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts index 0bc877d6f7496..4047739ff5743 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts @@ -8,9 +8,11 @@ import { Counter, Gauge, Histogram } from 'prom-client' import { sessionRecordingConsumerConfig } from '../../../config/config' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' -import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' +import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config' +import { createKafkaProducer } from '../../../kafka/producer' import { PluginsServerConfig, TeamId } from '../../../types' import { BackgroundRefresher } from '../../../utils/background-refresher' +import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper' import { PostgresRouter } from '../../../utils/db/postgres' import { status } from '../../../utils/status' import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager' @@ -82,6 +84,8 @@ export class SessionRecordingIngesterV3 { // this allows us to output more information for that partition private debugPartition: number | undefined = undefined + private mainKafkaClusterProducer?: KafkaProducerWrapper + constructor( private globalServerConfig: PluginsServerConfig, private postgres: PostgresRouter, @@ -136,7 +140,6 @@ export class SessionRecordingIngesterV3 { */ this.promises.add(promise) - // 
eslint-disable-next-line @typescript-eslint/no-floating-promises promise.finally(() => this.promises.delete(promise)) return promise @@ -277,24 +280,30 @@ export class SessionRecordingIngesterV3 { // Load teams into memory await this.teamsRefresher.refresh() + // this producer uses the default plugin server config to connect to the main kafka cluster + const mainClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.globalServerConfig) + const producerConfig = createRdProducerConfigFromEnvVars(this.globalServerConfig) + const producer = await createKafkaProducer(mainClusterConnectionConfig, producerConfig) + producer.connect() + this.mainKafkaClusterProducer = new KafkaProducerWrapper(producer) + // NOTE: This is the only place where we need to use the shared server config if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) { - this.consoleLogsIngester = new ConsoleLogsIngester(this.globalServerConfig) - await this.consoleLogsIngester.start() + this.consoleLogsIngester = new ConsoleLogsIngester(producer) + this.consoleLogsIngester.start() } if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) { - this.replayEventsIngester = new ReplayEventsIngester(this.globalServerConfig) - await this.replayEventsIngester.start() + this.replayEventsIngester = new ReplayEventsIngester(producer) + this.replayEventsIngester.start() } - const connectionConfig = createRdConnectionConfigFromEnvVars(this.config) - // Create a node-rdkafka consumer that fetches batches of messages, runs // eachBatchWithContext, then commits offsets for the batch. - + // the batch consumer reads from the session replay kafka cluster + const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config) this.batchConsumer = await startBatchConsumer({ - connectionConfig, + connectionConfig: replayClusterConnectionConfig, groupId: KAFKA_CONSUMER_GROUP_ID, topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, autoCommit: true, // NOTE: This is the crucial difference between this and the other consumer @@ -344,15 +353,19 @@ export class SessionRecordingIngesterV3 { ) ) + // stop is effectively a no-op on both of these but is kept here + // in case we want to add any cleanup logic in the future if (this.replayEventsIngester) { - void this.scheduleWork(this.replayEventsIngester.stop()) + this.replayEventsIngester.stop() } if (this.consoleLogsIngester) { - void this.scheduleWork(this.consoleLogsIngester!.stop()) + this.consoleLogsIngester.stop() } const promiseResults = await Promise.allSettled(this.promises) + await this.mainKafkaClusterProducer?.disconnect() + status.info('👍', 'session-replay-ingestion - stopped!') return promiseResults diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index c0bf69703dce8..09d3e84105cf1 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -145,7 +145,7 @@ export class SessionRecordingIngester { // if ingestion is lagging on a single partition it is often hard to identify _why_, // this allows us to output more information for that partition private debugPartition: number | undefined = undefined - private ingestionWarningProducer?: KafkaProducerWrapper + private mainKafkaClusterProducer?: KafkaProducerWrapper constructor( private globalServerConfig: 
PluginsServerConfig, @@ -335,7 +335,7 @@ export class SessionRecordingIngester { teamId: teams[token]?.teamId || null, consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true, })), - this.ingestionWarningProducer + this.mainKafkaClusterProducer ) if (recordingMessage) { @@ -431,27 +431,30 @@ export class SessionRecordingIngester { // Load teams into memory await this.teamsRefresher.refresh() + // this producer uses the default plugin server config to connect to the main kafka cluster + const mainClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.globalServerConfig) + const producerConfig = createRdProducerConfigFromEnvVars(this.globalServerConfig) + const producer = await createKafkaProducer(mainClusterConnectionConfig, producerConfig) + producer.connect() + this.mainKafkaClusterProducer = new KafkaProducerWrapper(producer) + // NOTE: This is the only place where we need to use the shared server config if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) { - this.consoleLogsIngester = new ConsoleLogsIngester(this.globalServerConfig, this.persistentHighWaterMarker) - await this.consoleLogsIngester.start() + this.consoleLogsIngester = new ConsoleLogsIngester(producer, this.persistentHighWaterMarker) + this.consoleLogsIngester.start() } if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) { - this.replayEventsIngester = new ReplayEventsIngester( - this.globalServerConfig, - this.persistentHighWaterMarker - ) - await this.replayEventsIngester.start() + this.replayEventsIngester = new ReplayEventsIngester(producer, this.persistentHighWaterMarker) + this.replayEventsIngester.start() } - const connectionConfig = createRdConnectionConfigFromEnvVars(this.config) - // Create a node-rdkafka consumer that fetches batches of messages, runs // eachBatchWithContext, then commits offsets for the batch. 
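// Worth noting about the wiring in this patch: the producer constructed above connects to the
// main Kafka cluster (via the global server config) and is shared by the console-log ingester,
// the replay-events ingester and ingestion warnings, while the batch consumer created below
// reads from the dedicated session replay cluster (via the session-recording consumer config).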
- + // the batch consumer reads from the session replay kafka cluster + const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config) this.batchConsumer = await startBatchConsumer({ - connectionConfig, + connectionConfig: replayClusterConnectionConfig, groupId: KAFKA_CONSUMER_GROUP_ID, topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, autoCommit: false, @@ -510,11 +513,6 @@ export class SessionRecordingIngester { status.info('🔁', 'blob_ingester_consumer batch consumer disconnected, cleaning up', { err }) await this.stop() }) - - const producerConfig = createRdProducerConfigFromEnvVars(this.config) - const producer = await createKafkaProducer(connectionConfig, producerConfig) - producer.connect() - this.ingestionWarningProducer = new KafkaProducerWrapper(producer) } public async stop(): Promise[]> { @@ -531,11 +529,13 @@ export class SessionRecordingIngester { void this.scheduleWork(this.onRevokePartitions(assignedPartitions)) void this.scheduleWork(this.realtimeManager.unsubscribe()) + // stop is effectively a no-op on both of these but is kept here + // in case we want to add any cleanup logic in the future if (this.replayEventsIngester) { - void this.scheduleWork(this.replayEventsIngester.stop()) + this.replayEventsIngester.stop() } if (this.consoleLogsIngester) { - void this.scheduleWork(this.consoleLogsIngester.stop()) + this.consoleLogsIngester.stop() } const promiseResults = await Promise.allSettled(this.promises) @@ -544,7 +544,7 @@ export class SessionRecordingIngester { await this.redisPool.drain() await this.redisPool.clear() - await this.ingestionWarningProducer?.disconnect() + await this.mainKafkaClusterProducer?.disconnect() status.info('👍', 'blob_ingester_consumer - stopped!') diff --git a/plugin-server/src/utils/db/kafka-producer-wrapper.ts b/plugin-server/src/utils/db/kafka-producer-wrapper.ts index 8f7cef4c06b30..99fcf522d4a14 100644 --- a/plugin-server/src/utils/db/kafka-producer-wrapper.ts +++ b/plugin-server/src/utils/db/kafka-producer-wrapper.ts @@ -104,8 +104,10 @@ export class KafkaProducerWrapper { } public async disconnect(): Promise { - await this.flush() - await disconnectProducer(this.producer) + if (this.producer.isConnected()) { + await this.flush() + await disconnectProducer(this.producer) + } } } diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts index 42dfb9e55b5c1..b181150e37a14 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts @@ -1,11 +1,9 @@ import { HighLevelProducer } from 'node-rdkafka' -import { defaultConfig } from '../../../../../src/config/config' -import { createKafkaProducer, produce } from '../../../../../src/kafka/producer' +import { produce } from '../../../../../src/kafka/producer' import { ConsoleLogsIngester } from '../../../../../src/main/ingestion-queues/session-recording/services/console-logs-ingester' import { OffsetHighWaterMarker } from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker' import { IncomingRecordingMessage } from '../../../../../src/main/ingestion-queues/session-recording/types' -import { PluginsServerConfig } from '../../../../../src/types' import { status } from '../../../../../src/utils/status' 
jest.mock('../../../../../src/utils/status') @@ -37,17 +35,17 @@ describe('console log ingester', () => { let consoleLogIngester: ConsoleLogsIngester const mockProducer: jest.Mock = jest.fn() - beforeEach(async () => { + beforeEach(() => { mockProducer.mockClear() mockProducer['connect'] = jest.fn() - - jest.mocked(createKafkaProducer).mockImplementation(() => - Promise.resolve(mockProducer as unknown as HighLevelProducer) - ) + mockProducer['isConnected'] = () => true const mockedHighWaterMarker = { isBelowHighWaterMark: jest.fn() } as unknown as OffsetHighWaterMarker - consoleLogIngester = new ConsoleLogsIngester({ ...defaultConfig } as PluginsServerConfig, mockedHighWaterMarker) - await consoleLogIngester.start() + consoleLogIngester = new ConsoleLogsIngester( + mockProducer as unknown as HighLevelProducer, + mockedHighWaterMarker + ) + consoleLogIngester.start() }) describe('when enabled on team', () => { test('it truncates large console logs', async () => { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts index af798504406e6..f66e98f25e134 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts @@ -1,12 +1,11 @@ import { DateTime } from 'luxon' import { HighLevelProducer } from 'node-rdkafka' -import { defaultConfig } from '../../../../../src/config/config' -import { createKafkaProducer, produce } from '../../../../../src/kafka/producer' +import { produce } from '../../../../../src/kafka/producer' import { OffsetHighWaterMarker } from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker' import { ReplayEventsIngester } from '../../../../../src/main/ingestion-queues/session-recording/services/replay-events-ingester' import { IncomingRecordingMessage } from '../../../../../src/main/ingestion-queues/session-recording/types' -import { PluginsServerConfig, TimestampFormat } from '../../../../../src/types' +import { TimestampFormat } from '../../../../../src/types' import { status } from '../../../../../src/utils/status' import { castTimestampOrNow } from '../../../../../src/utils/utils' @@ -36,17 +35,14 @@ describe('replay events ingester', () => { let ingester: ReplayEventsIngester const mockProducer: jest.Mock = jest.fn() - beforeEach(async () => { + beforeEach(() => { mockProducer.mockClear() mockProducer['connect'] = jest.fn() - - jest.mocked(createKafkaProducer).mockImplementation(() => - Promise.resolve(mockProducer as unknown as HighLevelProducer) - ) + mockProducer['isConnected'] = () => true const mockedHighWaterMarker = { isBelowHighWaterMark: jest.fn() } as unknown as OffsetHighWaterMarker - ingester = new ReplayEventsIngester({ ...defaultConfig } as PluginsServerConfig, mockedHighWaterMarker) - await ingester.start() + ingester = new ReplayEventsIngester(mockProducer as unknown as HighLevelProducer, mockedHighWaterMarker) + ingester.start() }) test('does not ingest messages from a month in the future', async () => { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts index d0c5d66ff8ec1..2229aea656d63 100644 --- 
a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts @@ -314,6 +314,9 @@ describe('ingester', () => { it('shuts down without error', async () => { await setup() + await ingester.start() + expect(ingester['mainKafkaClusterProducer']?.producer.isConnected()) + await expect(ingester.stop()).resolves.toMatchObject([ // destroy sessions, { status: 'fulfilled' }, From 5edfe8ee76eec326bf21f4891b5b072bd0331863 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Thu, 21 Mar 2024 13:38:53 +0000 Subject: [PATCH 13/21] inject main cluster producer as a dependency from the recording consumer into its children --- .../session-recordings-consumer-v3.test.ts | 4 ---- .../session-recordings-consumer.test.ts | 9 ++------- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts index 2229aea656d63..b20713d45b801 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts @@ -320,10 +320,6 @@ describe('ingester', () => { await expect(ingester.stop()).resolves.toMatchObject([ // destroy sessions, { status: 'fulfilled' }, - // stop replay ingester - { status: 'fulfilled' }, - // stop console ingester - { status: 'fulfilled' }, ]) }) }) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index 18dc39c7e5b2e..0b4acb021ceeb 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -532,13 +532,8 @@ describe('ingester', () => { it('shuts down without error', async () => { await setup() - // revoke, realtime unsub, replay stop, console ingestion stop - await expect(ingester.stop()).resolves.toMatchObject([ - { status: 'fulfilled' }, - { status: 'fulfilled' }, - { status: 'fulfilled' }, - { status: 'fulfilled' }, - ]) + // revoke, realtime unsub + await expect(ingester.stop()).resolves.toMatchObject([{ status: 'fulfilled' }, { status: 'fulfilled' }]) }) }) From 3533213616a2b5a88e4b451fe8f65fb8a0efe923 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Thu, 21 Mar 2024 13:42:25 +0000 Subject: [PATCH 14/21] void the promises that eslint or prettier keeps clearing the comment from --- .../session-recording/session-recordings-consumer-v3.ts | 3 ++- .../session-recording/session-recordings-consumer.ts | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts index 4047739ff5743..328ba6c9ffba8 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts @@ -140,7 +140,8 @@ export class SessionRecordingIngesterV3 { */ this.promises.add(promise) - promise.finally(() => 
this.promises.delete(promise)) + // we void the promise returned by finally here to avoid the need to await it + void promise.finally(() => this.promises.delete(promise)) return promise } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 09d3e84105cf1..89813c5596f90 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -230,7 +230,8 @@ export class SessionRecordingIngester { */ this.promises.add(promise) - promise.finally(() => this.promises.delete(promise)) + // we void the promise returned by finally here to avoid the need to await it + void promise.finally(() => this.promises.delete(promise)) return promise } From aa694c696dc1905181ec9d0f9182ae706ed23633 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Fri, 22 Mar 2024 11:13:21 +0000 Subject: [PATCH 15/21] does that fix the test --- plugin-server/src/utils/db/kafka-producer-wrapper.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/plugin-server/src/utils/db/kafka-producer-wrapper.ts b/plugin-server/src/utils/db/kafka-producer-wrapper.ts index 99fcf522d4a14..8f7cef4c06b30 100644 --- a/plugin-server/src/utils/db/kafka-producer-wrapper.ts +++ b/plugin-server/src/utils/db/kafka-producer-wrapper.ts @@ -104,10 +104,8 @@ export class KafkaProducerWrapper { } public async disconnect(): Promise { - if (this.producer.isConnected()) { - await this.flush() - await disconnectProducer(this.producer) - } + await this.flush() + await disconnectProducer(this.producer) } } From bf94d77f9058646d662de68bb5f206fd1af5f6a6 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Mon, 25 Mar 2024 08:08:05 +0000 Subject: [PATCH 16/21] not safe to call stop twice --- .../session-recordings-consumer-v3.test.ts | 349 +++++++++--------- 1 file changed, 176 insertions(+), 173 deletions(-) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts index b20713d45b801..32f5badb75b60 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts @@ -123,13 +123,6 @@ describe('ingester', () => { mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)]) }) - afterEach(async () => { - jest.setTimeout(10000) - await deleteKeysWithPrefix(hub) - await ingester.stop() - await closeHub() - }) - afterAll(() => { rmSync(config.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true }) jest.useRealTimers() @@ -151,202 +144,212 @@ describe('ingester', () => { ) } - it('can parse debug partition config', () => { - const config = { - SESSION_RECORDING_DEBUG_PARTITION: '103', - KAFKA_HOSTS: 'localhost:9092', - } satisfies Partial as PluginsServerConfig + describe('without stop called on teardown', () => { + afterEach(async () => { + jest.setTimeout(10000) + await deleteKeysWithPrefix(hub) + await closeHub() + }) - const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) - expect(ingester['debugPartition']).toEqual(103) - }) + describe('stop()', () => { + const setup = async (): Promise => { + const partitionMsgs1 
= [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + await ingester.handleEachBatch(partitionMsgs1, noop) + } - it('can parse absence of debug partition config', () => { - const config = { - KAFKA_HOSTS: 'localhost:9092', - } satisfies Partial as PluginsServerConfig + it('shuts down without error', async () => { + await setup() - const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) - expect(ingester['debugPartition']).toBeUndefined() - }) + await ingester.start() + expect(ingester['mainKafkaClusterProducer']?.producer.isConnected()) - it('creates a new session manager if needed', async () => { - const event = createIncomingRecordingMessage() - await ingester.consume(event) - await waitForExpect(() => { - expect(Object.keys(ingester.sessions).length).toBe(1) - expect(ingester.sessions['1__session_id_1']).toBeDefined() + await expect(ingester.stop()).resolves.toMatchObject([ + // destroy sessions, + { status: 'fulfilled' }, + ]) + }) }) }) - it('handles multiple incoming sessions', async () => { - const event = createIncomingRecordingMessage() - const event2 = createIncomingRecordingMessage({ - session_id: 'session_id_2', + describe('with stop called on teardown', () => { + it('can parse debug partition config', () => { + const config = { + SESSION_RECORDING_DEBUG_PARTITION: '103', + KAFKA_HOSTS: 'localhost:9092', + } satisfies Partial as PluginsServerConfig + + const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) + expect(ingester['debugPartition']).toEqual(103) }) - await Promise.all([ingester.consume(event), ingester.consume(event2)]) - expect(Object.keys(ingester.sessions).length).toBe(2) - expect(ingester.sessions['1__session_id_1']).toBeDefined() - expect(ingester.sessions['1__session_id_2']).toBeDefined() - }) - it('handles parallel ingestion of the same session', async () => { - const event = createIncomingRecordingMessage() - const event2 = createIncomingRecordingMessage() - await Promise.all([ingester.consume(event), ingester.consume(event2)]) - expect(Object.keys(ingester.sessions).length).toBe(1) - expect(ingester.sessions['1__session_id_1']).toBeDefined() - }) + it('can parse absence of debug partition config', () => { + const config = { + KAFKA_HOSTS: 'localhost:9092', + } satisfies Partial as PluginsServerConfig - it('destroys a session manager if finished', async () => { - const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}` - const event = createIncomingRecordingMessage({ - session_id: sessionId, + const ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) + expect(ingester['debugPartition']).toBeUndefined() }) - await ingester.consume(event) - expect(ingester.sessions[`1__${sessionId}`]).toBeDefined() - ingester.sessions[`1__${sessionId}`].buffer!.createdAt = 0 - await ingester.flushAllReadySessions(() => undefined) + it('creates a new session manager if needed', async () => { + const event = createIncomingRecordingMessage() + await ingester.consume(event) + await waitForExpect(() => { + expect(Object.keys(ingester.sessions).length).toBe(1) + expect(ingester.sessions['1__session_id_1']).toBeDefined() + }) + }) - await waitForExpect(() => { - expect(ingester.sessions[`1__${sessionId}`]).not.toBeDefined() - }, 10000) - }) + it('handles multiple incoming sessions', async () => { + const event = createIncomingRecordingMessage() + const event2 = createIncomingRecordingMessage({ + session_id: 'session_id_2', + }) + await 
Promise.all([ingester.consume(event), ingester.consume(event2)]) + expect(Object.keys(ingester.sessions).length).toBe(2) + expect(ingester.sessions['1__session_id_1']).toBeDefined() + expect(ingester.sessions['1__session_id_2']).toBeDefined() + }) - describe('batch event processing', () => { - it('should batch parse incoming events and batch them to reduce writes', async () => { - mockConsumer.assignments.mockImplementation(() => [createTP(1)]) - await ingester.handleEachBatch( - [ - createMessage('session_id_1', 1), - createMessage('session_id_1', 1), - createMessage('session_id_1', 1), - createMessage('session_id_2', 1), - ], - noop - ) - - expect(ingester.sessions[`${team.id}__session_id_1`].buffer?.count).toBe(1) - expect(ingester.sessions[`${team.id}__session_id_2`].buffer?.count).toBe(1) - - let fileContents = await fs.readFile( - path.join(ingester.sessions[`${team.id}__session_id_1`].context.dir, 'buffer.jsonl'), - 'utf-8' - ) - - expect(JSON.parse(fileContents).data).toHaveLength(3) - - fileContents = await fs.readFile( - path.join(ingester.sessions[`${team.id}__session_id_2`].context.dir, 'buffer.jsonl'), - 'utf-8' - ) - - expect(JSON.parse(fileContents).data).toHaveLength(1) + it('handles parallel ingestion of the same session', async () => { + const event = createIncomingRecordingMessage() + const event2 = createIncomingRecordingMessage() + await Promise.all([ingester.consume(event), ingester.consume(event2)]) + expect(Object.keys(ingester.sessions).length).toBe(1) + expect(ingester.sessions['1__session_id_1']).toBeDefined() }) - }) - describe('simulated rebalancing', () => { - let otherIngester: SessionRecordingIngesterV3 - jest.setTimeout(5000) // Increased to cover lock delay + it('destroys a session manager if finished', async () => { + const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}` + const event = createIncomingRecordingMessage({ + session_id: sessionId, + }) + await ingester.consume(event) + expect(ingester.sessions[`1__${sessionId}`]).toBeDefined() + ingester.sessions[`1__${sessionId}`].buffer!.createdAt = 0 - beforeEach(async () => { - otherIngester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) - await otherIngester.start() - }) + await ingester.flushAllReadySessions(() => undefined) - afterEach(async () => { - await otherIngester.stop() + await waitForExpect(() => { + expect(ingester.sessions[`1__${sessionId}`]).not.toBeDefined() + }, 10000) }) - const getSessions = ( - ingester: SessionRecordingIngesterV3 - ): (SessionManagerContext & SessionManagerBufferContext)[] => - Object.values(ingester.sessions).map((x) => ({ ...x.context, ...x.buffer! 
})) - - /** - * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs - * Simulates the rebalance and tests that the handled sessions are successfully dropped and picked up - */ - it('rebalances new consumers', async () => { - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] - - mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) - await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) - - expect(getSessions(ingester)).toMatchObject([ - { sessionId: 'session_id_1', partition: 1, count: 1 }, - { sessionId: 'session_id_2', partition: 1, count: 1 }, - { sessionId: 'session_id_3', partition: 2, count: 1 }, - { sessionId: 'session_id_4', partition: 2, count: 1 }, - ]) - - // Call handleEachBatch with both consumers - we simulate the assignments which - // is what is responsible for the actual syncing of the sessions - mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)]) - await otherIngester.handleEachBatch( - [createMessage('session_id_4', 2), createMessage('session_id_5', 2)], - noop - ) - mockConsumer.assignments.mockImplementation(() => [createTP(1)]) - await ingester.handleEachBatch([createMessage('session_id_1', 1)], noop) - - // Should still have the partition 1 sessions that didnt move with added events - expect(getSessions(ingester)).toMatchObject([ - { sessionId: 'session_id_1', partition: 1, count: 2 }, - { sessionId: 'session_id_2', partition: 1, count: 1 }, - ]) - expect(getSessions(otherIngester)).toMatchObject([ - { sessionId: 'session_id_3', partition: 2, count: 1 }, - { sessionId: 'session_id_4', partition: 2, count: 2 }, - { sessionId: 'session_id_5', partition: 2, count: 1 }, - ]) + describe('batch event processing', () => { + it('should batch parse incoming events and batch them to reduce writes', async () => { + mockConsumer.assignments.mockImplementation(() => [createTP(1)]) + await ingester.handleEachBatch( + [ + createMessage('session_id_1', 1), + createMessage('session_id_1', 1), + createMessage('session_id_1', 1), + createMessage('session_id_2', 1), + ], + noop + ) + + expect(ingester.sessions[`${team.id}__session_id_1`].buffer?.count).toBe(1) + expect(ingester.sessions[`${team.id}__session_id_2`].buffer?.count).toBe(1) + + let fileContents = await fs.readFile( + path.join(ingester.sessions[`${team.id}__session_id_1`].context.dir, 'buffer.jsonl'), + 'utf-8' + ) + + expect(JSON.parse(fileContents).data).toHaveLength(3) + + fileContents = await fs.readFile( + path.join(ingester.sessions[`${team.id}__session_id_2`].context.dir, 'buffer.jsonl'), + 'utf-8' + ) + + expect(JSON.parse(fileContents).data).toHaveLength(1) + }) }) - }) - describe('stop()', () => { - const setup = async (): Promise => { - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - await ingester.handleEachBatch(partitionMsgs1, noop) - } + describe('simulated rebalancing', () => { + let otherIngester: SessionRecordingIngesterV3 + jest.setTimeout(5000) // Increased to cover lock delay - it('shuts down without error', async () => { - await setup() + beforeEach(async () => { + otherIngester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage) + await otherIngester.start() + }) - await ingester.start() - 
expect(ingester['mainKafkaClusterProducer']?.producer.isConnected()) + afterEach(async () => { + await otherIngester.stop() + }) - await expect(ingester.stop()).resolves.toMatchObject([ - // destroy sessions, - { status: 'fulfilled' }, - ]) + const getSessions = ( + ingester: SessionRecordingIngesterV3 + ): (SessionManagerContext & SessionManagerBufferContext)[] => + Object.values(ingester.sessions).map((x) => ({ ...x.context, ...x.buffer! })) + + /** + * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs + * Simulates the rebalance and tests that the handled sessions are successfully dropped and picked up + */ + it('rebalances new consumers', async () => { + const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] + + mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) + await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) + + expect(getSessions(ingester)).toMatchObject([ + { sessionId: 'session_id_1', partition: 1, count: 1 }, + { sessionId: 'session_id_2', partition: 1, count: 1 }, + { sessionId: 'session_id_3', partition: 2, count: 1 }, + { sessionId: 'session_id_4', partition: 2, count: 1 }, + ]) + + // Call handleEachBatch with both consumers - we simulate the assignments which + // is what is responsible for the actual syncing of the sessions + mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)]) + await otherIngester.handleEachBatch( + [createMessage('session_id_4', 2), createMessage('session_id_5', 2)], + noop + ) + mockConsumer.assignments.mockImplementation(() => [createTP(1)]) + await ingester.handleEachBatch([createMessage('session_id_1', 1)], noop) + + // Should still have the partition 1 sessions that didnt move with added events + expect(getSessions(ingester)).toMatchObject([ + { sessionId: 'session_id_1', partition: 1, count: 2 }, + { sessionId: 'session_id_2', partition: 1, count: 1 }, + ]) + expect(getSessions(otherIngester)).toMatchObject([ + { sessionId: 'session_id_3', partition: 2, count: 1 }, + { sessionId: 'session_id_4', partition: 2, count: 2 }, + { sessionId: 'session_id_5', partition: 2, count: 1 }, + ]) + }) }) - }) - describe('when a team is disabled', () => { - it('ignores invalid teams', async () => { - // non-zero offset because the code can't commit offset 0 - await ingester.handleEachBatch( - [ - createKafkaMessage('invalid_token', { offset: 12 }), - createKafkaMessage('invalid_token', { offset: 13 }), - ], - noop - ) - - expect(ingester.sessions).toEqual({}) + describe('when a team is disabled', () => { + it('ignores invalid teams', async () => { + // non-zero offset because the code can't commit offset 0 + await ingester.handleEachBatch( + [ + createKafkaMessage('invalid_token', { offset: 12 }), + createKafkaMessage('invalid_token', { offset: 13 }), + ], + noop + ) + + expect(ingester.sessions).toEqual({}) + }) }) - }) - describe('heartbeats', () => { - it('it should send them whilst processing', async () => { - const heartbeat = jest.fn() - // non-zero offset because the code can't commit offset 0 - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - await ingester.handleEachBatch(partitionMsgs1, heartbeat) + describe('heartbeats', () => { + it('it should send them whilst processing', async () => { + const 
heartbeat = jest.fn() + // non-zero offset because the code can't commit offset 0 + const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + await ingester.handleEachBatch(partitionMsgs1, heartbeat) - expect(heartbeat).toBeCalledTimes(5) + expect(heartbeat).toBeCalledTimes(5) + }) }) }) }) From 75ae215a92098616613476bb171d21b7409f3546 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Mon, 25 Mar 2024 08:18:59 +0000 Subject: [PATCH 17/21] not safe to call stop twice --- .../session-recordings-consumer.test.ts | 788 +++++++++--------- 1 file changed, 402 insertions(+), 386 deletions(-) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index 0b4acb021ceeb..267c96f800518 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -111,13 +111,6 @@ describe('ingester', () => { mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)]) }) - afterEach(async () => { - jest.setTimeout(10000) - await deleteKeysWithPrefix(hub) - await ingester.stop() - await closeHub() - }) - afterAll(() => { rmSync(config.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true }) jest.useRealTimers() @@ -144,442 +137,465 @@ describe('ingester', () => { ) } - it('can parse debug partition config', () => { - const config = { - SESSION_RECORDING_DEBUG_PARTITION: '103', - KAFKA_HOSTS: 'localhost:9092', - } satisfies Partial as PluginsServerConfig - - const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage) - expect(ingester['debugPartition']).toEqual(103) - }) + describe('without stop called on teardown', () => { + afterEach(async () => { + jest.setTimeout(10000) + await deleteKeysWithPrefix(hub) + await closeHub() + }) - it('can parse absence of debug partition config', () => { - const config = { - KAFKA_HOSTS: 'localhost:9092', - } satisfies Partial as PluginsServerConfig + describe('stop()', () => { + const setup = async (): Promise => { + const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + await ingester.handleEachBatch(partitionMsgs1, noop) + } - const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage) - expect(ingester['debugPartition']).toBeUndefined() - }) + // NOTE: This test is a sanity check for the follow up test. 
It demonstrates what happens if we shutdown in the wrong order + // It doesn't reliably work though as the onRevoke is called via the kafka lib ending up with dangling promises so rather it is here as a reminder + // demonstation for when we need it + it.skip('shuts down with error if redis forcefully shutdown', async () => { + await setup() + + await ingester.redisPool.drain() + await ingester.redisPool.clear() + + // revoke, realtime unsub, replay stop + await expect(ingester.stop()).resolves.toMatchObject([ + { status: 'rejected' }, + { status: 'fulfilled' }, + { status: 'fulfilled' }, + ]) + }) + it('shuts down without error', async () => { + await setup() - it('creates a new session manager if needed', async () => { - const event = createIncomingRecordingMessage() - await ingester.consume(event) - await waitForExpect(() => { - expect(Object.keys(ingester.sessions).length).toBe(1) - expect(ingester.sessions['1-session_id_1']).toBeDefined() + // revoke, realtime unsub + await expect(ingester.stop()).resolves.toMatchObject([{ status: 'fulfilled' }, { status: 'fulfilled' }]) + }) }) }) - it('removes sessions on destroy', async () => { - await ingester.consume(createIncomingRecordingMessage({ team_id: 2, session_id: 'session_id_1' })) - await ingester.consume(createIncomingRecordingMessage({ team_id: 2, session_id: 'session_id_2' })) - - expect(Object.keys(ingester.sessions).length).toBe(2) - expect(ingester.sessions['2-session_id_1']).toBeDefined() - expect(ingester.sessions['2-session_id_2']).toBeDefined() - - await ingester.destroySessions([['2-session_id_1', ingester.sessions['2-session_id_1']]]) + describe('with stop called on teardown', () => { + afterEach(async () => { + jest.setTimeout(10000) + await deleteKeysWithPrefix(hub) + await ingester.stop() + await closeHub() + }) - expect(Object.keys(ingester.sessions).length).toBe(1) - expect(ingester.sessions['2-session_id_2']).toBeDefined() - }) + it('can parse debug partition config', () => { + const config = { + SESSION_RECORDING_DEBUG_PARTITION: '103', + KAFKA_HOSTS: 'localhost:9092', + } satisfies Partial as PluginsServerConfig - it('handles multiple incoming sessions', async () => { - const event = createIncomingRecordingMessage() - const event2 = createIncomingRecordingMessage({ - session_id: 'session_id_2', + const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage) + expect(ingester['debugPartition']).toEqual(103) }) - await Promise.all([ingester.consume(event), ingester.consume(event2)]) - expect(Object.keys(ingester.sessions).length).toBe(2) - expect(ingester.sessions['1-session_id_1']).toBeDefined() - expect(ingester.sessions['1-session_id_2']).toBeDefined() - }) - // This test is flaky and no-one has time to look into it https://posthog.slack.com/archives/C0460HY55M0/p1696437876690329 - it.skip('destroys a session manager if finished', async () => { - const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}` - const event = createIncomingRecordingMessage({ - session_id: sessionId, + it('can parse absence of debug partition config', () => { + const config = { + KAFKA_HOSTS: 'localhost:9092', + } satisfies Partial as PluginsServerConfig + + const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage) + expect(ingester['debugPartition']).toBeUndefined() }) - await ingester.consume(event) - expect(ingester.sessions[`1-${sessionId}`]).toBeDefined() - // Force the flush - ingester.partitionMetrics[event.metadata.partition] = { - lastMessageTimestamp: 
Date.now() + defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS, - } - - await ingester.flushAllReadySessions(noop) - - await waitForExpect(() => { - expect(ingester.sessions[`1-${sessionId}`]).not.toBeDefined() - }, 10000) - }) - describe('offset committing', () => { - it('should commit offsets in simple cases', async () => { - await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid1')], noop) - expect(ingester.partitionMetrics[1]).toMatchObject({ - lastMessageOffset: 2, + it('creates a new session manager if needed', async () => { + const event = createIncomingRecordingMessage() + await ingester.consume(event) + await waitForExpect(() => { + expect(Object.keys(ingester.sessions).length).toBe(1) + expect(ingester.sessions['1-session_id_1']).toBeDefined() }) - - await commitAllOffsets() - // Doesn't flush if we have a blocking session - expect(mockConsumer.commit).toHaveBeenCalledTimes(0) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await commitAllOffsets() - - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - offset: 2 + 1, - partition: 1, - }) - ) }) - it.skip('should commit higher values but not lower', async () => { - await ingester.handleEachBatch([createMessage('sid1')], noop) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - expect(ingester.partitionMetrics[1].lastMessageOffset).toBe(1) - await commitAllOffsets() - - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - partition: 1, - offset: 2, - }) - ) + it('removes sessions on destroy', async () => { + await ingester.consume(createIncomingRecordingMessage({ team_id: 2, session_id: 'session_id_1' })) + await ingester.consume(createIncomingRecordingMessage({ team_id: 2, session_id: 'session_id_2' })) - // Repeat commit doesn't do anything - await commitAllOffsets() - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + expect(Object.keys(ingester.sessions).length).toBe(2) + expect(ingester.sessions['2-session_id_1']).toBeDefined() + expect(ingester.sessions['2-session_id_2']).toBeDefined() - await ingester.handleEachBatch([createMessage('sid1')], noop) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await commitAllOffsets() + await ingester.destroySessions([['2-session_id_1', ingester.sessions['2-session_id_1']]]) - expect(mockConsumer.commit).toHaveBeenCalledTimes(2) - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - partition: 1, - offset: 2 + 1, - }) - ) + expect(Object.keys(ingester.sessions).length).toBe(1) + expect(ingester.sessions['2-session_id_2']).toBeDefined() }) - it('should commit the lowest known offset if there is a blocking session', async () => { - await ingester.handleEachBatch( - [createMessage('sid1'), createMessage('sid2'), createMessage('sid2'), createMessage('sid2')], - noop - ) - await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') - await commitAllOffsets() + it('handles multiple incoming sessions', async () => { + const event = createIncomingRecordingMessage() + const event2 = createIncomingRecordingMessage({ + session_id: 'session_id_2', + }) + await Promise.all([ingester.consume(event), ingester.consume(event2)]) + expect(Object.keys(ingester.sessions).length).toBe(2) + expect(ingester.sessions['1-session_id_1']).toBeDefined() + expect(ingester.sessions['1-session_id_2']).toBeDefined() + }) - 
expect(ingester.partitionMetrics[1]).toMatchObject({ - lastMessageOffset: 4, + // This test is flaky and no-one has time to look into it https://posthog.slack.com/archives/C0460HY55M0/p1696437876690329 + it.skip('destroys a session manager if finished', async () => { + const sessionId = `destroys-a-session-manager-if-finished-${randomUUID()}` + const event = createIncomingRecordingMessage({ + session_id: sessionId, }) + await ingester.consume(event) + expect(ingester.sessions[`1-${sessionId}`]).toBeDefined() + // Force the flush + ingester.partitionMetrics[event.metadata.partition] = { + lastMessageTimestamp: Date.now() + defaultConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS, + } - // No offsets are below the blocking one - expect(mockConsumer.commit).not.toHaveBeenCalled() - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await ingester.flushAllReadySessions(noop) - // Subsequent commit will commit the last known offset - await commitAllOffsets() - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - partition: 1, - offset: 4 + 1, - }) - ) + await waitForExpect(() => { + expect(ingester.sessions[`1-${sessionId}`]).not.toBeDefined() + }, 10000) }) - it('should commit one lower than the blocking session if that is the highest', async () => { - await ingester.handleEachBatch( - [createMessage('sid1'), createMessage('sid2'), createMessage('sid2'), createMessage('sid2')], - noop - ) - // Flush the second session so the first one is still blocking - await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') - await commitAllOffsets() - - // No offsets are below the blocking one - expect(mockConsumer.commit).not.toHaveBeenCalled() - - // Add a new message and session and flush the old one - await ingester.handleEachBatch([createMessage('sid2')], noop) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await commitAllOffsets() - - // We should commit the offset of the blocking session - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - partition: 1, - offset: ingester.sessions[`${team.id}-sid2`].getLowestOffset(), + describe('offset committing', () => { + it('should commit offsets in simple cases', async () => { + await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid1')], noop) + expect(ingester.partitionMetrics[1]).toMatchObject({ + lastMessageOffset: 2, }) - ) - }) - it.skip('should not be affected by other partitions ', async () => { - await ingester.handleEachBatch( - [createMessage('sid1', 1), createMessage('sid2', 2), createMessage('sid2', 2)], - noop - ) + await commitAllOffsets() + // Doesn't flush if we have a blocking session + expect(mockConsumer.commit).toHaveBeenCalledTimes(0) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() + + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + offset: 2 + 1, + partition: 1, + }) + ) + }) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await ingester.handleEachBatch([createMessage('sid1', 1)], noop) + it.skip('should commit higher values but not lower', async () => { + await ingester.handleEachBatch([createMessage('sid1')], noop) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + expect(ingester.partitionMetrics[1].lastMessageOffset).toBe(1) + await commitAllOffsets() + + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + 
expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 2, + }) + ) + + // Repeat commit doesn't do anything + await commitAllOffsets() + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + + await ingester.handleEachBatch([createMessage('sid1')], noop) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() + + expect(mockConsumer.commit).toHaveBeenCalledTimes(2) + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 2 + 1, + }) + ) + }) - // We should now have a blocking session on partition 1 and 2 with partition 1 being committable - await commitAllOffsets() - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - partition: 1, - offset: 2, - }) - ) - - mockConsumer.commit.mockReset() - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') - await commitAllOffsets() - expect(mockConsumer.commit).toHaveBeenCalledTimes(2) - expect(mockConsumer.commit).toHaveBeenCalledWith( - expect.objectContaining({ - partition: 1, - offset: 3, - }) - ) - expect(mockConsumer.commit).toHaveBeenCalledWith( - expect.objectContaining({ - partition: 2, - offset: 3, + it('should commit the lowest known offset if there is a blocking session', async () => { + await ingester.handleEachBatch( + [createMessage('sid1'), createMessage('sid2'), createMessage('sid2'), createMessage('sid2')], + noop + ) + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() + + expect(ingester.partitionMetrics[1]).toMatchObject({ + lastMessageOffset: 4, }) - ) - }) - }) - describe('watermarkers', () => { - const getSessionWaterMarks = (partition = 1) => - ingester.sessionHighWaterMarker.getWaterMarks(createTP(partition)) - const getPersistentWaterMarks = (partition = 1) => - ingester.persistentHighWaterMarker.getWaterMarks(createTP(partition)) - - it('should update session watermarkers with flushing', async () => { - await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid2'), createMessage('sid3')], noop) - await expect(getSessionWaterMarks()).resolves.toEqual({}) - - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 1 }) - await ingester.sessions[`${team.id}-sid3`].flush('buffer_age') - await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') - await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 1, sid2: 2, sid3: 3 }) - }) + // No offsets are below the blocking one + expect(mockConsumer.commit).not.toHaveBeenCalled() + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + + // Subsequent commit will commit the last known offset + await commitAllOffsets() + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 4 + 1, + }) + ) + }) - it('should update partition watermarkers when committing', async () => { - await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid2'), createMessage('sid1')], noop) - await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') - await commitAllOffsets() - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - - // all replay events should be watermarked up until the 3rd message as they HAVE been processed - // whereas the commited kafka offset should be the 1st message as the 2nd 
message HAS not been processed - await expect(getPersistentWaterMarks()).resolves.toEqual({ - 'session-recordings-blob': 1, - session_replay_console_logs_events_ingester: 3, - session_replay_events_ingester: 3, + it('should commit one lower than the blocking session if that is the highest', async () => { + await ingester.handleEachBatch( + [createMessage('sid1'), createMessage('sid2'), createMessage('sid2'), createMessage('sid2')], + noop + ) + // Flush the second session so the first one is still blocking + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() + + // No offsets are below the blocking one + expect(mockConsumer.commit).not.toHaveBeenCalled() + + // Add a new message and session and flush the old one + await ingester.handleEachBatch([createMessage('sid2')], noop) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() + + // We should commit the offset of the blocking session + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: ingester.sessions[`${team.id}-sid2`].getLowestOffset(), + }) + ) }) - // sid1 should be watermarked up until the 3rd message as it HAS been processed - await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 3 }) - }) - it('should drop events that are higher than the watermarks', async () => { - const events = [createMessage('sid1'), createMessage('sid2'), createMessage('sid2')] - - await expect(getPersistentWaterMarks()).resolves.toEqual({}) - await ingester.handleEachBatch([events[0], events[1]], noop) - await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') - await commitAllOffsets() - expect(mockConsumer.commit).not.toHaveBeenCalled() - await expect(getPersistentWaterMarks()).resolves.toEqual({ - session_replay_console_logs_events_ingester: 2, - session_replay_events_ingester: 2, + it.skip('should not be affected by other partitions ', async () => { + await ingester.handleEachBatch( + [createMessage('sid1', 1), createMessage('sid2', 2), createMessage('sid2', 2)], + noop + ) + + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await ingester.handleEachBatch([createMessage('sid1', 1)], noop) + + // We should now have a blocking session on partition 1 and 2 with partition 1 being committable + await commitAllOffsets() + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + partition: 1, + offset: 2, + }) + ) + + mockConsumer.commit.mockReset() + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() + expect(mockConsumer.commit).toHaveBeenCalledTimes(2) + expect(mockConsumer.commit).toHaveBeenCalledWith( + expect.objectContaining({ + partition: 1, + offset: 3, + }) + ) + expect(mockConsumer.commit).toHaveBeenCalledWith( + expect.objectContaining({ + partition: 2, + offset: 3, + }) + ) }) - await expect(getSessionWaterMarks()).resolves.toEqual({ - sid2: 2, // only processed the second message so far + }) + + describe('watermarkers', () => { + const getSessionWaterMarks = (partition = 1) => + ingester.sessionHighWaterMarker.getWaterMarks(createTP(partition)) + const getPersistentWaterMarks = (partition = 1) => + ingester.persistentHighWaterMarker.getWaterMarks(createTP(partition)) + + it('should update session watermarkers with flushing', async () => { + await ingester.handleEachBatch( + [createMessage('sid1'), 
createMessage('sid2'), createMessage('sid3')], + noop + ) + await expect(getSessionWaterMarks()).resolves.toEqual({}) + + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 1 }) + await ingester.sessions[`${team.id}-sid3`].flush('buffer_age') + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 1, sid2: 2, sid3: 3 }) }) - // Simulate a re-processing - await ingester.destroySessions(Object.entries(ingester.sessions)) - await ingester.handleEachBatch(events, noop) - expect(ingester.sessions[`${team.id}-sid2`].buffer.count).toBe(1) - expect(ingester.sessions[`${team.id}-sid1`].buffer.count).toBe(1) - }) - }) + it('should update partition watermarkers when committing', async () => { + await ingester.handleEachBatch( + [createMessage('sid1'), createMessage('sid2'), createMessage('sid1')], + noop + ) + await ingester.sessions[`${team.id}-sid1`].flush('buffer_age') + await commitAllOffsets() + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + + // all replay events should be watermarked up until the 3rd message as they HAVE been processed + // whereas the commited kafka offset should be the 1st message as the 2nd message HAS not been processed + await expect(getPersistentWaterMarks()).resolves.toEqual({ + 'session-recordings-blob': 1, + session_replay_console_logs_events_ingester: 3, + session_replay_events_ingester: 3, + }) + // sid1 should be watermarked up until the 3rd message as it HAS been processed + await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 3 }) + }) - describe('simulated rebalancing', () => { - let otherIngester: SessionRecordingIngester - jest.setTimeout(5000) // Increased to cover lock delay + it('should drop events that are higher than the watermarks', async () => { + const events = [createMessage('sid1'), createMessage('sid2'), createMessage('sid2')] + + await expect(getPersistentWaterMarks()).resolves.toEqual({}) + await ingester.handleEachBatch([events[0], events[1]], noop) + await ingester.sessions[`${team.id}-sid2`].flush('buffer_age') + await commitAllOffsets() + expect(mockConsumer.commit).not.toHaveBeenCalled() + await expect(getPersistentWaterMarks()).resolves.toEqual({ + session_replay_console_logs_events_ingester: 2, + session_replay_events_ingester: 2, + }) + await expect(getSessionWaterMarks()).resolves.toEqual({ + sid2: 2, // only processed the second message so far + }) - beforeEach(async () => { - otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage) - await otherIngester.start() + // Simulate a re-processing + await ingester.destroySessions(Object.entries(ingester.sessions)) + await ingester.handleEachBatch(events, noop) + expect(ingester.sessions[`${team.id}-sid2`].buffer.count).toBe(1) + expect(ingester.sessions[`${team.id}-sid1`].buffer.count).toBe(1) + }) }) - afterEach(async () => { - await otherIngester.stop() - }) + describe('simulated rebalancing', () => { + let otherIngester: SessionRecordingIngester + jest.setTimeout(5000) // Increased to cover lock delay - /** - * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs - */ - it('rebalances new consumers', async () => { - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] - - 
mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) - await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) - - expect( - Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) - ).toEqual(['1:session_id_1:1', '1:session_id_2:1', '2:session_id_3:1', '2:session_id_4:1']) - - const rebalancePromises = [ingester.onRevokePartitions([createTP(2), createTP(3)])] - - // Should immediately be removed from the tracked sessions - expect( - Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) - ).toEqual(['1:session_id_1:1', '1:session_id_2:1']) - - // Call the second ingester to receive the messages. The revocation should still be in progress meaning they are "paused" for a bit - // Once the revocation is complete the second ingester should receive the messages but drop most of them as they got flushes by the revoke - mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)]) - await otherIngester.handleEachBatch([...partitionMsgs2, createMessage('session_id_4', 2)], noop) - await Promise.all(rebalancePromises) - - // Should still have the partition 1 sessions that didnt move - expect( - Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) - ).toEqual(['1:session_id_1:1', '1:session_id_2:1']) - - // Should have session_id_4 but not session_id_3 as it was flushed - expect( - Object.values(otherIngester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) - ).toEqual(['2:session_id_3:1', '2:session_id_4:1']) - }) + beforeEach(async () => { + otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage) + await otherIngester.start() + }) - it("flushes and commits as it's revoked", async () => { - await ingester.handleEachBatch( - [createMessage('sid1'), createMessage('sid2'), createMessage('sid3', 2)], - noop - ) - - expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([ - expect.stringContaining(`${team.id}.sid1.`), // gz - expect.stringContaining(`${team.id}.sid1.`), // json - expect.stringContaining(`${team.id}.sid2.`), // gz - expect.stringContaining(`${team.id}.sid2.`), // json - expect.stringContaining(`${team.id}.sid3.`), // gz - expect.stringContaining(`${team.id}.sid3.`), // json - ]) - - const revokePromise = ingester.onRevokePartitions([createTP(1)]) - - expect(Object.keys(ingester.sessions)).toEqual([`${team.id}-sid3`]) - - await revokePromise - - // Only files left on the system should be the sid3 ones - expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([ - expect.stringContaining(`${team.id}.sid3.`), // gz - expect.stringContaining(`${team.id}.sid3.`), // json - ]) - - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - expect(mockConsumer.commit).toHaveBeenLastCalledWith( - expect.objectContaining({ - offset: 2 + 1, - partition: 1, - }) - ) - }) - }) + afterEach(async () => { + await otherIngester.stop() + }) - describe('stop()', () => { - const setup = async (): Promise => { - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - await ingester.handleEachBatch(partitionMsgs1, noop) - } - - // NOTE: This test is a sanity check for the follow up test. 
It demonstrates what happens if we shutdown in the wrong order - // It doesn't reliably work though as the onRevoke is called via the kafka lib ending up with dangling promises so rather it is here as a reminder - // demonstation for when we need it - it.skip('shuts down with error if redis forcefully shutdown', async () => { - await setup() - - await ingester.redisPool.drain() - await ingester.redisPool.clear() - - // revoke, realtime unsub, replay stop - await expect(ingester.stop()).resolves.toMatchObject([ - { status: 'rejected' }, - { status: 'fulfilled' }, - { status: 'fulfilled' }, - ]) - }) - it('shuts down without error', async () => { - await setup() + /** + * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs + */ + it('rebalances new consumers', async () => { + const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] + + mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) + await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) + + expect( + Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) + ).toEqual(['1:session_id_1:1', '1:session_id_2:1', '2:session_id_3:1', '2:session_id_4:1']) + + const rebalancePromises = [ingester.onRevokePartitions([createTP(2), createTP(3)])] + + // Should immediately be removed from the tracked sessions + expect( + Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) + ).toEqual(['1:session_id_1:1', '1:session_id_2:1']) + + // Call the second ingester to receive the messages. 
The revocation should still be in progress meaning they are "paused" for a bit + // Once the revocation is complete the second ingester should receive the messages but drop most of them as they got flushes by the revoke + mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)]) + await otherIngester.handleEachBatch([...partitionMsgs2, createMessage('session_id_4', 2)], noop) + await Promise.all(rebalancePromises) + + // Should still have the partition 1 sessions that didnt move + expect( + Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) + ).toEqual(['1:session_id_1:1', '1:session_id_2:1']) + + // Should have session_id_4 but not session_id_3 as it was flushed + expect( + Object.values(otherIngester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) + ).toEqual(['2:session_id_3:1', '2:session_id_4:1']) + }) - // revoke, realtime unsub - await expect(ingester.stop()).resolves.toMatchObject([{ status: 'fulfilled' }, { status: 'fulfilled' }]) + it("flushes and commits as it's revoked", async () => { + await ingester.handleEachBatch( + [createMessage('sid1'), createMessage('sid2'), createMessage('sid3', 2)], + noop + ) + + expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([ + expect.stringContaining(`${team.id}.sid1.`), // gz + expect.stringContaining(`${team.id}.sid1.`), // json + expect.stringContaining(`${team.id}.sid2.`), // gz + expect.stringContaining(`${team.id}.sid2.`), // json + expect.stringContaining(`${team.id}.sid3.`), // gz + expect.stringContaining(`${team.id}.sid3.`), // json + ]) + + const revokePromise = ingester.onRevokePartitions([createTP(1)]) + + expect(Object.keys(ingester.sessions)).toEqual([`${team.id}-sid3`]) + + await revokePromise + + // Only files left on the system should be the sid3 ones + expect(readdirSync(config.SESSION_RECORDING_LOCAL_DIRECTORY + '/session-buffer-files')).toEqual([ + expect.stringContaining(`${team.id}.sid3.`), // gz + expect.stringContaining(`${team.id}.sid3.`), // json + ]) + + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + expect(mockConsumer.commit).toHaveBeenLastCalledWith( + expect.objectContaining({ + offset: 2 + 1, + partition: 1, + }) + ) + }) }) - }) - describe('when a team is disabled', () => { - it('can commit even if an entire batch is disabled', async () => { - // non-zero offset because the code can't commit offset 0 - await ingester.handleEachBatch( - [ - createKafkaMessage('invalid_token', { offset: 12 }), - createKafkaMessage('invalid_token', { offset: 13 }), - ], - noop - ) - expect(mockConsumer.commit).toHaveBeenCalledTimes(1) - expect(mockConsumer.commit).toHaveBeenCalledWith({ - offset: 14, - partition: 1, - topic: 'session_recording_snapshot_item_events_test', + describe('when a team is disabled', () => { + it('can commit even if an entire batch is disabled', async () => { + // non-zero offset because the code can't commit offset 0 + await ingester.handleEachBatch( + [ + createKafkaMessage('invalid_token', { offset: 12 }), + createKafkaMessage('invalid_token', { offset: 13 }), + ], + noop + ) + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + expect(mockConsumer.commit).toHaveBeenCalledWith({ + offset: 14, + partition: 1, + topic: 'session_recording_snapshot_item_events_test', + }) }) }) - }) - describe('lag reporting', () => { - it('should return the latest offsets', async () => { - mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => { 
- cb(null, { highOffset: 1000 + partition, lowOffset: 0 }) - }) + describe('lag reporting', () => { + it('should return the latest offsets', async () => { + mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => { + cb(null, { highOffset: 1000 + partition, lowOffset: 0 }) + }) - const results = await ingester.latestOffsetsRefresher.get() + const results = await ingester.latestOffsetsRefresher.get() - expect(results).toEqual({ - 0: 1000, - 1: 1001, + expect(results).toEqual({ + 0: 1000, + 1: 1001, + }) }) }) - }) - describe('heartbeats', () => { - it('it should send them whilst processing', async () => { - const heartbeat = jest.fn() - // non-zero offset because the code can't commit offset 0 - const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] - await ingester.handleEachBatch(partitionMsgs1, heartbeat) + describe('heartbeats', () => { + it('it should send them whilst processing', async () => { + const heartbeat = jest.fn() + // non-zero offset because the code can't commit offset 0 + const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] + await ingester.handleEachBatch(partitionMsgs1, heartbeat) - // NOTE: the number here can change as we change the code. Important is that it is called a number of times - expect(heartbeat).toBeCalledTimes(7) + // NOTE: the number here can change as we change the code. Important is that it is called a number of times + expect(heartbeat).toBeCalledTimes(7) + }) }) }) }) From bcbeb8cc7486d6001cc3e3d22e11cc3620e56a2f Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Mon, 25 Mar 2024 11:42:02 +0000 Subject: [PATCH 18/21] remove start and stop --- .../services/console-logs-ingester.ts | 10 ---------- .../services/replay-events-ingester.ts | 9 --------- .../session-recordings-consumer-v3.ts | 11 ----------- .../session-recording/session-recordings-consumer.ts | 11 ----------- .../services/console-log-ingester.test.ts | 2 +- .../services/replay-events-ingester.test.ts | 2 +- 6 files changed, 2 insertions(+), 43 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts index b29a2e16901d0..5f7fce87953f2 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts @@ -170,14 +170,4 @@ export class ConsoleLogsIngester { }) } } - - public start(): void { - if (!this.producer.isConnected()) { - status.error('🔁', '[console-log-events-ingester] kakfa producer should have been connected by parent') - } - } - - public stop(): void { - status.info('🔁', '[console-log-events-ingester] stopping') - } } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts index f7d270a79efc8..044befaae3743 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts @@ -175,13 +175,4 @@ export class ReplayEventsIngester { }) } } - public start(): void { - if (!this.producer.isConnected()) { - status.error('🔁', '[replay-events] kakfa producer should have been connected by parent') - } - } - - public 
stop(): void { - status.info('🔁', '[replay-events] stopping') - } } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts index 328ba6c9ffba8..c872259e9ce25 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts @@ -291,12 +291,10 @@ export class SessionRecordingIngesterV3 { // NOTE: This is the only place where we need to use the shared server config if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) { this.consoleLogsIngester = new ConsoleLogsIngester(producer) - this.consoleLogsIngester.start() } if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) { this.replayEventsIngester = new ReplayEventsIngester(producer) - this.replayEventsIngester.start() } // Create a node-rdkafka consumer that fetches batches of messages, runs @@ -354,15 +352,6 @@ export class SessionRecordingIngesterV3 { ) ) - // stop is effectively a no-op on both of these but is kept here - // in case we want to add any cleanup logic in the future - if (this.replayEventsIngester) { - this.replayEventsIngester.stop() - } - if (this.consoleLogsIngester) { - this.consoleLogsIngester.stop() - } - const promiseResults = await Promise.allSettled(this.promises) await this.mainKafkaClusterProducer?.disconnect() diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 00a0289b0b77f..3516db77f5961 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -455,12 +455,10 @@ export class SessionRecordingIngester { // NOTE: This is the only place where we need to use the shared server config if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) { this.consoleLogsIngester = new ConsoleLogsIngester(producer, this.persistentHighWaterMarker) - this.consoleLogsIngester.start() } if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) { this.replayEventsIngester = new ReplayEventsIngester(producer, this.persistentHighWaterMarker) - this.replayEventsIngester.start() } // Create a node-rdkafka consumer that fetches batches of messages, runs @@ -543,15 +541,6 @@ export class SessionRecordingIngester { void this.scheduleWork(this.onRevokePartitions(assignedPartitions)) void this.scheduleWork(this.realtimeManager.unsubscribe()) - // stop is effectively a no-op on both of these but is kept here - // in case we want to add any cleanup logic in the future - if (this.replayEventsIngester) { - this.replayEventsIngester.stop() - } - if (this.consoleLogsIngester) { - this.consoleLogsIngester.stop() - } - const promiseResults = await Promise.allSettled(this.promises) // Finally we clear up redis once we are sure everything else has been handled diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts index b181150e37a14..766093243818a 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts +++ 
b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts @@ -24,6 +24,7 @@ const makeIncomingMessage = ( topic: 'topic', timestamp: 0, consoleLogIngestionEnabled, + rawSize: 0, }, session_id: '', team_id: 0, @@ -45,7 +46,6 @@ describe('console log ingester', () => { mockProducer as unknown as HighLevelProducer, mockedHighWaterMarker ) - consoleLogIngester.start() }) describe('when enabled on team', () => { test('it truncates large console logs', async () => { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts index f66e98f25e134..4ede35c422f70 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts @@ -24,6 +24,7 @@ const makeIncomingMessage = (source: string | null, timestamp: number): Incoming topic: 'topic', timestamp: timestamp, consoleLogIngestionEnabled: true, + rawSize: 0, }, session_id: '', team_id: 0, @@ -42,7 +43,6 @@ describe('replay events ingester', () => { const mockedHighWaterMarker = { isBelowHighWaterMark: jest.fn() } as unknown as OffsetHighWaterMarker ingester = new ReplayEventsIngester(mockProducer as unknown as HighLevelProducer, mockedHighWaterMarker) - ingester.start() }) test('does not ingest messages from a month in the future', async () => { From 1b37861abb7b4c501e707d752bfb299bc4d0a033 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Thu, 28 Mar 2024 13:31:46 +0000 Subject: [PATCH 19/21] fix --- .../session-recording/utils.test.ts | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts index 14a6ce48a5d42..a284ae4e08699 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts @@ -212,13 +212,16 @@ describe('session-recording utils', () => { [ [ { - messages: [ - expectedIngestionWarningMessage({ - libVersion: '1.74.0', - minorVersion: 74, - }), - ], - topic: 'clickhouse_ingestion_warnings_test', + kafkaMessage: { + messages: [ + expectedIngestionWarningMessage({ + libVersion: '1.74.0', + minorVersion: 74, + }), + ], + topic: 'clickhouse_ingestion_warnings_test', + }, + waitForAck: true, }, ], ], @@ -229,13 +232,16 @@ describe('session-recording utils', () => { [ [ { - messages: [ - expectedIngestionWarningMessage({ - libVersion: '1.32.0', - minorVersion: 32, - }), - ], - topic: 'clickhouse_ingestion_warnings_test', + kafkaMessage: { + messages: [ + expectedIngestionWarningMessage({ + libVersion: '1.32.0', + minorVersion: 32, + }), + ], + topic: 'clickhouse_ingestion_warnings_test', + }, + waitForAck: true, }, ], ], From 04d6fba25a693fee7597fe743cbd35b1ccbc845c Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Thu, 28 Mar 2024 14:01:07 +0000 Subject: [PATCH 20/21] the shortest route from a to b --- .../ingestion-queues/session-recording/utils.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 60487a47305cd..6b9f663ec2c14 100644 --- 
a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -137,23 +137,25 @@ export async function readTokenFromHeaders( } function readLibVersionFromHeaders(headers: MessageHeader[] | undefined): string | undefined { - const libVersionHeader = headers?.find((header: MessageHeader) => { + const libVersionHeader = headers?.find((header) => { return header['lib_version'] })?.['lib_version'] return typeof libVersionHeader === 'string' ? libVersionHeader : libVersionHeader?.toString() } -function minorVersionFrom(libVersion: string | undefined): number | undefined { +function majorAndMinorVersionFrom(libVersion: string | undefined): number | undefined { try { + let majorString: string | undefined = undefined let minorString: string | undefined = undefined if (libVersion && libVersion.includes('.')) { const splat = libVersion.split('.') // very loose check for three part semantic version number if (splat.length === 3) { + majorString = splat[0] minorString = splat[1] } } - return minorString ? parseInt(minorString) : undefined + return majorString && minorString ? parseFloat(`${majorString}.${minorString}`) : undefined } catch (e) { status.warn('⚠️', 'could_not_read_minor_lib_version', { libVersion }) return undefined @@ -206,14 +208,14 @@ export const parseKafkaMessage = async ( // this has to be ahead of the payload parsing in case we start dropping traffic from older versions if (!!ingestionWarningProducer && !!teamIdWithConfig.teamId) { const libVersion = readLibVersionFromHeaders(message.headers) - const minorVersion = minorVersionFrom(libVersion) + const parsedVersion = majorAndMinorVersionFrom(libVersion) /** * We introduced SVG mutation throttling in version 1.74.0 fix: Recording throttling for SVG-like things (#758) * and improvements like jitter on retry and better batching in session recording in earlier versions * So, versions older than 1.75.0 can cause ingestion pressure or incidents * because they send much more information and more messages for the same recording */ - if (minorVersion && minorVersion <= 74) { + if (parsedVersion && parsedVersion <= 1.74) { counterLibVersionWarning.inc() await captureIngestionWarning( @@ -222,9 +224,9 @@ export const parseKafkaMessage = async ( 'replay_lib_version_too_old', { libVersion, - minorVersion, + minorVersion: parsedVersion, }, - { key: libVersion || minorVersion.toString() } + { key: libVersion || parsedVersion.toString() } ) } } From 726891fea86bc115ca8f419a44cfd06ff2c1af60 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Thu, 28 Mar 2024 14:39:39 +0000 Subject: [PATCH 21/21] fix --- .../src/main/ingestion-queues/session-recording/utils.ts | 6 ++++-- .../main/ingestion-queues/session-recording/utils.test.ts | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 6b9f663ec2c14..f410674baa17c 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -155,7 +155,9 @@ function majorAndMinorVersionFrom(libVersion: string | undefined): number | unde minorString = splat[1] } } - return majorString && minorString ? 
parseFloat(`${majorString}.${minorString}`) : undefined + const validMajor = majorString && !isNaN(parseInt(majorString)) + const validMinor = minorString && !isNaN(parseInt(minorString)) + return validMajor && validMinor ? parseFloat(`${majorString}.${minorString}`) : undefined } catch (e) { status.warn('⚠️', 'could_not_read_minor_lib_version', { libVersion }) return undefined @@ -224,7 +226,7 @@ export const parseKafkaMessage = async ( 'replay_lib_version_too_old', { libVersion, - minorVersion: parsedVersion, + parsedVersion, }, { key: libVersion || parsedVersion.toString() } ) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts index a284ae4e08699..e73774dea2e08 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts @@ -216,7 +216,7 @@ describe('session-recording utils', () => { messages: [ expectedIngestionWarningMessage({ libVersion: '1.74.0', - minorVersion: 74, + parsedVersion: 1.74, }), ], topic: 'clickhouse_ingestion_warnings_test', @@ -236,7 +236,7 @@ describe('session-recording utils', () => { messages: [ expectedIngestionWarningMessage({ libVersion: '1.32.0', - minorVersion: 32, + parsedVersion: 1.32, }), ], topic: 'clickhouse_ingestion_warnings_test',
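
For reference, a minimal standalone TypeScript sketch of the version-gating behaviour that patch 21/21 converges on in plugin-server/src/main/ingestion-queues/session-recording/utils.ts. The names mirror the patched code, and the 1.74 threshold comes from the comment in the diff (SVG mutation throttling landed in 1.74.0, so anything at or below 1.74 is flagged). This is an illustration, not the full module: it omits the Kafka wiring, which in the patched code queues a "replay_lib_version_too_old" ingestion warning via captureIngestionWarning, produced with waitForAck: true according to the updated utils.test.ts expectations.

    // Sketch: turn a semver-ish lib version header value ("1.74.0") into a comparable
    // major.minor number (1.74), or undefined if it is not a three-part numeric version.
    function majorAndMinorVersionFrom(libVersion: string | undefined): number | undefined {
        if (!libVersion || !libVersion.includes('.')) {
            return undefined
        }
        const splat = libVersion.split('.')
        // very loose check for a three-part semantic version number
        if (splat.length !== 3) {
            return undefined
        }
        const [majorString, minorString] = splat
        const validMajor = majorString !== '' && !isNaN(parseInt(majorString))
        const validMinor = minorString !== '' && !isNaN(parseInt(minorString))
        return validMajor && validMinor ? parseFloat(`${majorString}.${minorString}`) : undefined
    }

    // Versions at or below 1.74 predate SVG mutation throttling and trigger the warning.
    const parsedVersion = majorAndMinorVersionFrom('1.74.0') // 1.74
    const tooOld = parsedVersion !== undefined && parsedVersion <= 1.74 // true

    // Non-numeric or malformed versions are ignored rather than warned on.
    majorAndMinorVersionFrom('nightly') // undefined
    majorAndMinorVersionFrom('1.x.0') // undefined

Returning a single major.minor float keeps the comparison a one-liner at the call site; the NaN guards added in patch 21/21 make sure a malformed header never produces a bogus warning key.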