feat: read team token from kafka headers in blobby #18337

Merged 21 commits on Nov 8, 2023
Changes from 12 commits
plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts
@@ -17,7 +17,7 @@ import { status } from '../../../../utils/status'
import { asyncTimeoutGuard } from '../../../../utils/timing'
import { ObjectStorage } from '../../../services/object_storage'
import { IncomingRecordingMessage } from '../types'
-import { bufferFileDir, convertToPersistedMessage, getLagMultipler, maxDefined, minDefined, now } from '../utils'
+import { bufferFileDir, convertToPersistedMessage, getLagMultiplier, maxDefined, minDefined, now } from '../utils'
import { OffsetHighWaterMarker } from './offset-high-water-marker'
import { RealtimeManager } from './realtime-manager'

@@ -217,7 +217,7 @@ export class SessionManager {
return
}

-const lagMultiplier = getLagMultipler(partitionLag)
+const lagMultiplier = getLagMultiplier(partitionLag)
Contributor:
Multipler is the formal version of the word... 🙈


const flushThresholdMs = this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000
const flushThresholdJitteredMs = flushThresholdMs * this.flushJitterMultiplier
plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -1,4 +1,4 @@
-import { captureException, captureMessage } from '@sentry/node'
+import { captureException } from '@sentry/node'
import { mkdirSync, rmSync } from 'node:fs'
import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
import { Counter, Gauge, Histogram } from 'prom-client'
@@ -7,7 +7,7 @@ import { sessionRecordingConsumerConfig } from '../../../config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
-import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, RRWebEvent, TeamId } from '../../../types'
+import { PluginsServerConfig, RedisPool, TeamId } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { PostgresRouter } from '../../../utils/db/postgres'
import { status } from '../../../utils/status'
@@ -23,7 +23,7 @@ import { RealtimeManager } from './services/realtime-manager'
import { ReplayEventsIngester } from './services/replay-events-ingester'
import { BUCKETS_KB_WRITTEN, SessionManager } from './services/session-manager'
import { IncomingRecordingMessage } from './types'
-import { bufferFileDir, getPartitionsForTopic, now, queryWatermarkOffsets } from './utils'
+import { bufferFileDir, getPartitionsForTopic, now, parseKafkaMessage, queryWatermarkOffsets } from './utils'

// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')
@@ -251,119 +251,6 @@ export class SessionRecordingIngester {
await this.sessions[key]?.add(event)
}

public async parseKafkaMessage(
message: Message,
getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
): Promise<IncomingRecordingMessage | void> {
const statusWarn = (reason: string, extra?: Record<string, any>) => {
status.warn('⚠️', 'invalid_message', {
reason,
partition: message.partition,
offset: message.offset,
...(extra || {}),
})
}

if (!message.value || !message.timestamp) {
// Typing says this can happen but in practice it shouldn't
return statusWarn('message value or timestamp is empty')
}

let messagePayload: RawEventMessage
let event: PipelineEvent

try {
messagePayload = JSON.parse(message.value.toString())
event = JSON.parse(messagePayload.data)
} catch (error) {
return statusWarn('invalid_json', { error })
}

const { $snapshot_items, $session_id, $window_id } = event.properties || {}

// NOTE: This is simple validation - ideally we should do proper schema based validation
if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) {
status.warn('🙈', 'Received non-snapshot message, ignoring')
return
}

if (messagePayload.team_id == null && !messagePayload.token) {
return statusWarn('no_token')
}

let teamIdWithConfig: TeamIDWithConfig | null = null
const token = messagePayload.token

if (token) {
teamIdWithConfig = await getTeamFn(token)
}

// NB `==` so we're comparing undefined and null
if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
drop_cause: 'team_missing_or_disabled',
})
.inc()

return statusWarn('team_missing_or_disabled', {
token: messagePayload.token,
teamId: messagePayload.team_id,
payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown',
})
}

const invalidEvents: any[] = []
const events: RRWebEvent[] = $snapshot_items.filter((event: any) => {
if (!event || !event.timestamp) {
invalidEvents.push(event)
return false
}
return true
})

if (invalidEvents.length) {
captureMessage('[session-manager]: invalid rrweb events filtered out from message', {
extra: {
invalidEvents,
eventsCount: events.length,
invalidEventsCount: invalidEvents.length,
event,
},
tags: {
team_id: teamIdWithConfig.teamId,
session_id: $session_id,
},
})
}

if (!events.length) {
status.warn('🙈', 'Event contained no valid rrweb events, ignoring')

return statusWarn('invalid_rrweb_events', {
token: messagePayload.token,
teamId: messagePayload.team_id,
})
}

return {
metadata: {
partition: message.partition,
topic: message.topic,
offset: message.offset,
timestamp: message.timestamp,
consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled,
},

team_id: teamIdWithConfig.teamId,
distinct_id: messagePayload.distinct_id,
session_id: $session_id,
window_id: $window_id,
events: events,
}
}

public async handleEachBatch(messages: Message[]): Promise<void> {
status.info('🔁', `blob_ingester_consumer - handling batch`, {
size: messages.length,
@@ -395,7 +282,7 @@ export class SessionRecordingIngester {

counterKafkaMessageReceived.inc({ partition })

-const recordingMessage = await this.parseKafkaMessage(message, (token) =>
+const recordingMessage = await parseKafkaMessage(message, (token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
@@ -646,7 +533,7 @@ export class SessionRecordingIngester {
partitionsToDrop[partition] = this.partitionMetrics[partition] ?? {}
delete this.partitionMetrics[partition]

-// Revoke the high water mark for this partition so we are essentially "reset"
+// Revoke the high watermark for this partition, so we are essentially "reset"
this.sessionHighWaterMarker.revoke(topicPartition)
this.persistentHighWaterMarker.revoke(topicPartition)
})
168 changes: 165 additions & 3 deletions plugin-server/src/main/ingestion-queues/session-recording/utils.ts
@@ -1,10 +1,13 @@
-import { captureException } from '@sentry/node'
+import { captureException, captureMessage } from '@sentry/node'
import { DateTime } from 'luxon'
-import { KafkaConsumer, PartitionMetadata, TopicPartition } from 'node-rdkafka'
+import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartition } from 'node-rdkafka'
import path from 'path'

import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types'
import { status } from '../../../utils/status'
import { eventDroppedCounter } from '../metrics'
import { TeamIDWithConfig } from './session-recordings-consumer'
import { IncomingRecordingMessage, PersistedRecordingMessage } from './types'

export const convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => {
@@ -102,10 +105,169 @@ export const getPartitionsForTopic = (
})
}

-export const getLagMultipler = (lag: number, threshold = 1000000) => {
+export const getLagMultiplier = (lag: number, threshold = 1000000) => {
if (lag < threshold) {
return 1
}

return Math.max(0.1, 1 - (lag - threshold) / (threshold * 10))
}
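
// For intuition, a few worked values, assuming the default threshold of 1,000,000
// (a usage sketch, not part of the diff):
//   getLagMultiplier(500_000)    === 1    lag below threshold, flush timing unchanged
//   getLagMultiplier(6_000_000)  === 0.5  1 - 5_000_000 / 10_000_000
//   getLagMultiplier(20_000_000) === 0.1  the Math.max floor kicks in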

export async function readTokenFromHeaders(
headers: MessageHeader[] | undefined,
getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
) {
const tokenHeader = headers?.find((header: MessageHeader) => {
// each header in the array is an object mapping a single key to a value,
// because it's possible to have multiple headers with the same key,
// but we don't support that: the first truthy match we find is the one we use
return header.token
})?.token

const token = typeof tokenHeader === 'string' ? tokenHeader : tokenHeader?.toString()

let teamIdWithConfig: TeamIDWithConfig | null = null

if (token) {
teamIdWithConfig = await getTeamFn(token)
}
return { token, teamIdWithConfig }
}
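
// A hypothetical usage sketch (the token value is made up): node-rdkafka delivers
// headers as an array of single-key objects whose values may be Buffer or string, e.g.
//   const headers: MessageHeader[] = [{ token: Buffer.from('phc_abc123') }]
//   const { token } = await readTokenFromHeaders(headers, getTeamFn)
// where `token` resolves to 'phc_abc123' via Buffer#toString()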

export const parseKafkaMessage = async (
message: Message,
getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
): Promise<IncomingRecordingMessage | void> => {
const statusWarn = (reason: string, extra?: Record<string, any>) => {
status.warn('⚠️', 'invalid_message', {
reason,
partition: message.partition,
offset: message.offset,
...(extra || {}),
})
}

if (!message.value || !message.timestamp) {
// Typing says this can happen but in practice it shouldn't
return statusWarn('message value or timestamp is empty')
}

const headerResult = await readTokenFromHeaders(message.headers, getTeamFn)
const token: string | undefined = headerResult.token
let teamIdWithConfig: null | TeamIDWithConfig = headerResult.teamIdWithConfig

// NB `==` so we're comparing undefined and null
// if the token was in the headers but we could not load the team config,
// then we can return early
if (!!token && (teamIdWithConfig == null || teamIdWithConfig.teamId == null)) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
drop_cause: 'team_missing_or_disabled',
})
.inc()

return statusWarn('team_missing_or_disabled', {
token: token,
})
}
Contributor:
We have an opportunity to drop here if !teamIdWithConfig.session_recording_opt_in, don't we?

Member (author):
That's implicit in this check. We only load teams that exist and have session_recording_opt_in set to true.

I guess the assumption was it wasn't valuable enough to know the difference in the event dropped counter to stand the cost of loading more teams...
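
In other words, a minimal sketch of the lookup, mirroring the call site in the consumer above (assuming the refresher cache is only populated with teams that have session_recording_opt_in enabled):

const getTeamFn = async (token: string): Promise<TeamIDWithConfig | null> =>
    this.teamsRefresher.get().then((teams) => ({
        // an unknown token and an opted-out team look the same here: both are
        // absent from the cache, so teamId comes back null and the message is dropped
        teamId: teams[token]?.teamId || null,
        consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
    }))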


let messagePayload: RawEventMessage
let event: PipelineEvent

try {
messagePayload = JSON.parse(message.value.toString())
event = JSON.parse(messagePayload.data)
} catch (error) {
return statusWarn('invalid_json', { error })
}

const { $snapshot_items, $session_id, $window_id } = event.properties || {}

// NOTE: This is simple validation - ideally we should do proper schema based validation
if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) {
status.warn('🙈', 'Received non-snapshot message, ignoring')
return
Contributor:
let's increment eventDroppedCounter too here for visibility. Ideally, I'd love for all drop scenarios to increment this metric.
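
Presumably something like the pattern used elsewhere in this file (the drop_cause label name here is hypothetical):

eventDroppedCounter
    .labels({
        event_type: 'session_recordings_blob_ingestion',
        drop_cause: 'received_non_snapshot_message',
    })
    .inc()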

}

// TODO: this mechanism is deprecated for blobby ingestion; we should remove it
// once we're happy that the new mechanism is working
// if there was no token in the header, then we try to load one from the message payload
if (teamIdWithConfig == null && messagePayload.team_id == null && !messagePayload.token) {
return statusWarn('no_token')
}

if (teamIdWithConfig == null) {
const token = messagePayload.token

if (token) {
teamIdWithConfig = await getTeamFn(token)
}
}

// NB `==` so we're comparing undefined and null
if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
drop_cause: 'token_fallback_team_missing_or_disabled',
})
.inc()

return statusWarn('token_fallback_team_missing_or_disabled', {
token: messagePayload.token,
teamId: messagePayload.team_id,
payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown',
})
}
// end of deprecated mechanism

const invalidEvents: any[] = []
const events: RRWebEvent[] = $snapshot_items.filter((event: any) => {
if (!event || !event.timestamp) {
invalidEvents.push(event)
return false
}
return true
})

if (invalidEvents.length) {
captureMessage('[session-manager]: invalid rrweb events filtered out from message', {
extra: {
invalidEvents,
eventsCount: events.length,
invalidEventsCount: invalidEvents.length,
event,
},
tags: {
team_id: teamIdWithConfig.teamId,
session_id: $session_id,
},
})
}

if (!events.length) {
status.warn('🙈', 'Event contained no valid rrweb events, ignoring')

return statusWarn('invalid_rrweb_events', {
token: messagePayload.token,
teamId: messagePayload.team_id,
})
}

return {
metadata: {
partition: message.partition,
topic: message.topic,
offset: message.offset,
timestamp: message.timestamp,
consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled,
},

team_id: teamIdWithConfig.teamId,
distinct_id: messagePayload.distinct_id,
session_id: $session_id,
window_id: $window_id,
events: events,
}
}