Skip to content

Commit

Permalink
feat: drop console log events when disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra committed Oct 26, 2023
1 parent 15e3e85 commit e94fc6c
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ function deduplicateConsoleLogEvents(consoleLogEntries: ConsoleLogEntry[]): Cons
export class ConsoleLogsIngester {
producer?: RdKafkaProducer
enabled: boolean

constructor(
private readonly serverConfig: PluginsServerConfig,
private readonly persistentHighWaterMarker: OffsetHighWaterMarker
Expand Down Expand Up @@ -83,9 +84,9 @@ export class ConsoleLogsIngester {
status.error('🔁', '[console-log-events-ingester] main_loop_error', { error })

if (error?.isRetriable) {
// We assume the if the error is retriable, then we
// We assume that if the error is retriable, then we
// are probably in a state where e.g. Kafka is down
// temporarily and we would rather simply throw and
// temporarily, and we would rather simply throw and
// have the process restarted.
throw error
}
Expand Down Expand Up @@ -145,16 +146,24 @@ export class ConsoleLogsIngester {
gatherConsoleLogEvents(event.team_id, event.session_id, event.events)
)

consoleLogEventsCounter.inc(consoleLogEvents.length)
if (consoleLogEvents.length === 0) {
return
}

return consoleLogEvents.map((cle: ConsoleLogEntry) =>
produce({
producer,
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(JSON.stringify(cle)),
key: event.session_id,
})
)
if (event.metadata.consoleLogIngestionEnabled) {
consoleLogEventsCounter.inc(consoleLogEvents.length)

return consoleLogEvents.map((cle: ConsoleLogEntry) =>
produce({
producer,
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(JSON.stringify(cle)),
key: event.session_id,
})
)
} else {
return drop('console_log_ingestion_disabled')
}
} catch (error) {
status.error('⚠️', '[console-log-events-ingester] processing_error', {
error: error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import { sessionRecordingConsumerConfig } from '../../../config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { runInstrumentedFunction } from '../../../main/utils'
import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, RRWebEvent, TeamId } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { PostgresRouter } from '../../../utils/db/postgres'
import { status } from '../../../utils/status'
import { createRedisPool } from '../../../utils/utils'
import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
import { ObjectStorage } from '../../services/object_storage'
import { runInstrumentedFunction } from '../../utils'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { eventDroppedCounter } from '../metrics'
import { ConsoleLogsIngester } from './services/console-logs-ingester'
Expand Down Expand Up @@ -95,6 +95,11 @@ type PartitionMetrics = {
lastKnownCommit?: number
}

/**
 * A team's id paired with the per-team flag that controls console log ingestion.
 *
 * Values of this shape are stored per API token in the ingester's teams cache and are
 * what the `getTeamFn` lookup passed to `parseKafkaMessage` resolves to.
 */
export interface TeamIDWithConfig {
// null when the token has no matching team; parseKafkaMessage checks for this before building a message
teamId: TeamId | null
// populated from posthog_team.capture_console_log_opt_in and copied onto the recording message metadata
consoleLogIngestionEnabled: boolean
}

export class SessionRecordingIngester {
redisPool: RedisPool
sessions: Record<string, SessionManager> = {}
Expand All @@ -107,7 +112,7 @@ export class SessionRecordingIngester {
batchConsumer?: BatchConsumer
partitionAssignments: Record<number, PartitionMetrics> = {}
partitionLockInterval: NodeJS.Timer | null = null
teamsRefresher: BackgroundRefresher<Record<string, TeamId>>
teamsRefresher: BackgroundRefresher<Record<string, TeamIDWithConfig>>
offsetsRefresher: BackgroundRefresher<Record<number, number>>
config: PluginsServerConfig
topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
Expand All @@ -120,7 +125,7 @@ export class SessionRecordingIngester {
private objectStorage: ObjectStorage
) {
// NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis
// We stil connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
// We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
this.config = sessionRecordingConsumerConfig(globalServerConfig)
this.redisPool = createRedisPool(this.config)

Expand Down Expand Up @@ -198,7 +203,7 @@ export class SessionRecordingIngester {
op: 'checkHighWaterMark',
})

// Check that we are not below the high water mark for this partition (another consumer may have flushed further than us when revoking)
// Check that we are not below the high-water mark for this partition (another consumer may have flushed further than us when revoking)
if (
await this.persistentHighWaterMarker.isBelowHighWaterMark(event.metadata, KAFKA_CONSUMER_GROUP_ID, offset)
) {
Expand Down Expand Up @@ -228,7 +233,7 @@ export class SessionRecordingIngester {
if (!this.sessions[key]) {
const { partition, topic } = event.metadata

const sessionManager = new SessionManager(
this.sessions[key] = new SessionManager(
this.config,
this.objectStorage.s3,
this.realtimeManager,
Expand All @@ -238,8 +243,6 @@ export class SessionRecordingIngester {
partition,
topic
)

this.sessions[key] = sessionManager
}

await this.sessions[key]?.add(event)
Expand All @@ -250,7 +253,7 @@ export class SessionRecordingIngester {

public async parseKafkaMessage(
message: Message,
getTeamFn: (s: string) => Promise<TeamId | null>
getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
): Promise<IncomingRecordingMessage | void> {
const statusWarn = (reason: string, extra?: Record<string, any>) => {
status.warn('⚠️', 'invalid_message', {
Expand Down Expand Up @@ -288,14 +291,15 @@ export class SessionRecordingIngester {
return statusWarn('no_token')
}

let teamId: TeamId | null = null
let teamIdWithConfig: TeamIDWithConfig | null = null
const token = messagePayload.token

if (token) {
teamId = await getTeamFn(token)
teamIdWithConfig = await getTeamFn(token)
}

if (teamId == null) {
// NB: loose `==` is deliberate here, so that both `undefined` and `null` are matched
if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
Expand Down Expand Up @@ -328,7 +332,7 @@ export class SessionRecordingIngester {
event,
},
tags: {
team_id: teamId,
team_id: teamIdWithConfig.teamId,
session_id: $session_id,
},
})
Expand All @@ -343,22 +347,21 @@ export class SessionRecordingIngester {
})
}

const recordingMessage: IncomingRecordingMessage = {
return {
metadata: {
partition: message.partition,
topic: message.topic,
offset: message.offset,
timestamp: message.timestamp,
consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled,
},

team_id: teamId,
team_id: teamIdWithConfig.teamId,
distinct_id: messagePayload.distinct_id,
session_id: $session_id,
window_id: $window_id,
events: events,
}

return recordingMessage
}

public async handleEachBatch(messages: Message[]): Promise<void> {
Expand Down Expand Up @@ -408,7 +411,10 @@ export class SessionRecordingIngester {
}

const recordingMessage = await this.parseKafkaMessage(message, (token) =>
this.teamsRefresher.get().then((teams) => teams[token] || null)
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
}))
)

if (recordingMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { RRWebEvent } from '../../../types'
export type IncomingRecordingMessage = {
metadata: TopicPartitionOffset & {
timestamp: number
consoleLogIngestionEnabled: boolean
}

team_id: number
Expand Down
11 changes: 6 additions & 5 deletions plugin-server/src/worker/ingestion/team-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { StatsD } from 'hot-shots'
import LRU from 'lru-cache'

import { ONE_MINUTE } from '../../config/constants'
import { TeamIDWithConfig } from '../../main/ingestion-queues/session-recording/session-recordings-consumer'
import { PipelineEvent, PluginsServerConfig, Team, TeamId } from '../../types'
import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'
import { timeoutGuard } from '../../utils/db/utils'
Expand Down Expand Up @@ -192,11 +193,11 @@ export async function fetchTeamByToken(client: PostgresRouter, token: string): P
return selectResult.rows[0] ?? null
}

export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Promise<Record<string, TeamId>> {
const selectResult = await client.query<Pick<Team, 'id' | 'api_token'>>(
export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Promise<Record<string, TeamIDWithConfig>> {
const selectResult = await client.query<{ capture_console_log_opt_in: boolean } & Pick<Team, 'id' | 'api_token'>>(
PostgresUse.COMMON_READ,
`
SELECT id, api_token
SELECT id, api_token, capture_console_log_opt_in
FROM posthog_team
WHERE session_recording_opt_in = true
`,
Expand All @@ -205,7 +206,7 @@ export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Pro
)

return selectResult.rows.reduce((acc, row) => {
acc[row.api_token] = row.id
acc[row.api_token] = { teamId: row.id, consoleLogIngestionEnabled: row.capture_console_log_opt_in }
return acc
}, {} as Record<string, TeamId>)
}, {} as Record<string, TeamIDWithConfig>)
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ describe('ingester', () => {
offset: 1,
partition: 1,
} satisfies Message,
() => Promise.resolve(1)
() => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false })
)
expect(parsedMessage).toEqual({
distinct_id: '12345',
Expand All @@ -234,6 +234,7 @@ describe('ingester', () => {
partition: 1,
timestamp: 1,
topic: 'the_topic',
consoleLogIngestionEnabled: false,
},
session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
team_id: 1,
Expand All @@ -244,7 +245,7 @@ describe('ingester', () => {
it('filters out invalid rrweb events', async () => {
const numeric_id = 12345

const createMessage = ($snapshot_items) => {
const createMessage = ($snapshot_items: unknown[]) => {
return {
value: Buffer.from(
JSON.stringify({
Expand Down Expand Up @@ -281,7 +282,7 @@ describe('ingester', () => {
timestamp: null,
},
]),
() => Promise.resolve(1)
() => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true })
)
expect(parsedMessage).toEqual(undefined)

Expand All @@ -298,7 +299,7 @@ describe('ingester', () => {
timestamp: 123,
},
]),
() => Promise.resolve(1)
() => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: true })
)
expect(parsedMessage2).toMatchObject({
events: [
Expand All @@ -310,7 +311,9 @@ describe('ingester', () => {
],
})

const parsedMessage3 = await ingester.parseKafkaMessage(createMessage([null]), () => Promise.resolve(1))
const parsedMessage3 = await ingester.parseKafkaMessage(createMessage([null]), () =>
Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false })
)
expect(parsedMessage3).toEqual(undefined)
})
})
Expand Down

0 comments on commit e94fc6c

Please sign in to comment.