diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts index 59a4b2e250dea..79602befd50f3 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts @@ -13,7 +13,7 @@ export function createIncomingRecordingMessage( // that has properties, and they have $snapshot_data // that will have data_items, which are the actual snapshots each individually compressed - const message: IncomingRecordingMessage = { + return { team_id: 1, distinct_id: 'distinct_id', session_id: 'session_id_1', @@ -33,12 +33,11 @@ export function createIncomingRecordingMessage( lowOffset: 1, highOffset: 1, timestamp: 1, + rawSize: 1, ...partialIncomingMessage.metadata, ...partialMetadata, }, } - - return message } export function createKafkaMessage( @@ -46,12 +45,13 @@ export function createKafkaMessage( messageOverrides: Partial = {}, eventProperties: Record = {} ): Message { - const message: Message = { + return { partition: 1, topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, offset: 0, timestamp: messageOverrides.timestamp ?? Date.now(), size: 1, + headers: [{ token: token.toString() }], ...messageOverrides, value: Buffer.from( @@ -70,8 +70,6 @@ export function createKafkaMessage( }) ), } - - return message } export function createTP(partition: number, topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS) { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts index c5a3851486d93..28c594566eb15 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts @@ -65,14 +65,15 @@ describe('session-recording utils', () => { describe('parsing the message', () => { it('can parse a message correctly', async () => { - const parsedMessage = await parseKafkaMessage(validMessage('my-distinct-id'), () => - Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) + const parsedMessage = await parseKafkaMessage( + validMessage('my-distinct-id', [{ token: 'something' }]), + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) ) expect(parsedMessage).toMatchSnapshot() }) it('can handle numeric distinct_ids', async () => { const numericId = 12345 - const parsedMessage = await parseKafkaMessage(validMessage(numericId), () => + const parsedMessage = await parseKafkaMessage(validMessage(numericId, [{ token: 'something' }]), () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) ) expect(parsedMessage).toMatchObject({ @@ -91,6 +92,7 @@ describe('session-recording utils', () => { const createMessage = ($snapshot_items: unknown[]) => { return { + headers: [{ token: 'the_token' }], value: Buffer.from( JSON.stringify({ uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', @@ -191,10 +193,10 @@ describe('session-recording utils', () => { undefined, ], [ - 'calls the team id resolver twice when token is not in header, and is in body', + 'does not call the team id resolver when token is not in header, but is in body', undefined, 'the body token', - ['the body token'], + undefined, ], ])('%s', async (_name, headerToken, payloadToken, expectedCalls) => { await parseKafkaMessage(