feat: drop console log events when disabled (#18210)
* feat: drop console log events when disabled

* don't process logs before dropping them

* write tests and as a result fix de-duplication

* overwriting default with default is unnecessary
pauldambra authored Oct 26, 2023
1 parent eff6dc7 commit a9a4eea
Showing 7 changed files with 271 additions and 34 deletions.
plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts

@@ -9,7 +9,7 @@ import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handl
 import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
 import { PluginsServerConfig } from '../../../../types'
 import { status } from '../../../../utils/status'
-import { ConsoleLogEntry, gatherConsoleLogEvents } from '../../../../worker/ingestion/process-event'
+import { ConsoleLogEntry, gatherConsoleLogEvents, RRWebEventType } from '../../../../worker/ingestion/process-event'
 import { eventDroppedCounter } from '../../metrics'
 import { IncomingRecordingMessage } from '../types'
 import { OffsetHighWaterMarker } from './offset-high-water-marker'
@@ -30,9 +30,9 @@ function deduplicateConsoleLogEvents(consoleLogEntries: ConsoleLogEntry[]): Cons
     const deduped: ConsoleLogEntry[] = []

     for (const cle of consoleLogEntries) {
-        if (!seen.has(cle.message)) {
+        const fingerPrint = `${cle.log_level}-${cle.message}`
+        if (!seen.has(fingerPrint)) {
             deduped.push(cle)
-            seen.add(`${cle.log_level}-${cle.message}`)
+            seen.add(fingerPrint)
         }
     }
     return deduped
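The third bullet in the commit message is visible here: the old code checked `seen.has(cle.message)` but added `${cle.log_level}-${cle.message}` to the set, so the membership test never matched and nothing was ever deduplicated. A minimal sketch of the fixed behaviour, with the entry type trimmed to the two fingerprinted fields:

// ConsoleLogEntry trimmed to the fingerprinted fields (illustration only)
type Entry = { log_level: string; message: string }

function dedupe(entries: Entry[]): Entry[] {
    const seen = new Set<string>()
    const deduped: Entry[] = []
    for (const e of entries) {
        // the level is part of the key, so a warn and an error with the same text both survive
        const fingerPrint = `${e.log_level}-${e.message}`
        if (!seen.has(fingerPrint)) {
            deduped.push(e)
            seen.add(fingerPrint)
        }
    }
    return deduped
}

const entries: Entry[] = [
    { log_level: 'warn', message: 'retrying produce' },
    { log_level: 'error', message: 'retrying produce' }, // same text, different level: kept
    { log_level: 'warn', message: 'retrying produce' }, // exact duplicate: dropped
]
console.log(dedupe(entries).length) // 2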
@@ -43,6 +44,7 @@ function deduplicateConsoleLogEvents(consoleLogEntries: ConsoleLogEntry[]): Cons
 export class ConsoleLogsIngester {
     producer?: RdKafkaProducer
     enabled: boolean
+
     constructor(
         private readonly serverConfig: PluginsServerConfig,
         private readonly persistentHighWaterMarker: OffsetHighWaterMarker
@@ -83,9 +85,9 @@ export class ConsoleLogsIngester {
             status.error('🔁', '[console-log-events-ingester] main_loop_error', { error })

             if (error?.isRetriable) {
-                // We assume the if the error is retriable, then we
+                // We assume that if the error is retriable, then we
                 // are probably in a state where e.g. Kafka is down
-                // temporarily and we would rather simply throw and
+                // temporarily, and we would rather simply throw and
                 // have the process restarted.
                 throw error
             }
@@ -140,11 +142,26 @@
             return drop('high_water_mark')
         }

+        // cheapest possible check for any console logs to avoid parsing the events because...
+        const hasAnyConsoleLogs = event.events.some(
+            (e) => !!e && e.type === RRWebEventType.Plugin && e.data?.plugin === 'rrweb/console@1'
+        )
+
+        if (!hasAnyConsoleLogs) {
+            return
+        }
+
+        // ... we don't want to mark events with no console logs as dropped
+        // this keeps the signal here clean and makes it easier to debug
+        // when we disable a team's console log ingestion
+        if (!event.metadata.consoleLogIngestionEnabled) {
+            return drop('console_log_ingestion_disabled')
+        }
+
         try {
             const consoleLogEvents = deduplicateConsoleLogEvents(
                 gatherConsoleLogEvents(event.team_id, event.session_id, event.events)
             )

             consoleLogEventsCounter.inc(consoleLogEvents.length)

             return consoleLogEvents.map((cle: ConsoleLogEntry) =>
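The ordering in this hunk is deliberate: the cheap `some()` scan runs before the enabled check, so recordings that contain no console logs return early and never increment the drop counter, which keeps `console_log_ingestion_disabled` as a clean signal when debugging a team's opt-out. A standalone sketch of the predicate, assuming rrweb's event numbering (where `Plugin` events are type 6) and the trimmed event shape below:

// Assumed for illustration: rrweb numbers Plugin events as type 6,
// and the predicate only reads these two fields.
enum RRWebEventType {
    Plugin = 6,
}

type RRWebEvent = { type: number; data?: { plugin?: string } }

function hasAnyConsoleLogs(events: RRWebEvent[]): boolean {
    // number and string comparisons only: the plugin payloads are never parsed
    return events.some((e) => !!e && e.type === RRWebEventType.Plugin && e.data?.plugin === 'rrweb/console@1')
}

console.log(hasAnyConsoleLogs([{ type: 3 }])) // false: e.g. an incremental snapshot
console.log(hasAnyConsoleLogs([{ type: 6, data: { plugin: 'rrweb/console@1' } }])) // true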
plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -8,14 +8,14 @@ import { sessionRecordingConsumerConfig } from '../../../config/config'
 import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
 import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
 import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
-import { runInstrumentedFunction } from '../../../main/utils'
 import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, RRWebEvent, TeamId } from '../../../types'
 import { BackgroundRefresher } from '../../../utils/background-refresher'
 import { PostgresRouter } from '../../../utils/db/postgres'
 import { status } from '../../../utils/status'
 import { createRedisPool } from '../../../utils/utils'
 import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
 import { ObjectStorage } from '../../services/object_storage'
+import { runInstrumentedFunction } from '../../utils'
 import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
 import { eventDroppedCounter } from '../metrics'
 import { ConsoleLogsIngester } from './services/console-logs-ingester'
@@ -95,6 +95,11 @@ type PartitionMetrics = {
     lastKnownCommit?: number
 }

+export interface TeamIDWithConfig {
+    teamId: TeamId | null
+    consoleLogIngestionEnabled: boolean
+}
+
 export class SessionRecordingIngester {
     redisPool: RedisPool
     sessions: Record<string, SessionManager> = {}
@@ -107,7 +112,7 @@ export class SessionRecordingIngester {
     batchConsumer?: BatchConsumer
     partitionAssignments: Record<number, PartitionMetrics> = {}
     partitionLockInterval: NodeJS.Timer | null = null
-    teamsRefresher: BackgroundRefresher<Record<string, TeamId>>
+    teamsRefresher: BackgroundRefresher<Record<string, TeamIDWithConfig>>
     offsetsRefresher: BackgroundRefresher<Record<number, number>>
     config: PluginsServerConfig
     topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
@@ -120,7 +125,7 @@
         private objectStorage: ObjectStorage
     ) {
         // NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis
-        // We stil connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
+        // We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
         this.config = sessionRecordingConsumerConfig(globalServerConfig)
         this.redisPool = createRedisPool(this.config)

@@ -198,7 +203,7 @@
             op: 'checkHighWaterMark',
         })

-        // Check that we are not below the high water mark for this partition (another consumer may have flushed further than us when revoking)
+        // Check that we are not below the high-water mark for this partition (another consumer may have flushed further than us when revoking)
         if (
             await this.persistentHighWaterMarker.isBelowHighWaterMark(event.metadata, KAFKA_CONSUMER_GROUP_ID, offset)
         ) {
@@ -228,7 +233,7 @@
         if (!this.sessions[key]) {
             const { partition, topic } = event.metadata

-            const sessionManager = new SessionManager(
+            this.sessions[key] = new SessionManager(
                 this.config,
                 this.objectStorage.s3,
                 this.realtimeManager,
@@ -238,8 +243,6 @@
                 partition,
                 topic
             )
-
-            this.sessions[key] = sessionManager
         }

         await this.sessions[key]?.add(event)
@@ -250,7 +253,7 @@

     public async parseKafkaMessage(
         message: Message,
-        getTeamFn: (s: string) => Promise<TeamId | null>
+        getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
     ): Promise<IncomingRecordingMessage | void> {
         const statusWarn = (reason: string, extra?: Record<string, any>) => {
             status.warn('⚠️', 'invalid_message', {
@@ -288,14 +291,15 @@
             return statusWarn('no_token')
         }

-        let teamId: TeamId | null = null
+        let teamIdWithConfig: TeamIDWithConfig | null = null
         const token = messagePayload.token

         if (token) {
-            teamId = await getTeamFn(token)
+            teamIdWithConfig = await getTeamFn(token)
         }

-        if (teamId == null) {
+        // NB `==` so we're comparing undefined and null
+        if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) {
             eventDroppedCounter
                 .labels({
                     event_type: 'session_recordings_blob_ingestion',
@@ -328,7 +332,7 @@
                     event,
                 },
                 tags: {
-                    team_id: teamId,
+                    team_id: teamIdWithConfig.teamId,
                     session_id: $session_id,
                 },
             })
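The `NB` comment in parseKafkaMessage above leans on a deliberate loose equality: `x == null` is true exactly when `x` is `null` or `undefined` and for nothing else, so one guard covers both a missing team record and an explicit null teamId. A quick illustration:

// `== null` matches null and undefined only; other falsy values pass through
console.log(undefined == null) // true
console.log(null == null) // true
console.log(0 == null) // false
console.log('' == null) // false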
@@ -343,22 +347,21 @@
             })
         }

-        const recordingMessage: IncomingRecordingMessage = {
+        return {
             metadata: {
                 partition: message.partition,
                 topic: message.topic,
                 offset: message.offset,
                 timestamp: message.timestamp,
+                consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled,
             },

-            team_id: teamId,
+            team_id: teamIdWithConfig.teamId,
             distinct_id: messagePayload.distinct_id,
             session_id: $session_id,
             window_id: $window_id,
             events: events,
         }
-
-        return recordingMessage
     }

     public async handleEachBatch(messages: Message[]): Promise<void> {
@@ -408,7 +411,10 @@
             }

             const recordingMessage = await this.parseKafkaMessage(message, (token) =>
-                this.teamsRefresher.get().then((teams) => teams[token] || null)
+                this.teamsRefresher.get().then((teams) => ({
+                    teamId: teams[token]?.teamId || null,
+                    consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
+                }))
             )

             if (recordingMessage) {
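Note the two different fallbacks in this adapter: `|| null` is fine for `teamId`, where any falsy value should collapse to null anyway, but the flag needs `?? true`, because `||` would silently turn a team's explicit `false` opt-out back into `true`. A small illustration of the difference:

const optedOut = { consoleLogIngestionEnabled: false }
console.log(optedOut.consoleLogIngestionEnabled || true) // true: the opt-out is lost
console.log(optedOut.consoleLogIngestionEnabled ?? true) // false: preserved

const unknown: { consoleLogIngestionEnabled?: boolean } = {}
console.log(unknown.consoleLogIngestionEnabled ?? true) // true: unknown teams default to enabled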
plugin-server/src/main/ingestion-queues/session-recording/types.ts

@@ -7,6 +7,7 @@ import { RRWebEvent } from '../../../types'
 export type IncomingRecordingMessage = {
     metadata: TopicPartitionOffset & {
         timestamp: number
+        consoleLogIngestionEnabled: boolean
     }

     team_id: number
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/process-event.ts
@@ -336,7 +336,7 @@ function safeString(payload: (string | null)[]) {
         .join(' ')
 }

-enum RRWebEventType {
+export enum RRWebEventType {
     DomContentLoaded = 0,
     Load = 1,
     FullSnapshot = 2,
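Exporting the enum lets the console-logs ingester run its cheap type check without re-declaring rrweb's event numbering. For reference, a console entry arrives inside a plugin event shaped roughly like this (field names follow rrweb's console plugin; the values are invented):

const exampleEvent = {
    type: 6, // RRWebEventType.Plugin
    timestamp: 1698316800000,
    data: {
        plugin: 'rrweb/console@1',
        payload: {
            level: 'warn',
            payload: ['"kafka producer unavailable, retrying"'], // JSON-stringified console args
            trace: ['app.ts:42'],
        },
    },
}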
11 changes: 6 additions & 5 deletions plugin-server/src/worker/ingestion/team-manager.ts
@@ -3,6 +3,7 @@ import { StatsD } from 'hot-shots'
 import LRU from 'lru-cache'

 import { ONE_MINUTE } from '../../config/constants'
+import { TeamIDWithConfig } from '../../main/ingestion-queues/session-recording/session-recordings-consumer'
 import { PipelineEvent, PluginsServerConfig, Team, TeamId } from '../../types'
 import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'
 import { timeoutGuard } from '../../utils/db/utils'
@@ -192,11 +193,11 @@ export async function fetchTeamByToken(client: PostgresRouter, token: string): P
     return selectResult.rows[0] ?? null
 }

-export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Promise<Record<string, TeamId>> {
-    const selectResult = await client.query<Pick<Team, 'id' | 'api_token'>>(
+export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Promise<Record<string, TeamIDWithConfig>> {
+    const selectResult = await client.query<{ capture_console_log_opt_in: boolean } & Pick<Team, 'id' | 'api_token'>>(
         PostgresUse.COMMON_READ,
         `
-            SELECT id, api_token
+            SELECT id, api_token, capture_console_log_opt_in
             FROM posthog_team
             WHERE session_recording_opt_in = true
         `,
@@ -205,7 +206,7 @@ export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Pro
     )

     return selectResult.rows.reduce((acc, row) => {
-        acc[row.api_token] = row.id
+        acc[row.api_token] = { teamId: row.id, consoleLogIngestionEnabled: row.capture_console_log_opt_in }
         return acc
-    }, {} as Record<string, TeamId>)
+    }, {} as Record<string, TeamIDWithConfig>)
 }
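After this change the token map cached by the consumer's BackgroundRefresher carries the opt-in flag alongside the team id. Its shape, with invented tokens and ids:

const teamTokens: Record<string, TeamIDWithConfig> = {
    phc_tokenA: { teamId: 1, consoleLogIngestionEnabled: true },
    phc_tokenB: { teamId: 2, consoleLogIngestionEnabled: false }, // capture_console_log_opt_in = false
}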