From cf4ba98a7e0f9641906a3398200582570f820b60 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Wed, 8 Nov 2023 14:59:18 +0000 Subject: [PATCH] feat: read team token from kafka headers in blobby (#18337) Recently we read tens of millions of messages knowing the only action was to drop them. For every message we had to parse the body to get the token so we could check if we wanted to drop it. Let's put the team token into the kafka headers since if there's an incident we always care about those values. And then we can read the team token from the header and drop the message without parsing it when the team does not have session recording enabled. This still also reads the body to check for team token as a fallback. If nothing else this lets us deploy this without worrying about rolling out a switcheroo So token in headers, team not found - return early (new behaviour) token in headers, team found - does parse the body, but doesn't check the team again token not in headers, team not found - return early (as before) token not in headers, team found - (as before) --- .../services/session-manager.ts | 4 +- .../session-recordings-consumer.ts | 123 +-------- .../session-recording/utils.ts | 158 ++++++++++- .../session-recordings-consumer.test.ts | 153 +---------- .../session-recording/utils.test.ts | 245 ++++++++++++++++-- posthog/api/capture.py | 11 +- .../api/test/__snapshots__/test_cohort.ambr | 16 +- posthog/api/test/test_capture.py | 13 +- 8 files changed, 416 insertions(+), 307 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts index 2a384b86c23bd..c0869e773ca78 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts @@ -17,7 +17,7 @@ import { status } from '../../../../utils/status' import { asyncTimeoutGuard } from '../../../../utils/timing' import { ObjectStorage } from '../../../services/object_storage' import { IncomingRecordingMessage } from '../types' -import { bufferFileDir, convertToPersistedMessage, getLagMultipler, maxDefined, minDefined, now } from '../utils' +import { bufferFileDir, convertToPersistedMessage, getLagMultiplier, maxDefined, minDefined, now } from '../utils' import { OffsetHighWaterMarker } from './offset-high-water-marker' import { RealtimeManager } from './realtime-manager' @@ -217,7 +217,7 @@ export class SessionManager { return } - const lagMultiplier = getLagMultipler(partitionLag) + const lagMultiplier = getLagMultiplier(partitionLag) const flushThresholdMs = this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000 const flushThresholdJitteredMs = flushThresholdMs * this.flushJitterMultiplier diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 530814010efb9..470827f569422 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -1,4 +1,4 @@ -import { captureException, captureMessage } from '@sentry/node' +import { captureException } from '@sentry/node' import { mkdirSync, rmSync } from 'node:fs' import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka' import { Counter, Gauge, Histogram } from 'prom-client' @@ -7,7 +7,7 @@ import { sessionRecordingConsumerConfig } from '../../../config/config' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' -import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, RRWebEvent, TeamId } from '../../../types' +import { PluginsServerConfig, RedisPool, TeamId } from '../../../types' import { BackgroundRefresher } from '../../../utils/background-refresher' import { PostgresRouter } from '../../../utils/db/postgres' import { status } from '../../../utils/status' @@ -23,7 +23,7 @@ import { RealtimeManager } from './services/realtime-manager' import { ReplayEventsIngester } from './services/replay-events-ingester' import { BUCKETS_KB_WRITTEN, SessionManager } from './services/session-manager' import { IncomingRecordingMessage } from './types' -import { bufferFileDir, getPartitionsForTopic, now, queryWatermarkOffsets } from './utils' +import { bufferFileDir, getPartitionsForTopic, now, parseKafkaMessage, queryWatermarkOffsets } from './utils' // Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals require('@sentry/tracing') @@ -251,119 +251,6 @@ export class SessionRecordingIngester { await this.sessions[key]?.add(event) } - public async parseKafkaMessage( - message: Message, - getTeamFn: (s: string) => Promise - ): Promise { - const statusWarn = (reason: string, extra?: Record) => { - status.warn('⚠️', 'invalid_message', { - reason, - partition: message.partition, - offset: message.offset, - ...(extra || {}), - }) - } - - if (!message.value || !message.timestamp) { - // Typing says this can happen but in practice it shouldn't - return statusWarn('message value or timestamp is empty') - } - - let messagePayload: RawEventMessage - let event: PipelineEvent - - try { - messagePayload = JSON.parse(message.value.toString()) - event = JSON.parse(messagePayload.data) - } catch (error) { - return statusWarn('invalid_json', { error }) - } - - const { $snapshot_items, $session_id, $window_id } = event.properties || {} - - // NOTE: This is simple validation - ideally we should do proper schema based validation - if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) { - status.warn('🙈', 'Received non-snapshot message, ignoring') - return - } - - if (messagePayload.team_id == null && !messagePayload.token) { - return statusWarn('no_token') - } - - let teamIdWithConfig: TeamIDWithConfig | null = null - const token = messagePayload.token - - if (token) { - teamIdWithConfig = await getTeamFn(token) - } - - // NB `==` so we're comparing undefined and null - if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) { - eventDroppedCounter - .labels({ - event_type: 'session_recordings_blob_ingestion', - drop_cause: 'team_missing_or_disabled', - }) - .inc() - - return statusWarn('team_missing_or_disabled', { - token: messagePayload.token, - teamId: messagePayload.team_id, - payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown', - }) - } - - const invalidEvents: any[] = [] - const events: RRWebEvent[] = $snapshot_items.filter((event: any) => { - if (!event || !event.timestamp) { - invalidEvents.push(event) - return false - } - return true - }) - - if (invalidEvents.length) { - captureMessage('[session-manager]: invalid rrweb events filtered out from message', { - extra: { - invalidEvents, - eventsCount: events.length, - invalidEventsCount: invalidEvents.length, - event, - }, - tags: { - team_id: teamIdWithConfig.teamId, - session_id: $session_id, - }, - }) - } - - if (!events.length) { - status.warn('🙈', 'Event contained no valid rrweb events, ignoring') - - return statusWarn('invalid_rrweb_events', { - token: messagePayload.token, - teamId: messagePayload.team_id, - }) - } - - return { - metadata: { - partition: message.partition, - topic: message.topic, - offset: message.offset, - timestamp: message.timestamp, - consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled, - }, - - team_id: teamIdWithConfig.teamId, - distinct_id: messagePayload.distinct_id, - session_id: $session_id, - window_id: $window_id, - events: events, - } - } - public async handleEachBatch(messages: Message[]): Promise { status.info('🔁', `blob_ingester_consumer - handling batch`, { size: messages.length, @@ -395,7 +282,7 @@ export class SessionRecordingIngester { counterKafkaMessageReceived.inc({ partition }) - const recordingMessage = await this.parseKafkaMessage(message, (token) => + const recordingMessage = await parseKafkaMessage(message, (token) => this.teamsRefresher.get().then((teams) => ({ teamId: teams[token]?.teamId || null, consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true, @@ -646,7 +533,7 @@ export class SessionRecordingIngester { partitionsToDrop[partition] = this.partitionMetrics[partition] ?? {} delete this.partitionMetrics[partition] - // Revoke the high water mark for this partition so we are essentially "reset" + // Revoke the high watermark for this partition, so we are essentially "reset" this.sessionHighWaterMarker.revoke(topicPartition) this.persistentHighWaterMarker.revoke(topicPartition) }) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index a13fed116b5bf..64476d57c3cee 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -1,10 +1,13 @@ -import { captureException } from '@sentry/node' +import { captureException, captureMessage } from '@sentry/node' import { DateTime } from 'luxon' -import { KafkaConsumer, PartitionMetadata, TopicPartition } from 'node-rdkafka' +import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartition } from 'node-rdkafka' import path from 'path' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' +import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types' import { status } from '../../../utils/status' +import { eventDroppedCounter } from '../metrics' +import { TeamIDWithConfig } from './session-recordings-consumer' import { IncomingRecordingMessage, PersistedRecordingMessage } from './types' export const convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => { @@ -102,10 +105,159 @@ export const getPartitionsForTopic = ( }) } -export const getLagMultipler = (lag: number, threshold = 1000000) => { +export const getLagMultiplier = (lag: number, threshold = 1000000) => { if (lag < threshold) { return 1 } return Math.max(0.1, 1 - (lag - threshold) / (threshold * 10)) } + +export async function readTokenFromHeaders( + headers: MessageHeader[] | undefined, + getTeamFn: (s: string) => Promise +) { + const tokenHeader = headers?.find((header: MessageHeader) => { + // each header in the array is an object of key to value + // because it's possible to have multiple headers with the same key + // but, we don't support that. the first truthy match we find is the one we use + return header.token + })?.token + + const token = typeof tokenHeader === 'string' ? tokenHeader : tokenHeader?.toString() + + let teamIdWithConfig: TeamIDWithConfig | null = null + + if (token) { + teamIdWithConfig = await getTeamFn(token) + } + return { token, teamIdWithConfig } +} + +export const parseKafkaMessage = async ( + message: Message, + getTeamFn: (s: string) => Promise +): Promise => { + const dropMessage = (reason: string, extra?: Record) => { + eventDroppedCounter + .labels({ + event_type: 'session_recordings_blob_ingestion', + drop_cause: reason, + }) + .inc() + + status.warn('⚠️', 'invalid_message', { + reason, + partition: message.partition, + offset: message.offset, + ...(extra || {}), + }) + } + + if (!message.value || !message.timestamp) { + // Typing says this can happen but in practice it shouldn't + return dropMessage('message_value_or_timestamp_is_empty') + } + + const headerResult = await readTokenFromHeaders(message.headers, getTeamFn) + const token: string | undefined = headerResult.token + let teamIdWithConfig: null | TeamIDWithConfig = headerResult.teamIdWithConfig + + // NB `==` so we're comparing undefined and null + // if token was in the headers but, we could not load team config + // then, we can return early + if (!!token && (teamIdWithConfig == null || teamIdWithConfig.teamId == null)) { + return dropMessage('header_token_present_team_missing_or_disabled', { + token: token, + }) + } + + let messagePayload: RawEventMessage + let event: PipelineEvent + + try { + messagePayload = JSON.parse(message.value.toString()) + event = JSON.parse(messagePayload.data) + } catch (error) { + return dropMessage('invalid_json', { error }) + } + + const { $snapshot_items, $session_id, $window_id } = event.properties || {} + + // NOTE: This is simple validation - ideally we should do proper schema based validation + if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) { + return dropMessage('received_non_snapshot_message') + } + + // TODO this mechanism is deprecated for blobby ingestion, we should remove it + // once we're happy that the new mechanism is working + // if there was not a token in the header then we try to load one from the message payload + if (teamIdWithConfig == null && messagePayload.team_id == null && !messagePayload.token) { + return dropMessage('no_token_in_header_or_payload') + } + + if (teamIdWithConfig == null) { + const token = messagePayload.token + + if (token) { + teamIdWithConfig = await getTeamFn(token) + } + } + + // NB `==` so we're comparing undefined and null + if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) { + return dropMessage('token_fallback_team_missing_or_disabled', { + token: messagePayload.token, + teamId: messagePayload.team_id, + payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown', + }) + } + // end of deprecated mechanism + + const invalidEvents: any[] = [] + const events: RRWebEvent[] = $snapshot_items.filter((event: any) => { + if (!event || !event.timestamp) { + invalidEvents.push(event) + return false + } + return true + }) + + if (invalidEvents.length) { + captureMessage('[session-manager]: invalid rrweb events filtered out from message', { + extra: { + invalidEvents, + eventsCount: events.length, + invalidEventsCount: invalidEvents.length, + event, + }, + tags: { + team_id: teamIdWithConfig.teamId, + session_id: $session_id, + }, + }) + } + + if (!events.length) { + return dropMessage('message_contained_no_valid_rrweb_events', { + token: messagePayload.token, + teamId: messagePayload.team_id, + }) + } + + return { + metadata: { + partition: message.partition, + topic: message.topic, + offset: message.offset, + timestamp: message.timestamp, + consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled, + }, + + team_id: teamIdWithConfig.teamId, + distinct_id: messagePayload.distinct_id, + session_id: $session_id, + window_id: $window_id, + events: events, + } +} diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index e247f7c8f1dfe..508e85faa7ebf 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -1,6 +1,6 @@ import { randomUUID } from 'crypto' import { mkdirSync, readdirSync, rmSync } from 'node:fs' -import { Message, TopicPartition, TopicPartitionOffset } from 'node-rdkafka' +import { TopicPartition, TopicPartitionOffset } from 'node-rdkafka' import path from 'path' import { waitForExpect } from '../../../../functional_tests/expectations' @@ -89,7 +89,7 @@ describe('ingester', () => { }) }) - mockConsumer.committed.mockImplementation((topicPartitions: TopicPartition[], timeout, cb) => { + mockConsumer.committed.mockImplementation((topicPartitions: TopicPartition[], _timeout, cb) => { const tpos: TopicPartitionOffset[] = topicPartitions.map((tp) => ({ topic: tp.topic, partition: tp.partition, @@ -196,153 +196,6 @@ describe('ingester', () => { }, 10000) }) - describe('parsing the message', () => { - it('can handle numeric distinct_ids', async () => { - const numeric_id = 12345 - - const parsedMessage = await ingester.parseKafkaMessage( - { - value: Buffer.from( - JSON.stringify({ - uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', - distinct_id: String(numeric_id), - ip: '127.0.0.1', - site_url: 'http://127.0.0.1:8000', - data: JSON.stringify({ - uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', - event: '$snapshot_items', - properties: { - distinct_id: numeric_id, - $session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070', - $window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448', - $snapshot_items: [ - { - type: 6, - data: { - plugin: 'rrweb/console@1', - payload: { - level: 'log', - trace: [ - 'HedgehogActor.setAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105543:17)', - 'HedgehogActor.setRandomAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105550:14)', - 'HedgehogActor.update (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105572:16)', - 'loop (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105754:15)', - ], - payload: ['"Hedgehog: Will \'jump\' for 2916.6666666666665ms"'], - }, - }, - timestamp: 1693422950693, - }, - ], - $snapshot_consumer: 'v2', - }, - offset: 2187, - }), - now: '2023-08-30T19:15:54.887316+00:00', - sent_at: '2023-08-30T19:15:54.882000+00:00', - token: 'the_token', - }) - ), - timestamp: 1, - size: 1, - topic: 'the_topic', - offset: 1, - partition: 1, - } satisfies Message, - () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) - ) - expect(parsedMessage).toEqual({ - distinct_id: '12345', - events: expect.any(Array), - metadata: { - offset: 1, - partition: 1, - timestamp: 1, - topic: 'the_topic', - consoleLogIngestionEnabled: false, - }, - session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070', - team_id: 1, - window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448', - }) - }) - - it('filters out invalid rrweb events', async () => { - const numeric_id = 12345 - - const createMessage = ($snapshot_items: unknown[]) => { - return { - value: Buffer.from( - JSON.stringify({ - uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', - distinct_id: String(numeric_id), - ip: '127.0.0.1', - site_url: 'http://127.0.0.1:8000', - data: JSON.stringify({ - uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', - event: '$snapshot_items', - properties: { - distinct_id: numeric_id, - $session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070', - $window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448', - $snapshot_items: $snapshot_items, - }, - }), - token: 'the_token', - }) - ), - timestamp: 1, - size: 1, - topic: 'the_topic', - offset: 1, - partition: 1, - } satisfies Message - } - - const parsedMessage = await ingester.parseKafkaMessage( - createMessage([ - { - type: 6, - data: {}, - timestamp: null, - }, - ]), - () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }) - ) - expect(parsedMessage).toEqual(undefined) - - const parsedMessage2 = await ingester.parseKafkaMessage( - createMessage([ - { - type: 6, - data: {}, - timestamp: null, - }, - { - type: 6, - data: {}, - timestamp: 123, - }, - ]), - () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }) - ) - expect(parsedMessage2).toMatchObject({ - events: [ - { - data: {}, - timestamp: 123, - type: 6, - }, - ], - }) - - const parsedMessage3 = await ingester.parseKafkaMessage(createMessage([null]), () => - Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) - ) - expect(parsedMessage3).toEqual(undefined) - }) - }) - describe('offset committing', () => { it('should commit offsets in simple cases', async () => { await ingester.handleEachBatch([createMessage('sid1'), createMessage('sid1')]) @@ -687,7 +540,7 @@ describe('ingester', () => { describe('lag reporting', () => { it('should return the latest offsets', async () => { - mockConsumer.queryWatermarkOffsets.mockImplementation((topic, partition, timeout, cb) => { + mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => { cb(null, { highOffset: 1000 + partition, lowOffset: 0 }) }) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts index 4c6d77cc750d8..44d6fa2c2fb5e 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts @@ -1,6 +1,207 @@ -import { getLagMultipler, maxDefined, minDefined } from '../../../../src/main/ingestion-queues/session-recording/utils' +import { Message, MessageHeader } from 'node-rdkafka' + +import { + getLagMultiplier, + maxDefined, + minDefined, + parseKafkaMessage, +} from '../../../../src/main/ingestion-queues/session-recording/utils' describe('session-recording utils', () => { + const validMessage = (distinctId: number | string, headers?: MessageHeader[], value?: Record) => + ({ + headers, + value: Buffer.from( + JSON.stringify({ + uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', + distinct_id: String(distinctId), + ip: '127.0.0.1', + site_url: 'http://127.0.0.1:8000', + data: JSON.stringify({ + uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', + event: '$snapshot_items', + properties: { + distinct_id: distinctId, + $session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070', + $window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448', + $snapshot_items: [ + { + type: 6, + data: { + plugin: 'rrweb/console@1', + payload: { + level: 'log', + trace: [ + 'HedgehogActor.setAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105543:17)', + 'HedgehogActor.setRandomAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105550:14)', + 'HedgehogActor.update (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105572:16)', + 'loop (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105754:15)', + ], + payload: ['"Hedgehog: Will \'jump\' for 2916.6666666666665ms"'], + }, + }, + timestamp: 1693422950693, + }, + ], + $snapshot_consumer: 'v2', + }, + offset: 2187, + }), + now: '2023-08-30T19:15:54.887316+00:00', + sent_at: '2023-08-30T19:15:54.882000+00:00', + token: 'the_token', + ...value, + }) + ), + timestamp: 1, + size: 1, + topic: 'the_topic', + offset: 1, + partition: 1, + } satisfies Message) + + describe('parsing the message', () => { + it('can handle numeric distinct_ids', async () => { + const numericId = 12345 + const parsedMessage = await parseKafkaMessage(validMessage(numericId), () => + Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) + ) + expect(parsedMessage).toEqual({ + distinct_id: String(numericId), + events: expect.any(Array), + metadata: { + offset: 1, + partition: 1, + timestamp: 1, + topic: 'the_topic', + consoleLogIngestionEnabled: false, + }, + session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070', + team_id: 1, + window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448', + }) + }) + + it('filters out invalid rrweb events', async () => { + const numeric_id = 12345 + + const createMessage = ($snapshot_items: unknown[]) => { + return { + value: Buffer.from( + JSON.stringify({ + uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', + distinct_id: String(numeric_id), + ip: '127.0.0.1', + site_url: 'http://127.0.0.1:8000', + data: JSON.stringify({ + uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', + event: '$snapshot_items', + properties: { + distinct_id: numeric_id, + $session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070', + $window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448', + $snapshot_items: $snapshot_items, + }, + }), + token: 'the_token', + }) + ), + timestamp: 1, + size: 1, + topic: 'the_topic', + offset: 1, + partition: 1, + } satisfies Message + } + + const parsedMessage = await parseKafkaMessage( + createMessage([ + { + type: 6, + data: {}, + timestamp: null, + }, + ]), + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }) + ) + expect(parsedMessage).toEqual(undefined) + + const parsedMessage2 = await parseKafkaMessage( + createMessage([ + { + type: 6, + data: {}, + timestamp: null, + }, + { + type: 6, + data: {}, + timestamp: 123, + }, + ]), + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }) + ) + expect(parsedMessage2).toMatchObject({ + events: [ + { + data: {}, + timestamp: 123, + type: 6, + }, + ], + }) + + const parsedMessage3 = await parseKafkaMessage(createMessage([null]), () => + Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) + ) + expect(parsedMessage3).toEqual(undefined) + }) + + describe('team token can be in header or body', () => { + const mockTeamResolver = jest.fn() + + beforeEach(() => { + mockTeamResolver.mockReset() + mockTeamResolver.mockResolvedValue({ teamId: 1, consoleLogIngestionEnabled: false }) + }) + + test.each([ + [ + 'calls the team id resolver once when token is in header, not in the body', + 'the_token', + undefined, + ['the_token'], + ], + [ + 'calls the team id resolver once when token is in header, and in the body', + 'the_token', + 'the body token', + ['the_token'], + ], + [ + 'does not call the team id resolver when token is not in header, and not in body', + undefined, + undefined, + undefined, + ], + [ + 'calls the team id resolver twice when token is not in header, and is in body', + undefined, + 'the body token', + ['the body token'], + ], + ])('%s', async (_name, headerToken, payloadToken, expectedCalls) => { + await parseKafkaMessage( + validMessage(12345, headerToken ? [{ token: Buffer.from(headerToken) }] : undefined, { + token: payloadToken, + }), + mockTeamResolver + ) + expect(mockTeamResolver.mock.calls).toEqual([expectedCalls]) + }) + }) + }) + it('minDefined', () => { expect(minDefined(1, 2, 3)).toEqual(1) expect(minDefined(1, undefined, 3)).toEqual(1) @@ -14,32 +215,32 @@ describe('session-recording utils', () => { expect(maxDefined(undefined, undefined, undefined)).toEqual(undefined) expect(maxDefined()).toEqual(undefined) }) -}) -describe('getLagMultipler', () => { - const threshold = 1000 - it('returns 1 when lag is 0', () => { - expect(getLagMultipler(0, threshold)).toEqual(1) - }) + describe('getLagMultiplier', () => { + const threshold = 1000 + it('returns 1 when lag is 0', () => { + expect(getLagMultiplier(0, threshold)).toEqual(1) + }) - it('returns 1 when lag is under threshold', () => { - expect(getLagMultipler(threshold - 1, threshold)).toEqual(1) - }) + it('returns 1 when lag is under threshold', () => { + expect(getLagMultiplier(threshold - 1, threshold)).toEqual(1) + }) - it('returns 0.9 when lag is double threshold', () => { - expect(getLagMultipler(threshold * 2, threshold)).toEqual(0.9) - }) + it('returns 0.9 when lag is double threshold', () => { + expect(getLagMultiplier(threshold * 2, threshold)).toEqual(0.9) + }) - it('returns 0.6 when lag is 5 times the threshold', () => { - expect(getLagMultipler(threshold * 5, threshold)).toEqual(0.6) - }) + it('returns 0.6 when lag is 5 times the threshold', () => { + expect(getLagMultiplier(threshold * 5, threshold)).toEqual(0.6) + }) - it('returns 0.9 when lag is 9 times the threshold', () => { - expect(getLagMultipler(threshold * 9, threshold)).toBeGreaterThanOrEqual(0.19) - expect(getLagMultipler(threshold * 9, threshold)).toBeLessThanOrEqual(0.2) - }) + it('returns 0.9 when lag is 9 times the threshold', () => { + expect(getLagMultiplier(threshold * 9, threshold)).toBeGreaterThanOrEqual(0.19) + expect(getLagMultiplier(threshold * 9, threshold)).toBeLessThanOrEqual(0.2) + }) - it('returns 0.1 when lag is 100 times the threshold', () => { - expect(getLagMultipler(threshold * 100, threshold)).toEqual(0.1) + it('returns 0.1 when lag is 100 times the threshold', () => { + expect(getLagMultiplier(threshold * 100, threshold)).toEqual(0.1) + }) }) }) diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 98c52d1781380..a7d72f9ca1f3e 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -155,7 +155,9 @@ def _kafka_topic(event_name: str, data: Dict) -> str: return settings.KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC -def log_event(data: Dict, event_name: str, partition_key: Optional[str]): +def log_event( + data: Dict, event_name: str, partition_key: Optional[str], headers: Optional[List] = None +) -> FutureRecordMetadata: kafka_topic = _kafka_topic(event_name, data) logger.debug("logging_event", event_name=event_name, kafka_topic=kafka_topic) @@ -167,7 +169,7 @@ def log_event(data: Dict, event_name: str, partition_key: Optional[str]): else: producer = KafkaProducer() - future = producer.produce(topic=kafka_topic, data=data, key=partition_key) + future = producer.produce(topic=kafka_topic, data=data, key=partition_key, headers=headers) statsd.incr("posthog_cloud_plugin_server_ingestion") return future except Exception as e: @@ -558,7 +560,10 @@ def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid= if event["event"] in SESSION_RECORDING_EVENT_NAMES: kafka_partition_key = event["properties"]["$session_id"] - return log_event(parsed_event, event["event"], partition_key=kafka_partition_key) + headers = [ + ("token", token), + ] + return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, headers=headers) candidate_partition_key = f"{token}:{distinct_id}" diff --git a/posthog/api/test/__snapshots__/test_cohort.ambr b/posthog/api/test/__snapshots__/test_cohort.ambr index 206638c65e4b9..e8c0b7d444e0d 100644 --- a/posthog/api/test/__snapshots__/test_cohort.ambr +++ b/posthog/api/test/__snapshots__/test_cohort.ambr @@ -1,6 +1,6 @@ # name: TestCohort.test_async_deletion_of_cohort ' - /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -10,7 +10,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.1 ' - /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ INSERT INTO cohortpeople SELECT id, 2 as cohort_id, @@ -83,7 +83,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.2 ' - /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -93,7 +93,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.3 ' - /* user_id:118 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ + /* user_id:119 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ SELECT count() FROM cohortpeople WHERE team_id = 2 @@ -103,7 +103,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.4 ' - /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -113,7 +113,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.5 ' - /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ INSERT INTO cohortpeople SELECT id, 2 as cohort_id, @@ -147,7 +147,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.6 ' - /* user_id:118 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:119 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -157,7 +157,7 @@ --- # name: TestCohort.test_async_deletion_of_cohort.7 ' - /* user_id:118 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ + /* user_id:119 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ SELECT count() FROM cohortpeople WHERE team_id = 2 diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py index 9e68b72072e5e..7d39039b9fdfd 100644 --- a/posthog/api/test/test_capture.py +++ b/posthog/api/test/test_capture.py @@ -292,7 +292,9 @@ def test_capture_randomly_partitions_with_likely_anonymous_ids(self, kafka_produ HTTP_ORIGIN="https://localhost", ) - kafka_produce.assert_called_with(topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC, data=ANY, key=None) + kafka_produce.assert_called_with( + topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC, data=ANY, key=None, headers=None + ) def test_cached_is_randomly_partitioned(self): """Assert the behavior of is_randomly_partitioned under certain cache settings. @@ -1592,6 +1594,15 @@ def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produ assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1}) + @patch("posthog.kafka_client.client._KafkaProducer.produce") + def test_recording_ingestion_can_write_headers_with_the_message(self, kafka_produce: MagicMock) -> None: + with self.settings( + SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480, + ): + self._send_august_2023_version_session_recording_event() + + assert kafka_produce.mock_calls[0].kwargs["headers"] == [("token", "token123")] + @patch("posthog.kafka_client.client.SessionRecordingKafkaProducer") def test_create_session_recording_kafka_with_expected_hosts( self,