feat: ingestion warning on old version #21025

Merged

merged 26 commits from feat/ingestion_warning_on_old_version into master on Mar 28, 2024

Commits (26)
993eba9
feat: ingestion warning on old version
pauldambra Mar 19, 2024
bcd5834
and some tests
pauldambra Mar 19, 2024
c13bc9a
deprecate old body parsing method for reading the token
pauldambra Mar 19, 2024
a0baa69
add a counter too so we can see this in grafana dashboard
pauldambra Mar 19, 2024
6db56b8
typo
pauldambra Mar 19, 2024
9970346
fix v3 consumer to ignore ingestion warning
pauldambra Mar 19, 2024
84ec78a
fix
pauldambra Mar 19, 2024
1aabc20
fix
pauldambra Mar 19, 2024
f7365df
fix
pauldambra Mar 19, 2024
55c8ef5
add comment
pauldambra Mar 20, 2024
b22cdb8
disconnect producer
pauldambra Mar 20, 2024
c7ee678
inject main cluster producer as a dependency from the recording consum…
pauldambra Mar 21, 2024
5edfe8e
inject main cluster producer as a dependency from the recording consum…
pauldambra Mar 21, 2024
3533213
void the promises that eslint or prettier keeps clearing the comment …
pauldambra Mar 21, 2024
92a3119
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 22, 2024
aa694c6
does that fix the test
pauldambra Mar 22, 2024
7cdee27
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 25, 2024
bf94d77
not safe to call stop twice
pauldambra Mar 25, 2024
75ae215
not safe to call stop twice
pauldambra Mar 25, 2024
bcbeb8c
remove start and stop
pauldambra Mar 25, 2024
9092fa1
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 26, 2024
9ea826a
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 28, 2024
5603fd9
Merge branch 'master' into feat/ingestion_warning_on_old_version
pauldambra Mar 28, 2024
1b37861
fix
pauldambra Mar 28, 2024
04d6fba
the shortest route from a to b
pauldambra Mar 28, 2024
726891f
fix
pauldambra Mar 28, 2024
File 1 of 3: ConsoleLogsIngester
@@ -3,11 +3,9 @@ import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-rdkafka'
import { Counter } from 'prom-client'

import { KAFKA_LOG_ENTRIES } from '../../../../config/kafka-topics'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../../kafka/config'
import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
import { PluginsServerConfig } from '../../../../types'
import { flushProducer, produce } from '../../../../kafka/producer'
import { status } from '../../../../utils/status'
import { eventDroppedCounter } from '../../metrics'
import { ConsoleLogEntry, gatherConsoleLogEvents, RRWebEventType } from '../process-event'
@@ -42,10 +40,8 @@ function deduplicateConsoleLogEvents(consoleLogEntries: ConsoleLogEntry[]): ConsoleLogEntry[] {
// TODO this is an almost exact duplicate of the replay events ingester
// am going to leave this duplication and then collapse it when/if we add a performance events ingester
export class ConsoleLogsIngester {
producer?: RdKafkaProducer

constructor(
private readonly serverConfig: PluginsServerConfig,
private readonly producer: RdKafkaProducer,
private readonly persistentHighWaterMarker?: OffsetHighWaterMarker
) {}

@@ -175,21 +171,13 @@
}
}

public async start(): Promise<void> {
const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)

const producerConfig = createRdProducerConfigFromEnvVars(this.serverConfig)

this.producer = await createKafkaProducer(connectionConfig, producerConfig)
this.producer.connect()
public start(): void {
if (!this.producer.isConnected()) {
status.error('🔁', '[console-log-events-ingester] kafka producer should have been connected by parent')
}
}

public async stop(): Promise<void> {
public stop(): void {
status.info('🔁', '[console-log-events-ingester] stopping')

if (this.producer && this.producer.isConnected()) {
status.info('🔁', '[console-log-events-ingester] disconnecting kafka producer in batchConsumer stop')
await disconnectProducer(this.producer)
}
}
}
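Review note: the change in this file swaps producer ownership. Instead of each ingester creating and connecting its own rdkafka producer inside `start()`, an already-connected producer is injected through the constructor, and the parent consumer owns the connection lifecycle. One practical benefit is testability; a minimal sketch (the stub below is hypothetical, not part of this PR):

```typescript
import { HighLevelProducer as RdKafkaProducer } from 'node-rdkafka'
// import { ConsoleLogsIngester } from './console-logs-ingester' — path assumed

// Hypothetical stub: isConnected() is the only producer method start() touches,
// so a test can construct the ingester without a live Kafka connection.
const stubProducer = { isConnected: () => true } as unknown as RdKafkaProducer

const ingester = new ConsoleLogsIngester(stubProducer)
ingester.start() // logs no error: the stub reports itself as connected
```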
File 2 of 3: ReplayEventsIngester
@@ -5,11 +5,9 @@ import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-rdkafka'
import { Counter } from 'prom-client'

import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS } from '../../../../config/kafka-topics'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../../kafka/config'
import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
import { PluginsServerConfig } from '../../../../types'
import { flushProducer, produce } from '../../../../kafka/producer'
import { KafkaProducerWrapper } from '../../../../utils/db/kafka-producer-wrapper'
import { status } from '../../../../utils/status'
import { captureIngestionWarning } from '../../../../worker/ingestion/utils'
@@ -26,10 +24,8 @@ const replayEventsCounter = new Counter({
})

export class ReplayEventsIngester {
producer?: RdKafkaProducer

constructor(
private readonly serverConfig: PluginsServerConfig,
private readonly producer: RdKafkaProducer,
private readonly persistentHighWaterMarker?: OffsetHighWaterMarker
) {}

@@ -179,19 +175,13 @@
})
}
}
public async start(): Promise<void> {
const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
const producerConfig = createRdProducerConfigFromEnvVars(this.serverConfig)
this.producer = await createKafkaProducer(connectionConfig, producerConfig)
this.producer.connect()
public start(): void {
if (!this.producer.isConnected()) {
status.error('🔁', '[replay-events] kafka producer should have been connected by parent')
}
}

public async stop(): Promise<void> {
public stop(): void {
status.info('🔁', '[replay-events] stopping')

if (this.producer && this.producer.isConnected()) {
status.info('🔁', '[replay-events] disconnecting kafka producer in batchConsumer stop')
await disconnectProducer(this.producer)
}
}
}
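The TODO at the top of the console-logs file notes that these two ingesters are near-duplicates. A hypothetical sketch of the collapse it anticipates (names are illustrative, not from this PR): a base class owning the injected producer and the shared start/stop checks, with subclasses supplying only the topic and event transformation.

```typescript
import { HighLevelProducer as RdKafkaProducer } from 'node-rdkafka'

// Hypothetical collapse of the duplicated ingesters; illustrative only.
abstract class InjectedProducerIngester {
    constructor(protected readonly producer: RdKafkaProducer, private readonly label: string) {}

    public start(): void {
        // the parent connects the producer; subclasses only verify and complain
        if (!this.producer.isConnected()) {
            console.error(`[${this.label}] kafka producer should have been connected by parent`)
        }
    }

    public stop(): void {
        // no-op: the parent disconnects the shared producer
        console.info(`[${this.label}] stopping`)
    }
}
```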
File 3 of 3: SessionRecordingIngesterV3
@@ -8,9 +8,11 @@ import { Counter, Gauge, Histogram } from 'prom-client'
import { sessionRecordingConsumerConfig } from '../../../config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config'
import { createKafkaProducer } from '../../../kafka/producer'
import { PluginsServerConfig, TeamId } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'
import { PostgresRouter } from '../../../utils/db/postgres'
import { status } from '../../../utils/status'
import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
@@ -82,6 +84,8 @@ export class SessionRecordingIngesterV3 {
// this allows us to output more information for that partition
private debugPartition: number | undefined = undefined

private mainKafkaClusterProducer?: KafkaProducerWrapper

constructor(
private globalServerConfig: PluginsServerConfig,
private postgres: PostgresRouter,
@@ -136,8 +140,8 @@
*/
this.promises.add(promise)

// eslint-disable-next-line @typescript-eslint/no-floating-promises
promise.finally(() => this.promises.delete(promise))
// we void the promise returned by finally here to avoid the need to await it
void promise.finally(() => this.promises.delete(promise))

return promise
}
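The change above replaces a lint-suppression comment (which, per the commit message, eslint or prettier kept clearing) with an explicit `void`, which also satisfies `@typescript-eslint/no-floating-promises`. A self-contained sketch of the pattern:

```typescript
// Track in-flight work in a Set; `void` explicitly discards the new promise
// returned by .finally(), so the lint rule sees a deliberate choice rather
// than an accidentally-floating promise.
const promises = new Set<Promise<unknown>>()

function scheduleWork<T>(promise: Promise<T>): Promise<T> {
    promises.add(promise)
    void promise.finally(() => promises.delete(promise))
    return promise
}
```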
@@ -191,11 +195,15 @@
for (const message of messages) {
counterKafkaMessageReceived.inc({ partition: message.partition })

const recordingMessage = await parseKafkaMessage(message, (token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
}))
const recordingMessage = await parseKafkaMessage(
message,
(token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
})),
// v3 consumer does not emit ingestion warnings
undefined
)

if (recordingMessage) {
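The `undefined` third argument opts the v3 consumer out of the ingestion warnings this PR introduces (the feature in the PR title lives in the other consumer path). The parameter's exact type is not visible in this diff; a plausible shape, assumed purely for illustration:

```typescript
// Assumed for illustration: a callback parseKafkaMessage can invoke when it
// sees a recording from an outdated SDK version; v3 passes undefined to skip it.
type IngestionWarningCallback = (type: string, details: Record<string, unknown>) => void
```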
@@ -273,24 +281,30 @@
// Load teams into memory
await this.teamsRefresher.refresh()

// this producer uses the default plugin server config to connect to the main kafka cluster
const mainClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.globalServerConfig)
const producerConfig = createRdProducerConfigFromEnvVars(this.globalServerConfig)
const producer = await createKafkaProducer(mainClusterConnectionConfig, producerConfig)
producer.connect()
this.mainKafkaClusterProducer = new KafkaProducerWrapper(producer)

// NOTE: This is the only place where we need to use the shared server config
if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) {
this.consoleLogsIngester = new ConsoleLogsIngester(this.globalServerConfig)
await this.consoleLogsIngester.start()
this.consoleLogsIngester = new ConsoleLogsIngester(producer)
this.consoleLogsIngester.start()
}

if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) {
this.replayEventsIngester = new ReplayEventsIngester(this.globalServerConfig)
await this.replayEventsIngester.start()
this.replayEventsIngester = new ReplayEventsIngester(producer)
this.replayEventsIngester.start()
}

const connectionConfig = createRdConnectionConfigFromEnvVars(this.config)

// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.

// the batch consumer reads from the session replay kafka cluster
const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config)
this.batchConsumer = await startBatchConsumer({
connectionConfig,
connectionConfig: replayClusterConnectionConfig,
groupId: KAFKA_CONSUMER_GROUP_ID,
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
autoCommit: true, // NOTE: This is the crucial difference between this and the other consumer
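Worth noting the two clusters in play here: the producer is built from `globalServerConfig` (the main Kafka cluster, where the ingesters write), while the batch consumer reads from the session-replay cluster configured via `this.config`. A condensed sketch of the shared-producer lifecycle, using the same helpers this file imports:

```typescript
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config'
import { createKafkaProducer } from '../../../kafka/producer'
import { PluginsServerConfig } from '../../../types'
import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'

declare const globalServerConfig: PluginsServerConfig // supplied by the consumer

// One producer for the main cluster, created and connected by the parent...
const producer = await createKafkaProducer(
    createRdConnectionConfigFromEnvVars(globalServerConfig),
    createRdProducerConfigFromEnvVars(globalServerConfig)
)
producer.connect()

// ...handed to both ingesters as a plain dependency, and wrapped so stop()
// can disconnect it exactly once.
const mainKafkaClusterProducer = new KafkaProducerWrapper(producer)
```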
@@ -340,15 +354,19 @@
)
)

// stop is effectively a no-op on both of these but is kept here
// in case we want to add any cleanup logic in the future
if (this.replayEventsIngester) {
void this.scheduleWork(this.replayEventsIngester.stop())
this.replayEventsIngester.stop()
}
if (this.consoleLogsIngester) {
void this.scheduleWork(this.consoleLogsIngester!.stop())
this.consoleLogsIngester.stop()
}

const promiseResults = await Promise.allSettled(this.promises)

await this.mainKafkaClusterProducer?.disconnect()

status.info('👍', 'session-replay-ingestion - stopped!')

return promiseResults
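The ordering in `stop()` matters: in-flight batches may still produce to Kafka, so the shared producer is disconnected only after all scheduled work has settled. A self-contained sketch of that ordering:

```typescript
// Drain scheduled work first; disconnect the shared producer last, so late
// flushes still have a live connection.
async function shutdown(
    ingesters: Array<{ stop: () => void }>,
    inFlight: Set<Promise<unknown>>,
    disconnectProducer: () => Promise<void>
): Promise<Array<PromiseSettledResult<unknown>>> {
    ingesters.forEach((ingester) => ingester.stop()) // no-ops today, kept for future cleanup
    const results = await Promise.allSettled(inFlight)
    await disconnectProducer() // nothing produces after this point
    return results
}
```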
@@ -456,55 +474,58 @@
}

private setupHttpRoutes() {
// Mimic the app sever's endpoint
expressApp.get('/api/projects/:projectId/session_recordings/:sessionId/snapshots', async (req, res) => {
await runInstrumentedFunction({
statsKey: `recordingingester.http.getSnapshots`,
func: async () => {
try {
const startTime = Date.now()
res.on('finish', function () {
status.info('⚡️', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`)
})

// validate that projectId is a number and sessionId is UUID like
const projectId = parseInt(req.params.projectId)
if (isNaN(projectId)) {
res.sendStatus(404)
return
}
// Mimic the app server's endpoint
expressApp.get(
'/api/projects/:projectId/session_recordings/:sessionId/snapshots',
async (req: any, res: any) => {
await runInstrumentedFunction({
statsKey: `recordingingester.http.getSnapshots`,
func: async () => {
try {
const startTime = Date.now()
res.on('finish', function () {
status.info('⚡️', `GET ${req.url} - ${res.statusCode} - ${Date.now() - startTime}ms`)
})

// validate that projectId is a number and sessionId is UUID like
const projectId = parseInt(req.params.projectId)
if (isNaN(projectId)) {
res.sendStatus(404)
return
}

const sessionId = req.params.sessionId
if (!/^[0-9a-f-]+$/.test(sessionId)) {
res.sendStatus(404)
return
}
const sessionId = req.params.sessionId
if (!/^[0-9a-f-]+$/.test(sessionId)) {
res.sendStatus(404)
return
}

status.info('🔁', 'session-replay-ingestion - fetching session', { projectId, sessionId })

status.info('🔁', 'session-replay-ingestion - fetching session', { projectId, sessionId })
// We don't know the partition upfront so we have to recursively check all partitions
const partitions = await readdir(this.rootDir).catch(() => [])

// We don't know the partition upfront so we have to recursively check all partitions
const partitions = await readdir(this.rootDir).catch(() => [])
for (const partition of partitions) {
const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId)
const exists = await stat(sessionDir).catch(() => null)

for (const partition of partitions) {
const sessionDir = this.dirForSession(parseInt(partition), projectId, sessionId)
const exists = await stat(sessionDir).catch(() => null)
if (!exists) {
continue
}

if (!exists) {
continue
const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME))
fileStream.pipe(res)
return
}

const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME))
fileStream.pipe(res)
return
res.sendStatus(404)
} catch (e) {
status.error('🔥', 'session-replay-ingestion - failed to fetch session', e)
res.sendStatus(500)
}

res.sendStatus(404)
} catch (e) {
status.error('🔥', 'session-replay-ingestion - failed to fetch session', e)
res.sendStatus(500)
}
},
})
})
},
})
}
)
}
}
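For reference, the debug endpoint above accepts a numeric project id and a session id matching `/^[0-9a-f-]+$/`, then scans partition directories for a buffered session file. A usage sketch (host, port, and the ids are deployment-specific assumptions, not values from this PR):

```typescript
// Hypothetical local call; adjust host/port to your deployment.
const res = await fetch('http://localhost:3000/api/projects/1/session_recordings/018e2e-4b7c/snapshots')
// 200 streams the buffered session file from whichever partition dir holds it;
// 404 means a malformed id or no buffer found; 500 indicates a read failure.
console.log(res.status)
```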