Skip to content

Commit

Permalink
feat: read team token from kafka headers in blobby (#18337)
Browse files Browse the repository at this point in the history
Recently we read tens of millions of messages knowing the only action was to drop them. For every message we had to parse the body to get the token so we could check if we wanted to drop it.

Let's put the team token into the kafka headers since if there's an incident we always care about those values.

And then we can read the team token from the header and drop the message without parsing it when the team does not have session recording enabled.

This still also reads the body to check for the team token as a fallback. If nothing else, this lets us deploy the change without needing a coordinated switch-over between producers and consumers.

So:

  • token in headers, team not found — return early (new behaviour)
  • token in headers, team found — does parse the body, but doesn't check the team again
  • token not in headers, team not found — return early (as before)
  • token not in headers, team found — parses the body (as before)
  • Loading branch information
pauldambra authored Nov 8, 2023
1 parent 292e3db commit cf4ba98
Show file tree
Hide file tree
Showing 8 changed files with 416 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { status } from '../../../../utils/status'
import { asyncTimeoutGuard } from '../../../../utils/timing'
import { ObjectStorage } from '../../../services/object_storage'
import { IncomingRecordingMessage } from '../types'
import { bufferFileDir, convertToPersistedMessage, getLagMultipler, maxDefined, minDefined, now } from '../utils'
import { bufferFileDir, convertToPersistedMessage, getLagMultiplier, maxDefined, minDefined, now } from '../utils'
import { OffsetHighWaterMarker } from './offset-high-water-marker'
import { RealtimeManager } from './realtime-manager'

Expand Down Expand Up @@ -217,7 +217,7 @@ export class SessionManager {
return
}

const lagMultiplier = getLagMultipler(partitionLag)
const lagMultiplier = getLagMultiplier(partitionLag)

const flushThresholdMs = this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000
const flushThresholdJitteredMs = flushThresholdMs * this.flushJitterMultiplier
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { captureException, captureMessage } from '@sentry/node'
import { captureException } from '@sentry/node'
import { mkdirSync, rmSync } from 'node:fs'
import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
import { Counter, Gauge, Histogram } from 'prom-client'
Expand All @@ -7,7 +7,7 @@ import { sessionRecordingConsumerConfig } from '../../../config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, RRWebEvent, TeamId } from '../../../types'
import { PluginsServerConfig, RedisPool, TeamId } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { PostgresRouter } from '../../../utils/db/postgres'
import { status } from '../../../utils/status'
Expand All @@ -23,7 +23,7 @@ import { RealtimeManager } from './services/realtime-manager'
import { ReplayEventsIngester } from './services/replay-events-ingester'
import { BUCKETS_KB_WRITTEN, SessionManager } from './services/session-manager'
import { IncomingRecordingMessage } from './types'
import { bufferFileDir, getPartitionsForTopic, now, queryWatermarkOffsets } from './utils'
import { bufferFileDir, getPartitionsForTopic, now, parseKafkaMessage, queryWatermarkOffsets } from './utils'

// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')
Expand Down Expand Up @@ -251,119 +251,6 @@ export class SessionRecordingIngester {
await this.sessions[key]?.add(event)
}

public async parseKafkaMessage(
message: Message,
getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
): Promise<IncomingRecordingMessage | void> {
const statusWarn = (reason: string, extra?: Record<string, any>) => {
status.warn('⚠️', 'invalid_message', {
reason,
partition: message.partition,
offset: message.offset,
...(extra || {}),
})
}

if (!message.value || !message.timestamp) {
// Typing says this can happen but in practice it shouldn't
return statusWarn('message value or timestamp is empty')
}

let messagePayload: RawEventMessage
let event: PipelineEvent

try {
messagePayload = JSON.parse(message.value.toString())
event = JSON.parse(messagePayload.data)
} catch (error) {
return statusWarn('invalid_json', { error })
}

const { $snapshot_items, $session_id, $window_id } = event.properties || {}

// NOTE: This is simple validation - ideally we should do proper schema based validation
if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) {
status.warn('🙈', 'Received non-snapshot message, ignoring')
return
}

if (messagePayload.team_id == null && !messagePayload.token) {
return statusWarn('no_token')
}

let teamIdWithConfig: TeamIDWithConfig | null = null
const token = messagePayload.token

if (token) {
teamIdWithConfig = await getTeamFn(token)
}

// NB `==` so we're comparing undefined and null
if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
drop_cause: 'team_missing_or_disabled',
})
.inc()

return statusWarn('team_missing_or_disabled', {
token: messagePayload.token,
teamId: messagePayload.team_id,
payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown',
})
}

const invalidEvents: any[] = []
const events: RRWebEvent[] = $snapshot_items.filter((event: any) => {
if (!event || !event.timestamp) {
invalidEvents.push(event)
return false
}
return true
})

if (invalidEvents.length) {
captureMessage('[session-manager]: invalid rrweb events filtered out from message', {
extra: {
invalidEvents,
eventsCount: events.length,
invalidEventsCount: invalidEvents.length,
event,
},
tags: {
team_id: teamIdWithConfig.teamId,
session_id: $session_id,
},
})
}

if (!events.length) {
status.warn('🙈', 'Event contained no valid rrweb events, ignoring')

return statusWarn('invalid_rrweb_events', {
token: messagePayload.token,
teamId: messagePayload.team_id,
})
}

return {
metadata: {
partition: message.partition,
topic: message.topic,
offset: message.offset,
timestamp: message.timestamp,
consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled,
},

team_id: teamIdWithConfig.teamId,
distinct_id: messagePayload.distinct_id,
session_id: $session_id,
window_id: $window_id,
events: events,
}
}

public async handleEachBatch(messages: Message[]): Promise<void> {
status.info('🔁', `blob_ingester_consumer - handling batch`, {
size: messages.length,
Expand Down Expand Up @@ -395,7 +282,7 @@ export class SessionRecordingIngester {

counterKafkaMessageReceived.inc({ partition })

const recordingMessage = await this.parseKafkaMessage(message, (token) =>
const recordingMessage = await parseKafkaMessage(message, (token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
Expand Down Expand Up @@ -646,7 +533,7 @@ export class SessionRecordingIngester {
partitionsToDrop[partition] = this.partitionMetrics[partition] ?? {}
delete this.partitionMetrics[partition]

// Revoke the high water mark for this partition so we are essentially "reset"
// Revoke the high watermark for this partition, so we are essentially "reset"
this.sessionHighWaterMarker.revoke(topicPartition)
this.persistentHighWaterMarker.revoke(topicPartition)
})
Expand Down
158 changes: 155 additions & 3 deletions plugin-server/src/main/ingestion-queues/session-recording/utils.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { captureException } from '@sentry/node'
import { captureException, captureMessage } from '@sentry/node'
import { DateTime } from 'luxon'
import { KafkaConsumer, PartitionMetadata, TopicPartition } from 'node-rdkafka'
import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartition } from 'node-rdkafka'
import path from 'path'

import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types'
import { status } from '../../../utils/status'
import { eventDroppedCounter } from '../metrics'
import { TeamIDWithConfig } from './session-recordings-consumer'
import { IncomingRecordingMessage, PersistedRecordingMessage } from './types'

export const convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => {
Expand Down Expand Up @@ -102,10 +105,159 @@ export const getPartitionsForTopic = (
})
}

export const getLagMultipler = (lag: number, threshold = 1000000) => {
export const getLagMultiplier = (lag: number, threshold = 1000000) => {
if (lag < threshold) {
return 1
}

return Math.max(0.1, 1 - (lag - threshold) / (threshold * 10))
}

export async function readTokenFromHeaders(
headers: MessageHeader[] | undefined,
getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
) {
const tokenHeader = headers?.find((header: MessageHeader) => {
// each header in the array is an object of key to value
// because it's possible to have multiple headers with the same key
// but, we don't support that. the first truthy match we find is the one we use
return header.token
})?.token

const token = typeof tokenHeader === 'string' ? tokenHeader : tokenHeader?.toString()

let teamIdWithConfig: TeamIDWithConfig | null = null

if (token) {
teamIdWithConfig = await getTeamFn(token)
}
return { token, teamIdWithConfig }
}

export const parseKafkaMessage = async (
message: Message,
getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
): Promise<IncomingRecordingMessage | void> => {
const dropMessage = (reason: string, extra?: Record<string, any>) => {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
drop_cause: reason,
})
.inc()

status.warn('⚠️', 'invalid_message', {
reason,
partition: message.partition,
offset: message.offset,
...(extra || {}),
})
}

if (!message.value || !message.timestamp) {
// Typing says this can happen but in practice it shouldn't
return dropMessage('message_value_or_timestamp_is_empty')
}

const headerResult = await readTokenFromHeaders(message.headers, getTeamFn)
const token: string | undefined = headerResult.token
let teamIdWithConfig: null | TeamIDWithConfig = headerResult.teamIdWithConfig

// NB `==` so we're comparing undefined and null
// if token was in the headers but, we could not load team config
// then, we can return early
if (!!token && (teamIdWithConfig == null || teamIdWithConfig.teamId == null)) {
return dropMessage('header_token_present_team_missing_or_disabled', {
token: token,
})
}

let messagePayload: RawEventMessage
let event: PipelineEvent

try {
messagePayload = JSON.parse(message.value.toString())
event = JSON.parse(messagePayload.data)
} catch (error) {
return dropMessage('invalid_json', { error })
}

const { $snapshot_items, $session_id, $window_id } = event.properties || {}

// NOTE: This is simple validation - ideally we should do proper schema based validation
if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) {
return dropMessage('received_non_snapshot_message')
}

// TODO this mechanism is deprecated for blobby ingestion, we should remove it
// once we're happy that the new mechanism is working
// if there was not a token in the header then we try to load one from the message payload
if (teamIdWithConfig == null && messagePayload.team_id == null && !messagePayload.token) {
return dropMessage('no_token_in_header_or_payload')
}

if (teamIdWithConfig == null) {
const token = messagePayload.token

if (token) {
teamIdWithConfig = await getTeamFn(token)
}
}

// NB `==` so we're comparing undefined and null
if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) {
return dropMessage('token_fallback_team_missing_or_disabled', {
token: messagePayload.token,
teamId: messagePayload.team_id,
payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown',
})
}
// end of deprecated mechanism

const invalidEvents: any[] = []
const events: RRWebEvent[] = $snapshot_items.filter((event: any) => {
if (!event || !event.timestamp) {
invalidEvents.push(event)
return false
}
return true
})

if (invalidEvents.length) {
captureMessage('[session-manager]: invalid rrweb events filtered out from message', {
extra: {
invalidEvents,
eventsCount: events.length,
invalidEventsCount: invalidEvents.length,
event,
},
tags: {
team_id: teamIdWithConfig.teamId,
session_id: $session_id,
},
})
}

if (!events.length) {
return dropMessage('message_contained_no_valid_rrweb_events', {
token: messagePayload.token,
teamId: messagePayload.team_id,
})
}

return {
metadata: {
partition: message.partition,
topic: message.topic,
offset: message.offset,
timestamp: message.timestamp,
consoleLogIngestionEnabled: teamIdWithConfig.consoleLogIngestionEnabled,
},

team_id: teamIdWithConfig.teamId,
distinct_id: messagePayload.distinct_id,
session_id: $session_id,
window_id: $window_id,
events: events,
}
}
Loading

0 comments on commit cf4ba98

Please sign in to comment.