feat: ingestion warning on old version #21025

Merged
merged 26 commits into master from feat/ingestion_warning_on_old_version
Mar 28, 2024
Changes from 9 commits

26 commits
993eba9
feat: ingestion warning on old version
pauldambra Mar 19, 2024
bcd5834
and some tests
pauldambra Mar 19, 2024
c13bc9a
deprecate old body parsing method for reading the token
pauldambra Mar 19, 2024
a0baa69
add a counter too so we can see this in grafana dashboard
pauldambra Mar 19, 2024
6db56b8
typo
pauldambra Mar 19, 2024
9970346
fix v3 consumer to ignore ingestion warning
pauldambra Mar 19, 2024
84ec78a
fix
pauldambra Mar 19, 2024
1aabc20
fix
pauldambra Mar 19, 2024
f7365df
fix
pauldambra Mar 19, 2024
55c8ef5
add comment
pauldambra Mar 20, 2024
b22cdb8
disconnect producer
pauldambra Mar 20, 2024
c7ee678
inject main cluster producer as a dependency from the recording consum…
pauldambra Mar 21, 2024
5edfe8e
inject main cluster producer as a dependency from the recording consum…
pauldambra Mar 21, 2024
3533213
void the promises that eslint or prettier keeps clearing the comment …
pauldambra Mar 21, 2024
92a3119
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 22, 2024
aa694c6
does that fix the test
pauldambra Mar 22, 2024
7cdee27
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 25, 2024
bf94d77
not safe to call stop twice
pauldambra Mar 25, 2024
75ae215
not safe to call stop twice
pauldambra Mar 25, 2024
bcbeb8c
remove start and stop
pauldambra Mar 25, 2024
9092fa1
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 26, 2024
9ea826a
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 28, 2024
5603fd9
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 28, 2024
1b37861
fix
pauldambra Mar 28, 2024
04d6fba
the shortest route from a to b
pauldambra Mar 28, 2024
726891f
fix
pauldambra Mar 28, 2024
@@ -191,11 +191,15 @@ export class SessionRecordingIngesterV3 {
         for (const message of messages) {
             counterKafkaMessageReceived.inc({ partition: message.partition })

-            const recordingMessage = await parseKafkaMessage(message, (token) =>
-                this.teamsRefresher.get().then((teams) => ({
-                    teamId: teams[token]?.teamId || null,
-                    consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
-                }))
+            const recordingMessage = await parseKafkaMessage(
+                message,
+                (token) =>
+                    this.teamsRefresher.get().then((teams) => ({
+                        teamId: teams[token]?.teamId || null,
+                        consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
+                    })),
+                // v3 consumer does not emit ingestion warnings
+                undefined
             )

             if (recordingMessage) {
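The trailing `undefined` lines up with the `ingestionWarningProducer` parameter this PR adds to `parseKafkaMessage` in utils.ts further down: the v3 consumer deliberately opts out of emitting ingestion warnings, while the main consumer passes its producer.
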
@@ -456,55 +460,58 @@ export class SessionRecordingIngesterV3 {
     }

     private setupHttpRoutes() {
-        // Mimic the app sever's endpoint
-        expressApp.get('/api/projects/:projectId/session_recordings/:sessionId/snapshots', async (req, res) => {
-            await runInstrumentedFunction({
-                statsKey: `recordingingester.http.getSnapshots`,
-                func: async () => {
-                    try {
-                        const startTime = Date.now()
-                        res.on('finish', function () {
-                            status.info('⚡️', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`)
-                        })
-
-                        // validate that projectId is a number and sessionId is UUID like
-                        const projectId = parseInt(req.params.projectId)
-                        if (isNaN(projectId)) {
-                            res.sendStatus(404)
-                            return
-                        }
-
-                        const sessionId = req.params.sessionId
-                        if (!/^[0-9a-f-]+$/.test(sessionId)) {
-                            res.sendStatus(404)
-                            return
-                        }
-
-                        status.info('🔁', 'session-replay-ingestion - fetching session', { projectId, sessionId })
-
-                        // We don't know the partition upfront so we have to recursively check all partitions
-                        const partitions = await readdir(this.rootDir).catch(() => [])
-
-                        for (const partition of partitions) {
-                            const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId)
-                            const exists = await stat(sessionDir).catch(() => null)
-
-                            if (!exists) {
-                                continue
-                            }
-
-                            const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME))
-                            fileStream.pipe(res)
-                            return
-                        }
-
-                        res.sendStatus(404)
-                    } catch (e) {
-                        status.error('🔥', 'session-replay-ingestion - failed to fetch session', e)
-                        res.sendStatus(500)
-                    }
-                },
-            })
-        })
+        // Mimic the app server's endpoint
+        expressApp.get(
+            '/api/projects/:projectId/session_recordings/:sessionId/snapshots',
+            async (req: any, res: any) => {
+                await runInstrumentedFunction({
+                    statsKey: `recordingingester.http.getSnapshots`,
+                    func: async () => {
+                        try {
+                            const startTime = Date.now()
+                            res.on('finish', function () {
+                                status.info('⚡️', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`)
+                            })
+
+                            // validate that projectId is a number and sessionId is UUID like
+                            const projectId = parseInt(req.params.projectId)
+                            if (isNaN(projectId)) {
+                                res.sendStatus(404)
+                                return
+                            }
+
+                            const sessionId = req.params.sessionId
+                            if (!/^[0-9a-f-]+$/.test(sessionId)) {
+                                res.sendStatus(404)
+                                return
+                            }
+
+                            status.info('🔁', 'session-replay-ingestion - fetching session', { projectId, sessionId })
+
+                            // We don't know the partition upfront so we have to recursively check all partitions
+                            const partitions = await readdir(this.rootDir).catch(() => [])
+
+                            for (const partition of partitions) {
+                                const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId)
+                                const exists = await stat(sessionDir).catch(() => null)
+
+                                if (!exists) {
+                                    continue
+                                }
+
+                                const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME))
+                                fileStream.pipe(res)
+                                return
+                            }
+
+                            res.sendStatus(404)
+                        } catch (e) {
+                            status.error('🔥', 'session-replay-ingestion - failed to fetch session', e)
+                            res.sendStatus(500)
+                        }
+                    },
+                })
+            }
+        )
     }
 }
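This hunk only re-wraps the handler (splitting the `expressApp.get` arguments across lines, typing `req`/`res` as `any`, and fixing the "sever's" typo); behaviour is unchanged. For quick reference, a sketch of exercising the endpoint; the host, port, and ids are assumptions for illustration, not values from this PR:

// Hypothetical smoke check of the snapshots debug endpoint (Node 18+, global fetch).
// Host, port, and ids are made up; only the route shape comes from the handler above.
async function fetchSnapshots(projectId: number, sessionId: string): Promise<string> {
    const res = await fetch(
        `http://localhost:3000/api/projects/${projectId}/session_recordings/${sessionId}/snapshots`
    )
    if (!res.ok) {
        // the handler returns 404 both for bad ids and for sessions not buffered on this node
        throw new Error(`unexpected status ${res.status}`)
    }
    // on a hit, the handler streams the session's buffer file back as the body
    return await res.text()
}
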
@@ -7,9 +7,11 @@ import { Counter, Gauge, Histogram } from 'prom-client'
 import { sessionRecordingConsumerConfig } from '../../../config/config'
 import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
 import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
-import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
+import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config'
+import { createKafkaProducer } from '../../../kafka/producer'
 import { PluginsServerConfig, RedisPool, TeamId } from '../../../types'
 import { BackgroundRefresher } from '../../../utils/background-refresher'
+import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'
 import { PostgresRouter } from '../../../utils/db/postgres'
 import { status } from '../../../utils/status'
 import { createRedisPool } from '../../../utils/utils'
@@ -143,6 +145,7 @@ export class SessionRecordingIngester {
     // if ingestion is lagging on a single partition it is often hard to identify _why_,
     // this allows us to output more information for that partition
     private debugPartition: number | undefined = undefined
+    private ingestionWarningProducer?: KafkaProducerWrapper

     constructor(
         private globalServerConfig: PluginsServerConfig,
@@ -326,11 +329,14 @@ export class SessionRecordingIngester {

         counterKafkaMessageReceived.inc({ partition })

-        const recordingMessage = await parseKafkaMessage(message, (token) =>
-            this.teamsRefresher.get().then((teams) => ({
-                teamId: teams[token]?.teamId || null,
-                consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
-            }))
+        const recordingMessage = await parseKafkaMessage(
+            message,
+            (token) =>
+                this.teamsRefresher.get().then((teams) => ({
+                    teamId: teams[token]?.teamId || null,
+                    consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
+                })),
+            this.ingestionWarningProducer
         )

         if (recordingMessage) {
@@ -505,6 +511,11 @@ export class SessionRecordingIngester {
             status.info('🔁', 'blob_ingester_consumer batch consumer disconnected, cleaning up', { err })
             await this.stop()
         })
+
+        const producerConfig = createRdProducerConfigFromEnvVars(this.config)
+        const producer = await createKafkaProducer(connectionConfig, producerConfig)
+        producer.connect()
+        this.ingestionWarningProducer = new KafkaProducerWrapper(producer)
     }

     public async stop(): Promise<PromiseSettledResult<any>[]> {
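A design note: `createKafkaProducer` is awaited but `producer.connect()` is not, so the producer warms up in the background while the consumer finishes starting. The "disconnect producer" and "not safe to call stop twice" commits imply a matching teardown; here is a minimal sketch, assuming `KafkaProducerWrapper` exposes a `disconnect()` method (the PR's actual `stop()` body is not shown in this diff):

import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'

// Sketch only, not the PR's stop() implementation.
class ShutdownSketch {
    private ingestionWarningProducer?: KafkaProducerWrapper

    public async stop(): Promise<PromiseSettledResult<unknown>[]> {
        const tasks: Promise<unknown>[] = []
        if (this.ingestionWarningProducer) {
            // assumed API: KafkaProducerWrapper.disconnect()
            tasks.push(this.ingestionWarningProducer.disconnect())
            // "not safe to call stop twice": clear the reference so a second
            // stop() cannot disconnect an already-closed producer
            this.ingestionWarningProducer = undefined
        }
        return Promise.allSettled(tasks)
    }
}
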
89 changes: 61 additions & 28 deletions plugin-server/src/main/ingestion-queues/session-recording/utils.ts
@@ -2,15 +2,23 @@ import { captureException } from '@sentry/node'
 import { DateTime } from 'luxon'
 import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartition } from 'node-rdkafka'
 import path from 'path'
+import { Counter } from 'prom-client'

 import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
 import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types'
+import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'
 import { status } from '../../../utils/status'
 import { cloneObject } from '../../../utils/utils'
+import { captureIngestionWarning } from '../../../worker/ingestion/utils'
 import { eventDroppedCounter } from '../metrics'
 import { TeamIDWithConfig } from './session-recordings-consumer'
 import { IncomingRecordingMessage, PersistedRecordingMessage } from './types'

+const counterLibVersionWarning = new Counter({
+    name: 'lib_version_warning_counter',
+    help: 'the number of times we have seen a message with a lib version that is too old, each _might_ cause an ingestion warning if not debounced',
+})
+
 // Helper to return now as a milliseconds timestamp
 export const now = () => DateTime.now().toMillis()

@@ -128,9 +136,34 @@ export async function readTokenFromHeaders(
     return { token, teamIdWithConfig }
 }

+function readLibVersionFromHeaders(headers: MessageHeader[] | undefined): string | undefined {
+    const libVersionHeader = headers?.find((header: MessageHeader) => {
+        return header['lib_version']
+    })?.['lib_version']
+    return typeof libVersionHeader === 'string' ? libVersionHeader : libVersionHeader?.toString()
+}
+
+function minorVersionFrom(libVersion: string | undefined): number | undefined {
+    try {
+        let minorString: string | undefined = undefined
+        if (libVersion && libVersion.includes('.')) {
+            const splat = libVersion.split('.')
+            // very loose check for three part semantic version number
+            if (splat.length === 3) {
+                minorString = splat[1]
+            }
+        }
+        return minorString ? parseInt(minorString) : undefined
+    } catch (e) {
+        status.warn('⚠️', 'could_not_read_minor_lib_version', { libVersion })
+        return undefined
+    }
+}
+
 export const parseKafkaMessage = async (
     message: Message,
-    getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
+    getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>,
+    ingestionWarningProducer: KafkaProducerWrapper | undefined
 ): Promise<IncomingRecordingMessage | void> => {
     const dropMessage = (reason: string, extra?: Record<string, any>) => {
         eventDroppedCounter
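Worked examples of the two helpers above, derived from their bodies in this diff (illustrative calls only; both functions are module-private):

// readLibVersionFromHeaders handles rdkafka-style headers, whose values may be
// Buffers, so it falls back to toString() for non-string values.
readLibVersionFromHeaders([{ lib_version: Buffer.from('1.74.3') }]) // '1.74.3'
readLibVersionFromHeaders(undefined) // undefined: no headers at all

// minorVersionFrom trusts only a loose three-part semver and takes the middle part.
minorVersionFrom('1.74.3') // 74: at or below the 74 cut-off, the warning path fires
minorVersionFrom('1.75.0') // 75: newer than the cut-off, no warning
minorVersionFrom('1.75') // undefined: only two parts, version cannot be judged
minorVersionFrom(undefined) // undefined: header absent
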
@@ -155,17 +188,42 @@ export const parseKafkaMessage = async (

     const headerResult = await readTokenFromHeaders(message.headers, getTeamFn)
     const token: string | undefined = headerResult.token
-    let teamIdWithConfig: null | TeamIDWithConfig = headerResult.teamIdWithConfig

     if (!token) {
         return dropMessage('no_token_in_header')
     }

+    const teamIdWithConfig: null | TeamIDWithConfig = headerResult.teamIdWithConfig
+
     // NB `==` so we're comparing undefined and null
     // if token was in the headers but, we could not load team config
     // then, we can return early
-    if (!!token && (teamIdWithConfig == null || teamIdWithConfig.teamId == null)) {
+    if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) {
         return dropMessage('header_token_present_team_missing_or_disabled', {
             token: token,
         })
     }

+    // this has to be ahead of the payload parsing in case we start dropping traffic from older versions
+    if (!!ingestionWarningProducer && !!teamIdWithConfig.teamId) {
+        const libVersion = readLibVersionFromHeaders(message.headers)
+        const minorVersion = minorVersionFrom(libVersion)
+        if (minorVersion && minorVersion <= 74) {
+            counterLibVersionWarning.inc()
+
+            await captureIngestionWarning(
+                ingestionWarningProducer,
+                teamIdWithConfig.teamId,
+                'replay_lib_version_too_old',
+                {
+                    libVersion,
+                    minorVersion,
+                },
+                { key: libVersion || minorVersion.toString() }
+            )
+        }
+    }
+
     let messagePayload: RawEventMessage
     let event: PipelineEvent

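The counter increments on every sighting, while the warning itself is keyed on the version string, so repeats should debounce to a single warning per old version (as the counter's help text hints). An illustrative instantiation of that call with hypothetical values:

// Hypothetical values throughout. With key '1.74.3', a flood of messages from
// the same outdated SDK collapses into one warning, while a different old
// version (say '1.73.0') is keyed, and warned about, separately.
await captureIngestionWarning(
    ingestionWarningProducer, // the wrapper wired up in the consumer's start()
    42, // hypothetical team id
    'replay_lib_version_too_old',
    { libVersion: '1.74.3', minorVersion: 74 },
    { key: '1.74.3' }
)
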
@@ -183,31 +241,6 @@ export const parseKafkaMessage = async (
         return dropMessage('received_non_snapshot_message')
     }

-    // TODO this mechanism is deprecated for blobby ingestion, we should remove it
-    // once we're happy that the new mechanism is working
-    // if there was not a token in the header then we try to load one from the message payload
-    if (teamIdWithConfig == null && messagePayload.team_id == null && !messagePayload.token) {
-        return dropMessage('no_token_in_header_or_payload')
-    }
-
-    if (teamIdWithConfig == null) {
-        const token = messagePayload.token
-
-        if (token) {
-            teamIdWithConfig = await getTeamFn(token)
-        }
-    }
-
-    // NB `==` so we're comparing undefined and null
-    if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) {
-        return dropMessage('token_fallback_team_missing_or_disabled', {
-            token: messagePayload.token,
-            teamId: messagePayload.team_id,
-            payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown',
-        })
-    }
-    // end of deprecated mechanism
-
     const events: RRWebEvent[] = $snapshot_items.filter((event: any) => {
         // we sometimes see events that are null
         // there will always be some unexpected data but, we should try to filter out the worst of it
@@ -46,12 +46,13 @@ export function createKafkaMessage(
     messageOverrides: Partial<Message> = {},
     eventProperties: Record<string, any> = {}
 ): Message {
-    const message: Message = {
+    return {
         partition: 1,
         topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
         offset: 0,
         timestamp: messageOverrides.timestamp ?? Date.now(),
         size: 1,
+        headers: [{ token: token.toString() }],
         ...messageOverrides,

         value: Buffer.from(
@@ -70,8 +71,6 @@ export function createKafkaMessage(
             })
         ),
     }
-
-    return message
 }

 export function createTP(partition: number, topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS) {
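The helper now bakes a `token` header into every generated message and returns the object literal directly. A hedged usage sketch follows; the leading `token` parameter is assumed from the body's `token.toString()`, since the hunk starts after the first entry of the parameter list:

// Illustrative Jest-style usage with made-up values. Because ...messageOverrides
// is spread after `headers`, an override replaces the default token-only header
// list, which is handy for attaching a lib_version header to hit the
// old-version warning path.
const message = createKafkaMessage('a-token', {
    headers: [{ token: 'a-token' }, { lib_version: '1.74.3' }],
})
expect(message.headers).toEqual([{ token: 'a-token' }, { lib_version: '1.74.3' }])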