Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ingestion warning on old version #21025

Merged
merged 26 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
993eba9
feat: ingestion warning on old version
pauldambra Mar 19, 2024
bcd5834
and some tests
pauldambra Mar 19, 2024
c13bc9a
deprecate old body parsing method for reading the token
pauldambra Mar 19, 2024
a0baa69
add a counter too so we can see this in grafana dashboard
pauldambra Mar 19, 2024
6db56b8
typo
pauldambra Mar 19, 2024
9970346
fix v3 consumer to ignore ingestion warning
pauldambra Mar 19, 2024
84ec78a
fix
pauldambra Mar 19, 2024
1aabc20
fix
pauldambra Mar 19, 2024
f7365df
fix
pauldambra Mar 19, 2024
55c8ef5
add comment
pauldambra Mar 20, 2024
b22cdb8
disconnect producer
pauldambra Mar 20, 2024
c7ee678
inject main cluster producer as a dependency from the recording consum…
pauldambra Mar 21, 2024
5edfe8e
inject main cluster producer as a dependency from the recording consum…
pauldambra Mar 21, 2024
3533213
void the promises that eslint or prettier keeps clearing the comment …
pauldambra Mar 21, 2024
92a3119
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 22, 2024
aa694c6
does that fix the test
pauldambra Mar 22, 2024
7cdee27
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 25, 2024
bf94d77
not safe to call stop twice
pauldambra Mar 25, 2024
75ae215
not safe to call stop twice
pauldambra Mar 25, 2024
bcbeb8c
remove start and stop
pauldambra Mar 25, 2024
9092fa1
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 26, 2024
9ea826a
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 28, 2024
5603fd9
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 28, 2024
1b37861
fix
pauldambra Mar 28, 2024
04d6fba
the shortest route from a to b
pauldambra Mar 28, 2024
726891f
fix
pauldambra Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ export class SessionRecordingIngesterV3 {
*/
this.promises.add(promise)

// eslint-disable-next-line @typescript-eslint/no-floating-promises
promise.finally(() => this.promises.delete(promise))
// we void the promise returned by finally here to avoid the need to await it
void promise.finally(() => this.promises.delete(promise))

return promise
}
Expand Down Expand Up @@ -194,11 +194,15 @@ export class SessionRecordingIngesterV3 {
for (const message of messages) {
counterKafkaMessageReceived.inc({ partition: message.partition })

const recordingMessage = await parseKafkaMessage(message, (token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
}))
const recordingMessage = await parseKafkaMessage(
message,
(token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
})),
// v3 consumer does not emit ingestion warnings
undefined
)

if (recordingMessage) {
Expand Down Expand Up @@ -294,13 +298,12 @@ export class SessionRecordingIngesterV3 {
this.replayEventsIngester = new ReplayEventsIngester(this.sharedClusterProducerWrapper.producer)
}

const connectionConfig = createRdConnectionConfigFromEnvVars(this.config)

// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.

// the batch consumer reads from the session replay kafka cluster
const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config)
this.batchConsumer = await startBatchConsumer({
connectionConfig,
connectionConfig: replayClusterConnectionConfig,
groupId: KAFKA_CONSUMER_GROUP_ID,
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
autoCommit: true, // NOTE: This is the crucial difference between this and the other consumer
Expand Down Expand Up @@ -463,55 +466,58 @@ export class SessionRecordingIngesterV3 {
}

private setupHttpRoutes() {
// Mimic the app sever's endpoint
expressApp.get('/api/projects/:projectId/session_recordings/:sessionId/snapshots', async (req, res) => {
await runInstrumentedFunction({
statsKey: `recordingingester.http.getSnapshots`,
func: async () => {
try {
const startTime = Date.now()
res.on('finish', function () {
status.info('⚡️', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`)
})

// validate that projectId is a number and sessionId is UUID like
const projectId = parseInt(req.params.projectId)
if (isNaN(projectId)) {
res.sendStatus(404)
return
}
// Mimic the app server's endpoint
expressApp.get(
'/api/projects/:projectId/session_recordings/:sessionId/snapshots',
async (req: any, res: any) => {
await runInstrumentedFunction({
statsKey: `recordingingester.http.getSnapshots`,
func: async () => {
try {
const startTime = Date.now()
res.on('finish', function () {
status.info('⚡️', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`)
})

// validate that projectId is a number and sessionId is UUID like
const projectId = parseInt(req.params.projectId)
if (isNaN(projectId)) {
res.sendStatus(404)
return
}

const sessionId = req.params.sessionId
if (!/^[0-9a-f-]+$/.test(sessionId)) {
res.sendStatus(404)
return
}
const sessionId = req.params.sessionId
if (!/^[0-9a-f-]+$/.test(sessionId)) {
res.sendStatus(404)
return
}

status.info('🔁', 'session-replay-ingestion - fetching session', { projectId, sessionId })
status.info('🔁', 'session-replay-ingestion - fetching session', { projectId, sessionId })

// We don't know the partition upfront so we have to recursively check all partitions
const partitions = await readdir(this.rootDir).catch(() => [])
// We don't know the partition upfront so we have to recursively check all partitions
const partitions = await readdir(this.rootDir).catch(() => [])

for (const partition of partitions) {
const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId)
const exists = await stat(sessionDir).catch(() => null)
for (const partition of partitions) {
const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId)
const exists = await stat(sessionDir).catch(() => null)

if (!exists) {
continue
if (!exists) {
continue
}

const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME))
fileStream.pipe(res)
return
}

const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME))
fileStream.pipe(res)
return
res.sendStatus(404)
} catch (e) {
status.error('🔥', 'session-replay-ingestion - failed to fetch session', e)
res.sendStatus(500)
}

res.sendStatus(404)
} catch (e) {
status.error('🔥', 'session-replay-ingestion - failed to fetch session', e)
res.sendStatus(500)
}
},
})
})
},
})
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ export class SessionRecordingIngester {
*/
this.promises.add(promise)

// eslint-disable-next-line @typescript-eslint/no-floating-promises
promise.finally(() => this.promises.delete(promise))
// we void the promise returned by finally here to avoid the need to await it
void promise.finally(() => this.promises.delete(promise))

return promise
}
Expand Down Expand Up @@ -350,11 +350,14 @@ export class SessionRecordingIngester {

counterKafkaMessageReceived.inc({ partition })

const recordingMessage = await parseKafkaMessage(message, (token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
}))
const recordingMessage = await parseKafkaMessage(
message,
(token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
})),
this.sharedClusterProducerWrapper
)

if (recordingMessage) {
Expand Down Expand Up @@ -473,13 +476,12 @@ export class SessionRecordingIngester {
)
}

const connectionConfig = createRdConnectionConfigFromEnvVars(this.config)

// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.

// the batch consumer reads from the session replay kafka cluster
const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config)
this.batchConsumer = await startBatchConsumer({
connectionConfig,
connectionConfig: replayClusterConnectionConfig,
groupId: KAFKA_CONSUMER_GROUP_ID,
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
autoCommit: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@ import { captureException } from '@sentry/node'
import { DateTime } from 'luxon'
import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartition } from 'node-rdkafka'
import path from 'path'
import { Counter } from 'prom-client'

import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types'
import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'
import { status } from '../../../utils/status'
import { cloneObject } from '../../../utils/utils'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { eventDroppedCounter } from '../metrics'
import { TeamIDWithConfig } from './session-recordings-consumer'
import { IncomingRecordingMessage, PersistedRecordingMessage } from './types'

const counterLibVersionWarning = new Counter({
name: 'lib_version_warning_counter',
help: 'the number of times we have seen a message with a lib version that is too old, each _might_ cause an ingestion warning if not debounced',
})

// Helper to return now as a milliseconds timestamp
export const now = () => DateTime.now().toMillis()

Expand Down Expand Up @@ -128,9 +136,38 @@ export async function readTokenFromHeaders(
return { token, teamIdWithConfig }
}

function readLibVersionFromHeaders(headers: MessageHeader[] | undefined): string | undefined {
    // find the first header entry that carries a truthy `lib_version` value
    const matchingHeader = headers?.find((h) => h['lib_version'])
    const rawValue = matchingHeader?.['lib_version']
    // Kafka header values may arrive as Buffers rather than strings; normalise when present
    return typeof rawValue === 'string' ? rawValue : rawValue?.toString()
}

/**
 * Extracts the major and minor components of a three-part semantic version
 * string and encodes them as a single float, e.g. "1.74.2" -> 1.74.
 * Returns undefined for anything that is not loosely "X.Y.Z" shaped.
 *
 * NOTE(review): encoding major.minor as a float mis-orders multi-digit
 * minors (e.g. "1.100.0" -> 1.1, which compares below 1.74) — acceptable
 * for the current threshold check, but worth confirming before reuse.
 */
function majorAndMinorVersionFrom(libVersion: string | undefined): number | undefined {
    try {
        if (!libVersion || !libVersion.includes('.')) {
            return undefined
        }
        const parts = libVersion.split('.')
        // very loose check for three part semantic version number
        if (parts.length !== 3) {
            return undefined
        }
        const [major, minor] = parts
        const majorIsNumeric = !!major && !isNaN(parseInt(major))
        const minorIsNumeric = !!minor && !isNaN(parseInt(minor))
        return majorIsNumeric && minorIsNumeric ? parseFloat(`${major}.${minor}`) : undefined
    } catch (e) {
        status.warn('⚠️', 'could_not_read_minor_lib_version', { libVersion })
        return undefined
    }
}

export const parseKafkaMessage = async (
message: Message,
getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>
getTeamFn: (s: string) => Promise<TeamIDWithConfig | null>,
ingestionWarningProducer: KafkaProducerWrapper | undefined
): Promise<IncomingRecordingMessage | void> => {
const dropMessage = (reason: string, extra?: Record<string, any>) => {
eventDroppedCounter
Expand Down Expand Up @@ -170,6 +207,32 @@ export const parseKafkaMessage = async (
})
}

// this has to be ahead of the payload parsing in case we start dropping traffic from older versions
if (!!ingestionWarningProducer && !!teamIdWithConfig.teamId) {
const libVersion = readLibVersionFromHeaders(message.headers)
const parsedVersion = majorAndMinorVersionFrom(libVersion)
/**
* We introduced SVG mutation throttling in version 1.74.0 fix: Recording throttling for SVG-like things (#758)
* and improvements like jitter on retry and better batching in session recording in earlier versions
* So, versions older than 1.75.0 can cause ingestion pressure or incidents
* because they send much more information and more messages for the same recording
*/
if (parsedVersion && parsedVersion <= 1.74) {
counterLibVersionWarning.inc()

await captureIngestionWarning(
ingestionWarningProducer,
teamIdWithConfig.teamId,
'replay_lib_version_too_old',
{
libVersion,
parsedVersion,
},
{ key: libVersion || parsedVersion.toString() }
)
}
}

let messagePayload: RawEventMessage
let event: PipelineEvent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ describe('console log ingester', () => {
beforeEach(() => {
mockProducer.mockClear()
mockProducer['connect'] = jest.fn()
mockProducer['isConnected'] = () => true

const mockedHighWaterMarker = { isBelowHighWaterMark: jest.fn() } as unknown as OffsetHighWaterMarker
consoleLogIngester = new ConsoleLogsIngester(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ describe('replay events ingester', () => {
beforeEach(() => {
mockProducer.mockClear()
mockProducer['connect'] = jest.fn()
mockProducer['isConnected'] = () => true

const mockedHighWaterMarker = { isBelowHighWaterMark: jest.fn() } as unknown as OffsetHighWaterMarker
ingester = new ReplayEventsIngester(mockProducer as unknown as HighLevelProducer, mockedHighWaterMarker)
Expand Down
Loading
Loading