From e94fc6c8f9972a3f6c04be2abfc8c1e0bcc29c96 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Thu, 26 Oct 2023 13:29:44 +0100 Subject: [PATCH] feat: drop console log events when disabled --- .../services/console-logs-ingester.ts | 31 +++++++++----- .../session-recordings-consumer.ts | 40 +++++++++++-------- .../session-recording/types.ts | 1 + .../src/worker/ingestion/team-manager.ts | 11 ++--- .../session-recordings-consumer.test.ts | 13 +++--- 5 files changed, 58 insertions(+), 38 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts index 95b323a1fcdb1..53160989416b8 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts @@ -43,6 +43,7 @@ function deduplicateConsoleLogEvents(consoleLogEntries: ConsoleLogEntry[]): Cons export class ConsoleLogsIngester { producer?: RdKafkaProducer enabled: boolean + constructor( private readonly serverConfig: PluginsServerConfig, private readonly persistentHighWaterMarker: OffsetHighWaterMarker @@ -83,9 +84,9 @@ export class ConsoleLogsIngester { status.error('🔁', '[console-log-events-ingester] main_loop_error', { error }) if (error?.isRetriable) { - // We assume the if the error is retriable, then we + // We assume that if the error is retriable, then we // are probably in a state where e.g. Kafka is down - // temporarily and we would rather simply throw and + // temporarily, and we would rather simply throw and // have the process restarted. 
throw error } @@ -145,16 +146,24 @@ export class ConsoleLogsIngester { gatherConsoleLogEvents(event.team_id, event.session_id, event.events) ) - consoleLogEventsCounter.inc(consoleLogEvents.length) + if (consoleLogEvents.length === 0) { + return + } - return consoleLogEvents.map((cle: ConsoleLogEntry) => - produce({ - producer, - topic: KAFKA_LOG_ENTRIES, - value: Buffer.from(JSON.stringify(cle)), - key: event.session_id, - }) - ) + if (event.metadata.consoleLogIngestionEnabled) { + consoleLogEventsCounter.inc(consoleLogEvents.length) + + return consoleLogEvents.map((cle: ConsoleLogEntry) => + produce({ + producer, + topic: KAFKA_LOG_ENTRIES, + value: Buffer.from(JSON.stringify(cle)), + key: event.session_id, + }) + ) + } else { + return drop('console_log_ingestion_disabled') + } } catch (error) { status.error('⚠️', '[console-log-events-ingester] processing_error', { error: error, diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 8af391b834908..ec2d32e878a90 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -8,7 +8,6 @@ import { sessionRecordingConsumerConfig } from '../../../config/config' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' -import { runInstrumentedFunction } from '../../../main/utils' import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, RRWebEvent, TeamId } from '../../../types' import { BackgroundRefresher } from '../../../utils/background-refresher' import { PostgresRouter } from '../../../utils/db/postgres' @@ -16,6 +15,7 @@ import { 
status } from '../../../utils/status' import { createRedisPool } from '../../../utils/utils' import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager' import { ObjectStorage } from '../../services/object_storage' +import { runInstrumentedFunction } from '../../utils' import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics' import { eventDroppedCounter } from '../metrics' import { ConsoleLogsIngester } from './services/console-logs-ingester' @@ -95,6 +95,11 @@ type PartitionMetrics = { lastKnownCommit?: number } +export interface TeamIDWithConfig { + teamId: TeamId | null + consoleLogIngestionEnabled: boolean +} + export class SessionRecordingIngester { redisPool: RedisPool sessions: Record = {} @@ -107,7 +112,7 @@ export class SessionRecordingIngester { batchConsumer?: BatchConsumer partitionAssignments: Record = {} partitionLockInterval: NodeJS.Timer | null = null - teamsRefresher: BackgroundRefresher<Record<string, TeamId>> + teamsRefresher: BackgroundRefresher<Record<string, TeamIDWithConfig>> offsetsRefresher: BackgroundRefresher> config: PluginsServerConfig topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS @@ -120,7 +125,7 @@ export class SessionRecordingIngester { private objectStorage: ObjectStorage ) { // NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis - // We stil connect to some of the non-dedicated resources such as postgres or the Replay events kafka. + // We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka. 
this.config = sessionRecordingConsumerConfig(globalServerConfig) this.redisPool = createRedisPool(this.config) @@ -198,7 +203,7 @@ export class SessionRecordingIngester { op: 'checkHighWaterMark', }) - // Check that we are not below the high water mark for this partition (another consumer may have flushed further than us when revoking) + // Check that we are not below the high-water mark for this partition (another consumer may have flushed further than us when revoking) if ( await this.persistentHighWaterMarker.isBelowHighWaterMark(event.metadata, KAFKA_CONSUMER_GROUP_ID, offset) ) { @@ -228,7 +233,7 @@ export class SessionRecordingIngester { if (!this.sessions[key]) { const { partition, topic } = event.metadata - const sessionManager = new SessionManager( + this.sessions[key] = new SessionManager( this.config, this.objectStorage.s3, this.realtimeManager, @@ -238,8 +243,6 @@ export class SessionRecordingIngester { partition, topic ) - - this.sessions[key] = sessionManager } await this.sessions[key]?.add(event) @@ -250,7 +253,7 @@ export class SessionRecordingIngester { public async parseKafkaMessage( message: Message, - getTeamFn: (s: string) => Promise<TeamId | null> + getTeamFn: (s: string) => Promise<TeamIDWithConfig | null> ): Promise { const statusWarn = (reason: string, extra?: Record) => { status.warn('⚠️', 'invalid_message', { @@ -288,14 +291,15 @@ export class SessionRecordingIngester { return statusWarn('no_token') } - let teamId: TeamId | null = null + let teamIdWithConfig: TeamIDWithConfig | null = null const token = messagePayload.token if (token) { - teamId = await getTeamFn(token) + teamIdWithConfig = await getTeamFn(token) } - if (teamId == null) { + // NB `==` so we're comparing undefined and null + if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) { eventDroppedCounter .labels({ event_type: 'session_recordings_blob_ingestion', @@ -328,7 +332,7 @@ export class SessionRecordingIngester { event, }, tags: { - team_id: teamId, + team_id: teamIdWithConfig.teamId, session_id: 
$session_id, }, }) @@ -343,22 +347,21 @@ export class SessionRecordingIngester { }) } - const recordingMessage: IncomingRecordingMessage = { + return { metadata: { partition: message.partition, topic: message.topic, offset: message.offset, timestamp: message.timestamp, + consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled, }, - team_id: teamId, + team_id: teamIdWithConfig.teamId, distinct_id: messagePayload.distinct_id, session_id: $session_id, window_id: $window_id, events: events, } - - return recordingMessage } public async handleEachBatch(messages: Message[]): Promise { @@ -408,7 +411,10 @@ export class SessionRecordingIngester { } const recordingMessage = await this.parseKafkaMessage(message, (token) => - this.teamsRefresher.get().then((teams) => teams[token] || null) + this.teamsRefresher.get().then((teams) => ({ + teamId: teams[token]?.teamId || null, + consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true, + })) ) if (recordingMessage) { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/types.ts b/plugin-server/src/main/ingestion-queues/session-recording/types.ts index eb6df15214ef2..78082160db794 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/types.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/types.ts @@ -7,6 +7,7 @@ import { RRWebEvent } from '../../../types' export type IncomingRecordingMessage = { metadata: TopicPartitionOffset & { timestamp: number + consoleLogIngestionEnabled: boolean } team_id: number diff --git a/plugin-server/src/worker/ingestion/team-manager.ts b/plugin-server/src/worker/ingestion/team-manager.ts index 8adb0cac0d610..bd1dec1cd9ddf 100644 --- a/plugin-server/src/worker/ingestion/team-manager.ts +++ b/plugin-server/src/worker/ingestion/team-manager.ts @@ -3,6 +3,7 @@ import { StatsD } from 'hot-shots' import LRU from 'lru-cache' import { ONE_MINUTE } from '../../config/constants' +import { TeamIDWithConfig } from 
'../../main/ingestion-queues/session-recording/session-recordings-consumer' import { PipelineEvent, PluginsServerConfig, Team, TeamId } from '../../types' import { PostgresRouter, PostgresUse } from '../../utils/db/postgres' import { timeoutGuard } from '../../utils/db/utils' @@ -192,11 +193,11 @@ export async function fetchTeamByToken(client: PostgresRouter, token: string): P return selectResult.rows[0] ?? null } -export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Promise<Record<string, TeamId>> { - const selectResult = await client.query<Pick<Team, 'id' | 'api_token'>>( +export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Promise<Record<string, TeamIDWithConfig>> { + const selectResult = await client.query<{ capture_console_log_opt_in: boolean } & Pick<Team, 'id' | 'api_token'>>( PostgresUse.COMMON_READ, ` - SELECT id, api_token + SELECT id, api_token, capture_console_log_opt_in FROM posthog_team WHERE session_recording_opt_in = true `, @@ -205,7 +206,7 @@ export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Pro ) return selectResult.rows.reduce((acc, row) => { - acc[row.api_token] = row.id + acc[row.api_token] = { teamId: row.id, consoleLogIngestionEnabled: row.capture_console_log_opt_in } return acc - }, {} as Record<string, TeamId>) + }, {} as Record<string, TeamIDWithConfig>) } diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index 4b51910d3d3ae..744eedd29a3a1 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -224,7 +224,7 @@ describe('ingester', () => { offset: 1, partition: 1, } satisfies Message, - () => Promise.resolve(1) + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) ) expect(parsedMessage).toEqual({ distinct_id: '12345', @@ -234,6 +234,7 @@ describe('ingester', () => { partition: 1, 
timestamp: 1, topic: 'the_topic', + consoleLogIngestionEnabled: false, }, session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070', team_id: 1, @@ -244,7 +245,7 @@ describe('ingester', () => { it('filters out invalid rrweb events', async () => { const numeric_id = 12345 - const createMessage = ($snapshot_items) => { + const createMessage = ($snapshot_items: unknown[]) => { return { value: Buffer.from( JSON.stringify({ @@ -281,7 +282,7 @@ describe('ingester', () => { timestamp: null, }, ]), - () => Promise.resolve(1) + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }) ) expect(parsedMessage).toEqual(undefined) @@ -298,7 +299,7 @@ describe('ingester', () => { timestamp: 123, }, ]), - () => Promise.resolve(1) + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true }) ) expect(parsedMessage2).toMatchObject({ events: [ @@ -310,7 +311,9 @@ describe('ingester', () => { ], }) - const parsedMessage3 = await ingester.parseKafkaMessage(createMessage([null]), () => Promise.resolve(1)) + const parsedMessage3 = await ingester.parseKafkaMessage(createMessage([null]), () => + Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) + ) expect(parsedMessage3).toEqual(undefined) }) })