diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts
index 2a384b86c23bd..c0869e773ca78 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts
@@ -17,7 +17,7 @@ import { status } from '../../../../utils/status'
 import { asyncTimeoutGuard } from '../../../../utils/timing'
 import { ObjectStorage } from '../../../services/object_storage'
 import { IncomingRecordingMessage } from '../types'
-import { bufferFileDir, convertToPersistedMessage, getLagMultipler, maxDefined, minDefined, now } from '../utils'
+import { bufferFileDir, convertToPersistedMessage, getLagMultiplier, maxDefined, minDefined, now } from '../utils'
 import { OffsetHighWaterMarker } from './offset-high-water-marker'
 import { RealtimeManager } from './realtime-manager'
@@ -217,7 +217,7 @@ export class SessionManager {
             return
         }

-        const lagMultiplier = getLagMultipler(partitionLag)
+        const lagMultiplier = getLagMultiplier(partitionLag)
         const flushThresholdMs = this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000
         const flushThresholdJitteredMs = flushThresholdMs * this.flushJitterMultiplier
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
index 530814010efb9..470827f569422 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -1,4 +1,4 @@
-import { captureException, captureMessage } from '@sentry/node'
+import { captureException } from '@sentry/node'
 import { mkdirSync, rmSync } from 'node:fs'
 import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
 import { Counter, Gauge, Histogram } from 'prom-client'
@@ -7,7 +7,7 @@ import { sessionRecordingConsumerConfig } from '../../../config/config'
 import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
 import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
 import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
-import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, RRWebEvent, TeamId } from '../../../types'
+import { PluginsServerConfig, RedisPool, TeamId } from '../../../types'
 import { BackgroundRefresher } from '../../../utils/background-refresher'
 import { PostgresRouter } from '../../../utils/db/postgres'
 import { status } from '../../../utils/status'
@@ -23,7 +23,7 @@ import { RealtimeManager } from './services/realtime-manager'
 import { ReplayEventsIngester } from './services/replay-events-ingester'
 import { BUCKETS_KB_WRITTEN, SessionManager } from './services/session-manager'
 import { IncomingRecordingMessage } from './types'
-import { bufferFileDir, getPartitionsForTopic, now, queryWatermarkOffsets } from './utils'
+import { bufferFileDir, getPartitionsForTopic, now, parseKafkaMessage, queryWatermarkOffsets } from './utils'

 // Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
 require('@sentry/tracing')
@@ -251,119 +251,6 @@ export class SessionRecordingIngester {
         await this.sessions[key]?.add(event)
     }

-    public async parseKafkaMessage(
-        message: Message,
-        getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
-    ): Promise<IncomingRecordingMessage | void> {
-        const statusWarn = (reason: string, extra?: Record<string, any>) => {
-            status.warn('⚠️', 'invalid_message', {
-                reason,
-                partition: message.partition,
-                offset: message.offset,
-                ...(extra || {}),
-            })
-        }
-
-        if (!message.value || !message.timestamp) {
-            // Typing says this can happen but in practice it shouldn't
-            return statusWarn('message value or timestamp is empty')
-        }
-
-        let messagePayload: RawEventMessage
-        let event: PipelineEvent
-
-        try {
-            messagePayload = JSON.parse(message.value.toString())
-            event = JSON.parse(messagePayload.data)
-        } catch (error) {
-            return statusWarn('invalid_json', { error })
-        }
-
-        const { $snapshot_items, $session_id, $window_id } = event.properties || {}
-
-        // NOTE: This is simple validation - ideally we should do proper schema based validation
-        if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) {
-            status.warn('🙈', 'Received non-snapshot message, ignoring')
-            return
-        }
-
-        if (messagePayload.team_id == null && !messagePayload.token) {
-            return statusWarn('no_token')
-        }
-
-        let teamIdWithConfig: TeamIDWithConfig | null = null
-        const token = messagePayload.token
-
-        if (token) {
-            teamIdWithConfig = await getTeamFn(token)
-        }
-
-        // NB `==` so we're comparing undefined and null
-        if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) {
-            eventDroppedCounter
-                .labels({
-                    event_type: 'session_recordings_blob_ingestion',
-                    drop_cause: 'team_missing_or_disabled',
-                })
-                .inc()
-
-            return statusWarn('team_missing_or_disabled', {
-                token: messagePayload.token,
-                teamId: messagePayload.team_id,
-                payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown',
-            })
-        }
-
-        const invalidEvents: any[] = []
-        const events: RRWebEvent[] = $snapshot_items.filter((event: any) => {
-            if (!event || !event.timestamp) {
-                invalidEvents.push(event)
-                return false
-            }
-            return true
-        })
-
-        if (invalidEvents.length) {
-            captureMessage('[session-manager]: invalid rrweb events filtered out from message', {
-                extra: {
-                    invalidEvents,
-                    eventsCount: events.length,
-                    invalidEventsCount: invalidEvents.length,
-                    event,
-                },
-                tags: {
-                    team_id: teamIdWithConfig.teamId,
-                    session_id: $session_id,
-                },
-            })
-        }
-
-        if (!events.length) {
-            status.warn('🙈', 'Event contained no valid rrweb events, ignoring')
-
-            return statusWarn('invalid_rrweb_events', {
-                token: messagePayload.token,
-                teamId: messagePayload.team_id,
-            })
-        }
-
-        return {
-            metadata: {
-                partition: message.partition,
-                topic: message.topic,
-                offset: message.offset,
-                timestamp: message.timestamp,
-                consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled,
-            },
-
-            team_id: teamIdWithConfig.teamId,
-            distinct_id: messagePayload.distinct_id,
-            session_id: $session_id,
-            window_id: $window_id,
-            events: events,
-        }
-    }
-
     public async handleEachBatch(messages: Message[]): Promise<void> {
         status.info('🔁', `blob_ingester_consumer - handling batch`, {
             size: messages.length,
@@ -395,7 +282,7 @@ export class SessionRecordingIngester {

             counterKafkaMessageReceived.inc({ partition })

-            const recordingMessage = await this.parseKafkaMessage(message, (token) =>
+            const recordingMessage = await parseKafkaMessage(message, (token) =>
                 this.teamsRefresher.get().then((teams) => ({
                     teamId: teams[token]?.teamId || null,
                     consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
@@ -646,7 +533,7 @@ export class SessionRecordingIngester {
                 partitionsToDrop[partition] = this.partitionMetrics[partition] ?? {}
                 delete this.partitionMetrics[partition]

-                // Revoke the high water mark for this partition so we are essentially "reset"
+                // Revoke the high watermark for this partition, so we are essentially "reset"
                 this.sessionHighWaterMarker.revoke(topicPartition)
                 this.persistentHighWaterMarker.revoke(topicPartition)
             })
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts
index a13fed116b5bf..64476d57c3cee 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts
@@ -1,10 +1,13 @@
-import { captureException } from '@sentry/node'
+import { captureException, captureMessage } from '@sentry/node'
 import { DateTime } from 'luxon'
-import { KafkaConsumer, PartitionMetadata, TopicPartition } from 'node-rdkafka'
+import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartition } from 'node-rdkafka'
 import path from 'path'

 import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
+import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types'
 import { status } from '../../../utils/status'
+import { eventDroppedCounter } from '../metrics'
+import { TeamIDWithConfig } from './session-recordings-consumer'
 import { IncomingRecordingMessage, PersistedRecordingMessage } from './types'

 export const convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => {
@@ -102,10 +105,159 @@ export const getPartitionsForTopic = (
         })
 }

-export const getLagMultipler = (lag: number, threshold = 1000000) => {
+export const getLagMultiplier = (lag: number, threshold = 1000000) => {
     if (lag < threshold) {
         return 1
     }

     return Math.max(0.1, 1 - (lag - threshold) / (threshold * 10))
 }
+
+export async function readTokenFromHeaders(
+    headers: MessageHeader[] | undefined,
+    getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
+) {
+    const tokenHeader = headers?.find((header: MessageHeader) => {
+        // Each header in the array is an object mapping key to value.
+        // It's possible to have multiple headers with the same key, but we don't support that:
+        // the first truthy match we find is the one we use.
+        return header.token
+    })?.token
+
+    const token = typeof tokenHeader === 'string' ? tokenHeader : tokenHeader?.toString()
+
+    let teamIdWithConfig: TeamIDWithConfig | null = null
+
+    if (token) {
+        teamIdWithConfig = await getTeamFn(token)
+    }
+    return { token, teamIdWithConfig }
+}
+
+export const parseKafkaMessage = async (
+    message: Message,
+    getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
+): Promise<IncomingRecordingMessage | void> => {
+    const dropMessage = (reason: string, extra?: Record<string, any>) => {
+        eventDroppedCounter
+            .labels({
+                event_type: 'session_recordings_blob_ingestion',
+                drop_cause: reason,
+            })
+            .inc()
+
+        status.warn('⚠️', 'invalid_message', {
+            reason,
+            partition: message.partition,
+            offset: message.offset,
+            ...(extra || {}),
+        })
+    }
+
+    if (!message.value || !message.timestamp) {
+        // Typing says this can happen but in practice it shouldn't
+        return dropMessage('message_value_or_timestamp_is_empty')
+    }
+
+    const headerResult = await readTokenFromHeaders(message.headers, getTeamFn)
+    const token: string | undefined = headerResult.token
+    let teamIdWithConfig: null | TeamIDWithConfig = headerResult.teamIdWithConfig
+
+    // NB: `==` so we're comparing undefined and null.
+    // If the token was in the headers but we could not load the team config, we can return early.
+    if (!!token && (teamIdWithConfig == null || teamIdWithConfig.teamId == null)) {
+        return dropMessage('header_token_present_team_missing_or_disabled', {
+            token: token,
+        })
+    }
+
+    let messagePayload: RawEventMessage
+    let event: PipelineEvent
+
+    try {
+        messagePayload = JSON.parse(message.value.toString())
+        event = JSON.parse(messagePayload.data)
+    } catch (error) {
+        return dropMessage('invalid_json', { error })
+    }
+
+    const { $snapshot_items, $session_id, $window_id } = event.properties || {}
+
+    // NOTE: This is simple validation - ideally we should do proper schema based validation
+    if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) {
+        return dropMessage('received_non_snapshot_message')
+    }
+
+    // TODO: this mechanism is deprecated for blobby ingestion; we should remove it
+    // once we're happy that the new mechanism is working.
+    // If there was no token in the header, we try to load one from the message payload.
+    if (teamIdWithConfig == null && messagePayload.team_id == null && !messagePayload.token) {
+        return dropMessage('no_token_in_header_or_payload')
+    }
+
+    if (teamIdWithConfig == null) {
+        const token = messagePayload.token
+
+        if (token) {
+            teamIdWithConfig = await getTeamFn(token)
+        }
+    }
+
+    // NB: `==` so we're comparing undefined and null.
+    if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) {
+        return dropMessage('token_fallback_team_missing_or_disabled', {
+            token: messagePayload.token,
+            teamId: messagePayload.team_id,
+            payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown',
+        })
+    }
+    // end of deprecated mechanism
+
+    const invalidEvents: any[] = []
+    const events: RRWebEvent[] = $snapshot_items.filter((event: any) => {
+        if (!event || !event.timestamp) {
+            invalidEvents.push(event)
+            return false
+        }
+        return true
+    })
+
+    if (invalidEvents.length) {
+        captureMessage('[session-manager]: invalid rrweb events filtered out from message', {
+            extra: {
+                invalidEvents,
+                eventsCount: events.length,
+                invalidEventsCount: invalidEvents.length,
+                event,
+            },
+            tags: {
+                team_id: teamIdWithConfig.teamId,
+                session_id: $session_id,
+            },
+        })
+    }
+
+    if (!events.length) {
+        return dropMessage('message_contained_no_valid_rrweb_events', {
+            token: messagePayload.token,
+            teamId: messagePayload.team_id,
+        })
+    }
+
+    return {
+        metadata: {
+            partition: message.partition,
+            topic: message.topic,
+            offset: message.offset,
+            timestamp: message.timestamp,
+            consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled,
+        },
+
+        team_id: teamIdWithConfig.teamId,
+        distinct_id: messagePayload.distinct_id,
+        session_id: $session_id,
+        window_id: $window_id,
+        events: events,
+    }
+}
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
index e247f7c8f1dfe..508e85faa7ebf 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
@@ -1,6 +1,6 @@
 import { randomUUID } from 'crypto'
 import { mkdirSync, readdirSync, rmSync } from 'node:fs'
-import { Message, TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
+import { TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
 import path from 'path'

 import { waitForExpect } from '../../../../functional_tests/expectations'
@@ -89,7 +89,7 @@ describe('ingester', () => {
         })
     })

-    mockConsumer.committed.mockImplementation((topicPartitions: TopicPartition[], timeout, cb) => {
+    mockConsumer.committed.mockImplementation((topicPartitions: TopicPartition[], _timeout, cb) => {
         const tpos: TopicPartitionOffset[] = topicPartitions.map((tp) => ({
             topic: tp.topic,
             partition: tp.partition,
@@ -196,153 +196,6 @@ describe('ingester', () => {
         }, 10000)
     })

-    describe('parsing the message', () => {
-        it('can handle numeric distinct_ids', async () => {
-            const numeric_id = 12345
-
-            const parsedMessage = await ingester.parseKafkaMessage(
-                {
-                    value: Buffer.from(
-                        JSON.stringify({
-                            uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
-                            distinct_id: String(numeric_id),
-                            ip: '127.0.0.1',
-                            site_url: 'http://127.0.0.1:8000',
-                            data: JSON.stringify({
-                                uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
-                                event: '$snapshot_items',
-                                properties: {
-                                    distinct_id: numeric_id,
-                                    $session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
-                                    $window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
-                                    $snapshot_items: [
-                                        {
-                                            type: 6,
-                                            data: {
-                                                plugin: 'rrweb/console@1',
-                                                payload: {
-                                                    level: 'log',
-                                                    trace: [
-                                                        'HedgehogActor.setAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105543:17)',
-                                                        'HedgehogActor.setRandomAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105550:14)',
-                                                        'HedgehogActor.update (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105572:16)',
-                                                        'loop (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105754:15)',
-                                                    ],
-                                                    payload: ['"Hedgehog: Will \'jump\' for 2916.6666666666665ms"'],
-                                                },
-                                            },
-                                            timestamp: 1693422950693,
-                                        },
-                                    ],
-                                    $snapshot_consumer: 'v2',
-                                },
-                                offset: 2187,
-                            }),
-                            now: '2023-08-30T19:15:54.887316+00:00',
-                            sent_at: '2023-08-30T19:15:54.882000+00:00',
-                            token: 'the_token',
-                        })
-                    ),
-                    timestamp: 1,
-                    size: 1,
-                    topic: 'the_topic',
-                    offset: 1,
-                    partition: 1,
-                } satisfies Message,
-                () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false })
-            )
-            expect(parsedMessage).toEqual({
-                distinct_id: '12345',
-                events: expect.any(Array),
-                metadata: {
-                    offset: 1,
-                    partition: 1,
-                    timestamp: 1,
-                    topic: 'the_topic',
-                    consoleLogIngestionEnabled: false,
-                },
-                session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
-                team_id: 1,
-                window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
-            })
-        })
-
-        it('filters out invalid rrweb events', async () => {
-            const numeric_id = 12345
-
-            const createMessage = ($snapshot_items: unknown[]) => {
-                return {
-                    value: Buffer.from(
-                        JSON.stringify({
-                            uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
-                            distinct_id: String(numeric_id),
-                            ip: '127.0.0.1',
-                            site_url: 'http://127.0.0.1:8000',
-                            data: JSON.stringify({
-                                uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
-                                event: '$snapshot_items',
-                                properties: {
-                                    distinct_id: numeric_id,
-                                    $session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
-                                    $window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
-                                    $snapshot_items: $snapshot_items,
-                                },
-                            }),
-                            token: 'the_token',
-                        })
-                    ),
-                    timestamp: 1,
-                    size: 1,
-                    topic: 'the_topic',
-                    offset: 1,
-                    partition: 1,
-                } satisfies Message
-            }
-
-            const parsedMessage = await ingester.parseKafkaMessage(
-                createMessage([
-                    {
-                        type: 6,
-                        data: {},
-                        timestamp: null,
-                    },
-                ]),
-                () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true })
-            )
-            expect(parsedMessage).toEqual(undefined)
-
-            const parsedMessage2 = await ingester.parseKafkaMessage(
-                createMessage([
-                    {
-                        type: 6,
-                        data: {},
-                        timestamp: null,
-                    },
-                    {
-                        type: 6,
-                        data: {},
-                        timestamp: 123,
-                    },
-                ]),
-                () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true })
-            )
-            expect(parsedMessage2).toMatchObject({
-                events: [
-                    {
-                        data: {},
-                        timestamp: 123,
-                        type: 6,
-                    },
-                ],
-            })
-
-            const parsedMessage3 = await ingester.parseKafkaMessage(createMessage([null]), () =>
-                Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false })
-            )
-            expect(parsedMessage3).toEqual(undefined)
-        })
-    })
-
     describe('offset committing', () => {
         it('should commit offsets in simple cases', async () => {
             await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid1')])
@@ -687,7 +540,7 @@ describe('ingester', () => {

     describe('lag reporting', () => {
         it('should return the latest offsets', async () => {
-            mockConsumer.queryWatermarkOffsets.mockImplementation((topic, partition, timeout, cb) => {
+            mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {
                 cb(null, { highOffset: 1000 + partition, lowOffset: 0 })
             })

diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts
index 4c6d77cc750d8..44d6fa2c2fb5e 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts
@@ -1,6 +1,207 @@
-import { getLagMultipler, maxDefined, minDefined } from '../../../../src/main/ingestion-queues/session-recording/utils'
+import { Message, MessageHeader } from 'node-rdkafka'
+
+import {
+    getLagMultiplier,
+    maxDefined,
+    minDefined,
+    parseKafkaMessage,
+} from '../../../../src/main/ingestion-queues/session-recording/utils'

 describe('session-recording utils', () => {
+    const validMessage = (distinctId: number | string, headers?: MessageHeader[], value?: Record<string, any>) =>
+        ({
+            headers,
+            value: Buffer.from(
+                JSON.stringify({
+                    uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
+                    distinct_id: String(distinctId),
+                    ip: '127.0.0.1',
+                    site_url: 'http://127.0.0.1:8000',
+                    data: JSON.stringify({
+                        uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
+                        event: '$snapshot_items',
+                        properties: {
+                            distinct_id: distinctId,
+                            $session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
+                            $window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
+                            $snapshot_items: [
+                                {
+                                    type: 6,
+                                    data: {
+                                        plugin: 'rrweb/console@1',
+                                        payload: {
+                                            level: 'log',
+                                            trace: [
+                                                'HedgehogActor.setAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105543:17)',
+                                                'HedgehogActor.setRandomAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105550:14)',
+                                                'HedgehogActor.update (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105572:16)',
+                                                'loop (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105754:15)',
+                                            ],
+                                            payload: ['"Hedgehog: Will \'jump\' for 2916.6666666666665ms"'],
+                                        },
+                                    },
+                                    timestamp: 1693422950693,
+                                },
+                            ],
+                            $snapshot_consumer: 'v2',
+                        },
+                        offset: 2187,
+                    }),
+                    now: '2023-08-30T19:15:54.887316+00:00',
+                    sent_at: '2023-08-30T19:15:54.882000+00:00',
+                    token: 'the_token',
+                    ...value,
+                })
+            ),
+            timestamp: 1,
+            size: 1,
+            topic: 'the_topic',
+            offset: 1,
+            partition: 1,
+        } satisfies Message)
+
+    describe('parsing the message', () => {
+        it('can handle numeric distinct_ids', async () => {
+            const numericId = 12345
+            const parsedMessage = await parseKafkaMessage(validMessage(numericId), () =>
+                Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false })
+            )
+            expect(parsedMessage).toEqual({
+                distinct_id: String(numericId),
+                events: expect.any(Array),
+                metadata: {
+                    offset: 1,
+                    partition: 1,
+                    timestamp: 1,
+                    topic: 'the_topic',
+                    consoleLogIngestionEnabled: false,
+                },
+                session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
+                team_id: 1,
+                window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
+            })
+        })
+
+        it('filters out invalid rrweb events', async () => {
+            const numeric_id = 12345
+
+            const createMessage = ($snapshot_items: unknown[]) => {
+                return {
+                    value: Buffer.from(
+                        JSON.stringify({
+                            uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
+                            distinct_id: String(numeric_id),
+                            ip: '127.0.0.1',
+                            site_url: 'http://127.0.0.1:8000',
+                            data: JSON.stringify({
+                                uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
+                                event: '$snapshot_items',
+                                properties: {
+                                    distinct_id: numeric_id,
+                                    $session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
+                                    $window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
+                                    $snapshot_items: $snapshot_items,
+                                },
+                            }),
+                            token: 'the_token',
+                        })
+                    ),
+                    timestamp: 1,
+                    size: 1,
+                    topic: 'the_topic',
+                    offset: 1,
+                    partition: 1,
+                } satisfies Message
+            }
+
+            const parsedMessage = await parseKafkaMessage(
+                createMessage([
+                    {
+                        type: 6,
+                        data: {},
+                        timestamp: null,
+                    },
+                ]),
+                () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true })
+            )
+            expect(parsedMessage).toEqual(undefined)
+
+            const parsedMessage2 = await parseKafkaMessage(
+                createMessage([
+                    {
+                        type: 6,
+                        data: {},
+                        timestamp: null,
+                    },
+                    {
+                        type: 6,
+                        data: {},
+                        timestamp: 123,
+                    },
+                ]),
+                () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true })
+            )
+            expect(parsedMessage2).toMatchObject({
+                events: [
+                    {
+                        data: {},
+                        timestamp: 123,
+                        type: 6,
+                    },
+                ],
+            })
+
+            const parsedMessage3 = await parseKafkaMessage(createMessage([null]), () =>
+                Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false })
+            )
+            expect(parsedMessage3).toEqual(undefined)
+        })
+
+        describe('team token can be in header or body', () => {
+            const mockTeamResolver = jest.fn()
+
+            beforeEach(() => {
+                mockTeamResolver.mockReset()
+                mockTeamResolver.mockResolvedValue({ teamId: 1, consoleLogIngestionEnabled: false })
+            })
+
+            test.each([
+                [
+                    'calls the team id resolver once when token is in header, not in the body',
+                    'the_token',
+                    undefined,
+                    ['the_token'],
+                ],
+                [
+                    'calls the team id resolver once when token is in header, and in the body',
+                    'the_token',
+                    'the body token',
+                    ['the_token'],
+                ],
+                [
+                    'does not call the team id resolver when token is not in header, and not in body',
+                    undefined,
+                    undefined,
+                    undefined,
+                ],
+                [
+                    'calls the team id resolver twice when token is not in header, and is in body',
+                    undefined,
+                    'the body token',
+                    ['the body token'],
+                ],
+            ])('%s', async (_name, headerToken, payloadToken, expectedCalls) => {
+                await parseKafkaMessage(
+                    validMessage(12345, headerToken ? [{ token: Buffer.from(headerToken) }] : undefined, {
+                        token: payloadToken,
+                    }),
+                    mockTeamResolver
+                )
+                expect(mockTeamResolver.mock.calls).toEqual([expectedCalls])
+            })
+        })
+    })
+
     it('minDefined', () => {
         expect(minDefined(1, 2, 3)).toEqual(1)
         expect(minDefined(1, undefined, 3)).toEqual(1)
@@ -14,32 +215,32 @@ describe('session-recording utils', () => {
         expect(maxDefined(undefined, undefined, undefined)).toEqual(undefined)
         expect(maxDefined()).toEqual(undefined)
     })
-})

-describe('getLagMultipler', () => {
-    const threshold = 1000
-    it('returns 1 when lag is 0', () => {
-        expect(getLagMultipler(0, threshold)).toEqual(1)
-    })
+    describe('getLagMultiplier', () => {
+        const threshold = 1000
+        it('returns 1 when lag is 0', () => {
+            expect(getLagMultiplier(0, threshold)).toEqual(1)
+        })

-    it('returns 1 when lag is under threshold', () => {
-        expect(getLagMultipler(threshold - 1, threshold)).toEqual(1)
-    })
+        it('returns 1 when lag is under threshold', () => {
+            expect(getLagMultiplier(threshold - 1, threshold)).toEqual(1)
+        })

-    it('returns 0.9 when lag is double threshold', () => {
-        expect(getLagMultipler(threshold * 2, threshold)).toEqual(0.9)
-    })
+        it('returns 0.9 when lag is double threshold', () => {
+            expect(getLagMultiplier(threshold * 2, threshold)).toEqual(0.9)
+        })

-    it('returns 0.6 when lag is 5 times the threshold', () => {
-        expect(getLagMultipler(threshold * 5, threshold)).toEqual(0.6)
-    })
+        it('returns 0.6 when lag is 5 times the threshold', () => {
+            expect(getLagMultiplier(threshold * 5, threshold)).toEqual(0.6)
+        })

-    it('returns 0.9 when lag is 9 times the threshold', () => {
-        expect(getLagMultipler(threshold * 9, threshold)).toBeGreaterThanOrEqual(0.19)
-        expect(getLagMultipler(threshold * 9, threshold)).toBeLessThanOrEqual(0.2)
-    })
+        it('returns ~0.2 when lag is 9 times the threshold', () => {
+            expect(getLagMultiplier(threshold * 9, threshold)).toBeGreaterThanOrEqual(0.19)
+            expect(getLagMultiplier(threshold * 9, threshold)).toBeLessThanOrEqual(0.2)
+        })

-    it('returns 0.1 when lag is 100 times the threshold', () => {
-        expect(getLagMultipler(threshold * 100, threshold)).toEqual(0.1)
+        it('returns 0.1 when lag is 100 times the threshold', () => {
+            expect(getLagMultiplier(threshold * 100, threshold)).toEqual(0.1)
+        })
     })
 })
diff --git a/posthog/api/capture.py b/posthog/api/capture.py
index 98c52d1781380..a7d72f9ca1f3e 100644
--- a/posthog/api/capture.py
+++ b/posthog/api/capture.py
@@ -155,7 +155,9 @@ def _kafka_topic(event_name: str, data: Dict) -> str:
     return settings.KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC


-def log_event(data: Dict, event_name: str, partition_key: Optional[str]):
+def log_event(
+    data: Dict, event_name: str, partition_key: Optional[str], headers: Optional[List] = None
+) -> FutureRecordMetadata:
     kafka_topic = _kafka_topic(event_name, data)

     logger.debug("logging_event", event_name=event_name, kafka_topic=kafka_topic)
@@ -167,7 +169,7 @@ def log_event(data: Dict, event_name: str, partition_key: Optional[str]):
         else:
             producer = KafkaProducer()

-        future = producer.produce(topic=kafka_topic, data=data, key=partition_key)
+        future = producer.produce(topic=kafka_topic, data=data, key=partition_key, headers=headers)
         statsd.incr("posthog_cloud_plugin_server_ingestion")
         return future
     except Exception as e:
@@ -558,7 +560,10 @@ def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=
     if event["event"] in SESSION_RECORDING_EVENT_NAMES:
         kafka_partition_key = event["properties"]["$session_id"]
-        return log_event(parsed_event, event["event"], partition_key=kafka_partition_key)
+        headers = [
+            ("token", token),
+        ]
+        return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, headers=headers)

     candidate_partition_key = f"{token}:{distinct_id}"
diff --git a/posthog/api/test/__snapshots__/test_cohort.ambr b/posthog/api/test/__snapshots__/test_cohort.ambr
index 206638c65e4b9..e8c0b7d444e0d 100644
--- a/posthog/api/test/__snapshots__/test_cohort.ambr
+++ b/posthog/api/test/__snapshots__/test_cohort.ambr
@@ -1,6 +1,6 @@
 # name: TestCohort.test_async_deletion_of_cohort
   '
-  /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -10,7 +10,7 @@
 ---
 # name: TestCohort.test_async_deletion_of_cohort.1
   '
-  /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   INSERT INTO cohortpeople
   SELECT id,
          2 as cohort_id,
@@ -83,7 +83,7 @@
 ---
 # name: TestCohort.test_async_deletion_of_cohort.2
   '
-  /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -93,7 +93,7 @@
 ---
 # name: TestCohort.test_async_deletion_of_cohort.3
   '
-  /* user_id:118 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
+  /* user_id:119 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
   SELECT count()
   FROM cohortpeople
   WHERE team_id = 2
@@ -103,7 +103,7 @@
 ---
 # name: TestCohort.test_async_deletion_of_cohort.4
   '
-  /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -113,7 +113,7 @@
 ---
 # name: TestCohort.test_async_deletion_of_cohort.5
   '
-  /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   INSERT INTO cohortpeople
   SELECT id,
          2 as cohort_id,
@@ -147,7 +147,7 @@
 ---
 # name: TestCohort.test_async_deletion_of_cohort.6
   '
-  /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -157,7 +157,7 @@
 ---
 # name: TestCohort.test_async_deletion_of_cohort.7
   '
-  /* user_id:118 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
+  /* user_id:119 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
   SELECT count()
   FROM cohortpeople
   WHERE team_id = 2
diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py
index 9e68b72072e5e..7d39039b9fdfd 100644
--- a/posthog/api/test/test_capture.py
+++ b/posthog/api/test/test_capture.py
@@ -292,7 +292,9 @@ def test_capture_randomly_partitions_with_likely_anonymous_ids(self, kafka_produ
             HTTP_ORIGIN="https://localhost",
         )

-        kafka_produce.assert_called_with(topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC, data=ANY, key=None)
+        kafka_produce.assert_called_with(
+            topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC, data=ANY, key=None, headers=None
+        )

     def test_cached_is_randomly_partitioned(self):
         """Assert the behavior of is_randomly_partitioned under certain cache settings.
@@ -1592,6 +1594,15 @@ def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produ

         assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1})

+    @patch("posthog.kafka_client.client._KafkaProducer.produce")
+    def test_recording_ingestion_can_write_headers_with_the_message(self, kafka_produce: MagicMock) -> None:
+        with self.settings(
+            SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480,
+        ):
+            self._send_august_2023_version_session_recording_event()
+
+        assert kafka_produce.mock_calls[0].kwargs["headers"] == [("token", "token123")]
+
     @patch("posthog.kafka_client.client.SessionRecordingKafkaProducer")
     def test_create_session_recording_kafka_with_expected_hosts(
         self,