diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts index 95b323a1fcdb1..6aab2a1988835 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts @@ -9,7 +9,7 @@ import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handl import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer' import { PluginsServerConfig } from '../../../../types' import { status } from '../../../../utils/status' -import { ConsoleLogEntry, gatherConsoleLogEvents } from '../../../../worker/ingestion/process-event' +import { ConsoleLogEntry, gatherConsoleLogEvents, RRWebEventType } from '../../../../worker/ingestion/process-event' import { eventDroppedCounter } from '../../metrics' import { IncomingRecordingMessage } from '../types' import { OffsetHighWaterMarker } from './offset-high-water-marker' @@ -30,9 +30,10 @@ function deduplicateConsoleLogEvents(consoleLogEntries: ConsoleLogEntry[]): Cons const deduped: ConsoleLogEntry[] = [] for (const cle of consoleLogEntries) { - if (!seen.has(cle.message)) { + const fingerPrint = `${cle.log_level}-${cle.message}` + if (!seen.has(fingerPrint)) { deduped.push(cle) - seen.add(`${cle.log_level}-${cle.message}`) + seen.add(fingerPrint) } } return deduped @@ -43,6 +44,7 @@ function deduplicateConsoleLogEvents(consoleLogEntries: ConsoleLogEntry[]): Cons export class ConsoleLogsIngester { producer?: RdKafkaProducer enabled: boolean + constructor( private readonly serverConfig: PluginsServerConfig, private readonly persistentHighWaterMarker: OffsetHighWaterMarker @@ -83,9 +85,9 @@ export class ConsoleLogsIngester { status.error('🔁', '[console-log-events-ingester] main_loop_error', { error }) if (error?.isRetriable) { - // We assume the if the error is retriable, then we + // We assume that if the error is retriable, then we // are probably in a state where e.g. Kafka is down - // temporarily and we would rather simply throw and + // temporarily, and we would rather simply throw and // have the process restarted. throw error } @@ -140,11 +142,26 @@ export class ConsoleLogsIngester { return drop('high_water_mark') } + // cheapest possible check for any console logs to avoid parsing the events because... + const hasAnyConsoleLogs = event.events.some( + (e) => !!e && e.type === RRWebEventType.Plugin && e.data?.plugin === 'rrweb/console@1' + ) + + if (!hasAnyConsoleLogs) { + return + } + + // ... we don't want to mark events with no console logs as dropped + // this keeps the signal here clean and makes it easier to debug + // when we disable a team's console log ingestion + if (!event.metadata.consoleLogIngestionEnabled) { + return drop('console_log_ingestion_disabled') + } + try { const consoleLogEvents = deduplicateConsoleLogEvents( gatherConsoleLogEvents(event.team_id, event.session_id, event.events) ) - consoleLogEventsCounter.inc(consoleLogEvents.length) return consoleLogEvents.map((cle: ConsoleLogEntry) => diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 8af391b834908..ec2d32e878a90 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -8,7 +8,6 @@ import { sessionRecordingConsumerConfig } from '../../../config/config' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' -import { runInstrumentedFunction } from '../../../main/utils' import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, RRWebEvent, TeamId } from '../../../types' import { BackgroundRefresher } from '../../../utils/background-refresher' import { PostgresRouter } from '../../../utils/db/postgres' @@ -16,6 +15,7 @@ import { status } from '../../../utils/status' import { createRedisPool } from '../../../utils/utils' import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager' import { ObjectStorage } from '../../services/object_storage' +import { runInstrumentedFunction } from '../../utils' import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics' import { eventDroppedCounter } from '../metrics' import { ConsoleLogsIngester } from './services/console-logs-ingester' @@ -95,6 +95,11 @@ type PartitionMetrics = { lastKnownCommit?: number } +export interface TeamIDWithConfig { + teamId: TeamId | null + consoleLogIngestionEnabled: boolean +} + export class SessionRecordingIngester { redisPool: RedisPool sessions: Record = {} @@ -107,7 +112,7 @@ export class SessionRecordingIngester { batchConsumer?: BatchConsumer partitionAssignments: Record = {} partitionLockInterval: NodeJS.Timer | null = null - teamsRefresher: BackgroundRefresher> + teamsRefresher: BackgroundRefresher> offsetsRefresher: BackgroundRefresher> config: PluginsServerConfig topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS @@ -120,7 +125,7 @@ export class SessionRecordingIngester { private objectStorage: ObjectStorage ) { // NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis - // We stil connect to some of the non-dedicated resources such as postgres or the Replay events kafka. + // We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka. this.config = sessionRecordingConsumerConfig(globalServerConfig) this.redisPool = createRedisPool(this.config) @@ -198,7 +203,7 @@ export class SessionRecordingIngester { op: 'checkHighWaterMark', }) - // Check that we are not below the high water mark for this partition (another consumer may have flushed further than us when revoking) + // Check that we are not below the high-water mark for this partition (another consumer may have flushed further than us when revoking) if ( await this.persistentHighWaterMarker.isBelowHighWaterMark(event.metadata, KAFKA_CONSUMER_GROUP_ID, offset) ) { @@ -228,7 +233,7 @@ export class SessionRecordingIngester { if (!this.sessions[key]) { const { partition, topic } = event.metadata - const sessionManager = new SessionManager( + this.sessions[key] = new SessionManager( this.config, this.objectStorage.s3, this.realtimeManager, @@ -238,8 +243,6 @@ export class SessionRecordingIngester { partition, topic ) - - this.sessions[key] = sessionManager } await this.sessions[key]?.add(event) @@ -250,7 +253,7 @@ export class SessionRecordingIngester { public async parseKafkaMessage( message: Message, - getTeamFn: (s: string) => Promise + getTeamFn: (s: string) => Promise ): Promise { const statusWarn = (reason: string, extra?: Record) => { status.warn('⚠️', 'invalid_message', { @@ -288,14 +291,15 @@ export class SessionRecordingIngester { return statusWarn('no_token') } - let teamId: TeamId | null = null + let teamIdWithConfig: TeamIDWithConfig | null = null const token = messagePayload.token if (token) { - teamId = await getTeamFn(token) + teamIdWithConfig = await getTeamFn(token) } - if (teamId == null) { + // NB `==` so we're comparing undefined and null + if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) { eventDroppedCounter .labels({ event_type: 'session_recordings_blob_ingestion', @@ -328,7 +332,7 @@ export class SessionRecordingIngester { event, }, tags: { - team_id: teamId, + team_id: teamIdWithConfig.teamId, session_id: $session_id, }, }) @@ -343,22 +347,21 @@ export class SessionRecordingIngester { }) } - const recordingMessage: IncomingRecordingMessage = { + return { metadata: { partition: message.partition, topic: message.topic, offset: message.offset, timestamp: message.timestamp, + consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled, }, - team_id: teamId, + team_id: teamIdWithConfig.teamId, distinct_id: messagePayload.distinct_id, session_id: $session_id, window_id: $window_id, events: events, } - - return recordingMessage } public async handleEachBatch(messages: Message[]): Promise { @@ -408,7 +411,10 @@ export class SessionRecordingIngester { } const recordingMessage = await this.parseKafkaMessage(message, (token) => - this.teamsRefresher.get().then((teams) => teams[token] || null) + this.teamsRefresher.get().then((teams) => ({ + teamId: teams[token]?.teamId || null, + consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true, + })) ) if (recordingMessage) { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/types.ts b/plugin-server/src/main/ingestion-queues/session-recording/types.ts index eb6df15214ef2..78082160db794 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/types.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/types.ts @@ -7,6 +7,7 @@ import { RRWebEvent } from '../../../types' export type IncomingRecordingMessage = { metadata: TopicPartitionOffset & { timestamp: number + consoleLogIngestionEnabled: boolean } team_id: number diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 1517b3ebebac3..92b4e3f7b4d26 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -336,7 +336,7 @@ function safeString(payload: (string | null)[]) { .join(' ') } -enum RRWebEventType { +export enum RRWebEventType { DomContentLoaded = 0, Load = 1, FullSnapshot = 2, diff --git a/plugin-server/src/worker/ingestion/team-manager.ts b/plugin-server/src/worker/ingestion/team-manager.ts index 8adb0cac0d610..bd1dec1cd9ddf 100644 --- a/plugin-server/src/worker/ingestion/team-manager.ts +++ b/plugin-server/src/worker/ingestion/team-manager.ts @@ -3,6 +3,7 @@ import { StatsD } from 'hot-shots' import LRU from 'lru-cache' import { ONE_MINUTE } from '../../config/constants' +import { TeamIDWithConfig } from '../../main/ingestion-queues/session-recording/session-recordings-consumer' import { PipelineEvent, PluginsServerConfig, Team, TeamId } from '../../types' import { PostgresRouter, PostgresUse } from '../../utils/db/postgres' import { timeoutGuard } from '../../utils/db/utils' @@ -192,11 +193,11 @@ export async function fetchTeamByToken(client: PostgresRouter, token: string): P return selectResult.rows[0] ?? null } -export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Promise> { - const selectResult = await client.query>( +export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Promise> { + const selectResult = await client.query<{ capture_console_log_opt_in: boolean } & Pick>( PostgresUse.COMMON_READ, ` - SELECT id, api_token + SELECT id, api_token, capture_console_log_opt_in FROM posthog_team WHERE session_recording_opt_in = true `, @@ -205,7 +206,7 @@ export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Pro ) return selectResult.rows.reduce((acc, row) => { - acc[row.api_token] = row.id + acc[row.api_token] = { teamId: row.id, consoleLogIngestionEnabled: row.capture_console_log_opt_in } return acc - }, {} as Record) + }, {} as Record) } diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts new file mode 100644 index 0000000000000..c2dd99fb7a157 --- /dev/null +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts @@ -0,0 +1,209 @@ +import { HighLevelProducer } from 'node-rdkafka' + +import { defaultConfig } from '../../../../../src/config/config' +import { createKafkaProducer, produce } from '../../../../../src/kafka/producer' +import { ConsoleLogsIngester } from '../../../../../src/main/ingestion-queues/session-recording/services/console-logs-ingester' +import { OffsetHighWaterMarker } from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker' +import { IncomingRecordingMessage } from '../../../../../src/main/ingestion-queues/session-recording/types' +import { PluginsServerConfig } from '../../../../../src/types' +import { status } from '../../../../../src/utils/status' + +jest.mock('../../../../../src/utils/status') +jest.mock('../../../../../src/kafka/producer') + +const makeIncomingMessage = ( + data: Record[], + consoleLogIngestionEnabled: boolean +): IncomingRecordingMessage => { + return { + distinct_id: '', + events: data.map((d) => ({ type: 6, timestamp: 0, data: { ...d } })), + metadata: { + offset: 0, + partition: 0, + topic: 'topic', + timestamp: 0, + consoleLogIngestionEnabled, + }, + session_id: '', + team_id: 0, + } +} + +describe('console log ingester', () => { + let consoleLogIngester: ConsoleLogsIngester + const mockProducer: jest.Mock = jest.fn() + + beforeEach(async () => { + mockProducer.mockClear() + mockProducer['connect'] = jest.fn() + + jest.mocked(createKafkaProducer).mockImplementation(() => + Promise.resolve(mockProducer as unknown as HighLevelProducer) + ) + + const mockedHighWaterMarker = { isBelowHighWaterMark: jest.fn() } as unknown as OffsetHighWaterMarker + consoleLogIngester = new ConsoleLogsIngester({ ...defaultConfig } as PluginsServerConfig, mockedHighWaterMarker) + await consoleLogIngester.start() + }) + describe('when enabled on team', () => { + test('it truncates large console logs', async () => { + await consoleLogIngester.consume( + makeIncomingMessage( + [ + { + plugin: 'rrweb/console@1', + payload: { level: 'log', payload: ['a'.repeat(3001)] }, + }, + ], + true + ) + ) + expect(jest.mocked(status.warn).mock.calls).toEqual([]) + expect(jest.mocked(produce).mock.calls).toEqual([ + [ + { + key: '', + producer: mockProducer, + topic: 'log_entries_test', + value: Buffer.from( + JSON.stringify({ + team_id: 0, + message: 'a'.repeat(2999), + log_level: 'log', + log_source: 'session_replay', + log_source_id: '', + instance_id: null, + timestamp: '1970-01-01 00:00:00.000', + }) + ), + }, + ], + ]) + }) + + test('it handles multiple console logs', async () => { + await consoleLogIngester.consume( + makeIncomingMessage( + [ + { + plugin: 'rrweb/console@1', + payload: { level: 'log', payload: ['aaaaa'] }, + }, + { + plugin: 'rrweb/something-else@1', + payload: { level: 'log', payload: ['bbbbb'] }, + }, + { + plugin: 'rrweb/console@1', + payload: { level: 'log', payload: ['ccccc'] }, + }, + ], + true + ) + ) + expect(jest.mocked(status.warn).mock.calls).toEqual([]) + expect(jest.mocked(produce)).toHaveBeenCalledTimes(2) + expect(jest.mocked(produce).mock.calls).toEqual([ + [ + { + key: '', + producer: mockProducer, + topic: 'log_entries_test', + value: Buffer.from( + JSON.stringify({ + team_id: 0, + message: 'aaaaa', + log_level: 'log', + log_source: 'session_replay', + log_source_id: '', + instance_id: null, + timestamp: '1970-01-01 00:00:00.000', + }) + ), + }, + ], + [ + { + key: '', + producer: mockProducer, + topic: 'log_entries_test', + value: Buffer.from( + JSON.stringify({ + team_id: 0, + message: 'ccccc', + log_level: 'log', + log_source: 'session_replay', + log_source_id: '', + instance_id: null, + timestamp: '1970-01-01 00:00:00.000', + }) + ), + }, + ], + ]) + }) + + test('it de-duplicates console logs', async () => { + await consoleLogIngester.consume( + makeIncomingMessage( + [ + { + plugin: 'rrweb/console@1', + payload: { level: 'log', payload: ['aaaaa'] }, + }, + { + plugin: 'rrweb/console@1', + payload: { level: 'log', payload: ['aaaaa'] }, + }, + ], + true + ) + ) + expect(jest.mocked(status.warn).mock.calls).toEqual([]) + expect(jest.mocked(produce).mock.calls).toEqual([ + [ + { + key: '', + producer: mockProducer, + topic: 'log_entries_test', + value: Buffer.from( + JSON.stringify({ + team_id: 0, + message: 'aaaaa', + log_level: 'log', + log_source: 'session_replay', + log_source_id: '', + instance_id: null, + timestamp: '1970-01-01 00:00:00.000', + }) + ), + }, + ], + ]) + }) + }) + + describe('when disabled on team', () => { + test('it drops console logs', async () => { + await consoleLogIngester.consume(makeIncomingMessage([{ plugin: 'rrweb/console@1' }], false)) + expect(jest.mocked(status.warn).mock.calls).toEqual([ + [ + '⚠️', + '[console-log-events-ingester] console_log_ingestion_disabled', + { + offset: 0, + partition: 0, + reason: 'console_log_ingestion_disabled', + }, + ], + ]) + expect(jest.mocked(produce)).not.toHaveBeenCalled() + }) + test('it does not drop events with no console logs', async () => { + await consoleLogIngester.consume(makeIncomingMessage([{ plugin: 'some-other-plugin' }], false)) + expect(jest.mocked(status.warn).mock.calls).toEqual([]) + expect(jest.mocked(produce)).not.toHaveBeenCalled() + }) + }) +}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index 4b51910d3d3ae..744eedd29a3a1 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -224,7 +224,7 @@ describe('ingester', () => { offset: 1, partition: 1, } satisfies Message, - () => Promise.resolve(1) + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) ) expect(parsedMessage).toEqual({ distinct_id: '12345', @@ -234,6 +234,7 @@ describe('ingester', () => { partition: 1, timestamp: 1, topic: 'the_topic', + consoleLogIngestionEnabled: false, }, session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070', team_id: 1, @@ -244,7 +245,7 @@ describe('ingester', () => { it('filters out invalid rrweb events', async () => { const numeric_id = 12345 - const createMessage = ($snapshot_items) => { + const createMessage = ($snapshot_items: unknown[]) => { return { value: Buffer.from( JSON.stringify({ @@ -281,7 +282,7 @@ describe('ingester', () => { timestamp: null, }, ]), - () => Promise.resolve(1) + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }) ) expect(parsedMessage).toEqual(undefined) @@ -298,7 +299,7 @@ describe('ingester', () => { timestamp: 123, }, ]), - () => Promise.resolve(1) + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }) ) expect(parsedMessage2).toMatchObject({ events: [ @@ -310,7 +311,9 @@ describe('ingester', () => { ], }) - const parsedMessage3 = await ingester.parseKafkaMessage(createMessage([null]), () => Promise.resolve(1)) + const parsedMessage3 = await ingester.parseKafkaMessage(createMessage([null]), () => + Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) + ) expect(parsedMessage3).toEqual(undefined) }) })