From 12e01a0edecc6a2438cf5da1fd29a6eff3d8692e Mon Sep 17 00:00:00 2001
From: Ben White
Date: Tue, 27 Feb 2024 21:10:39 +0100
Subject: [PATCH] feat: V3 ingester - perf improvements (#20587)

---
 .../services/session-manager-v3.ts          |  83 +++++---------
 .../session-recordings-consumer-v3.ts       | 106 ++++++++++--------
 .../services/session-manager-v3.test.ts     |  14 +--
 .../session-recordings-consumer-v3.test.ts  |   8 +-
 4 files changed, 96 insertions(+), 115 deletions(-)

diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts
index d34a3a291b74a..b4074d01db5bb 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts
@@ -1,7 +1,7 @@
 import { Upload } from '@aws-sdk/lib-storage'
 import { captureException, captureMessage } from '@sentry/node'
 import { createReadStream, createWriteStream, WriteStream } from 'fs'
-import { mkdir, readdir, readFile, rename, rmdir, stat, unlink, writeFile } from 'fs/promises'
+import { appendFile, mkdir, readdir, readFile, rename, rmdir, stat, unlink, writeFile } from 'fs/promises'
 import path from 'path'
 import { Counter, Histogram } from 'prom-client'
 import { PassThrough } from 'stream'
@@ -79,11 +79,6 @@ const histogramSessionSize = new Histogram({
     buckets: BUCKETS_LINES_WRITTEN,
 })
 
-const writeStreamBlocked = new Counter({
-    name: metricPrefix + 'recording_blob_ingestion_write_stream_blocked',
-    help: 'Number of times we get blocked by the stream backpressure',
-})
-
 const histogramBackpressureBlockedSeconds = new Histogram({
     name: metricPrefix + 'recording_blob_ingestion_backpressure_blocked_seconds',
     help: 'The time taken to flush a session in seconds',
@@ -100,11 +95,6 @@ export type SessionManagerBufferContext = {
     createdAt: number
 }
 
-export type SessionBuffer = {
-    context: SessionManagerBufferContext
-    fileStream: WriteStream
-}
-
 // Context that is updated and persisted to disk so must be serializable
 export type SessionManagerContext = {
     dir: string
@@ -114,7 +104,7 @@ export type SessionManagerContext = {
 }
 
 export class SessionManagerV3 {
-    buffer?: SessionBuffer
+    buffer?: SessionManagerBufferContext
     flushPromise?: Promise<void>
     destroying = false
     inProgressUpload: Upload | null = null
@@ -149,7 +139,7 @@
         if (!bufferFileExists) {
             status.info('📦', '[session-manager] started new manager', {
                 ...this.context,
-                ...(this.buffer?.context ?? {}),
+                ...(this.buffer ?? {}),
             })
             return
         }
@@ -204,20 +194,17 @@
             return
         }
 
-        this.buffer = {
-            context,
-            fileStream: this.createFileStreamFor(path.join(this.context.dir, BUFFER_FILE_NAME)),
-        }
+        this.buffer = context
 
         status.info('📦', '[session-manager] started new manager from existing file', {
             ...this.context,
-            ...(this.buffer?.context ?? {}),
+            ...(this.buffer ?? {}),
         })
     }
 
     private async syncMetadata(): Promise<void> {
         if (this.buffer) {
-            await writeFile(this.file(METADATA_FILE_NAME), JSON.stringify(this.buffer?.context), 'utf-8')
+            await writeFile(this.file(METADATA_FILE_NAME), JSON.stringify(this.buffer), 'utf-8')
         } else {
             await unlink(this.file(METADATA_FILE_NAME))
         }
     }
@@ -262,23 +249,18 @@
             return
         }
 
-            buffer.context.eventsRange = {
-                firstTimestamp: minDefined(start, buffer.context.eventsRange?.firstTimestamp) ?? start,
-                lastTimestamp: maxDefined(end, buffer.context.eventsRange?.lastTimestamp) ?? end,
+            buffer.eventsRange = {
+                firstTimestamp: minDefined(start, buffer.eventsRange?.firstTimestamp) ?? start,
+                lastTimestamp: maxDefined(end, buffer.eventsRange?.lastTimestamp) ?? end,
             }
 
             const content = JSON.stringify(messageData) + '\n'
-            buffer.context.count += 1
-            buffer.context.sizeEstimate += content.length
-
-            if (!buffer.fileStream.write(content, 'utf-8')) {
-                writeStreamBlocked.inc()
-
-                const stopTimer = histogramBackpressureBlockedSeconds.startTimer()
-                await new Promise((r) => buffer.fileStream.once('drain', r))
-                stopTimer()
-            }
+            buffer.count += 1
+            buffer.sizeEstimate += content.length
 
+            const stopTimer = histogramBackpressureBlockedSeconds.startTimer()
+            await appendFile(this.file(BUFFER_FILE_NAME), content, 'utf-8')
+            stopTimer()
             await this.syncMetadata()
         } catch (error) {
             this.captureException(error, { message })
@@ -287,7 +269,7 @@
     }
 
     public async isEmpty(): Promise<boolean> {
-        return !this.buffer?.context.count && !(await this.getFlushFiles()).length
+        return !this.buffer?.count && !(await this.getFlushFiles()).length
     }
 
     public async flush(force = false): Promise<void> {
@@ -312,7 +294,7 @@
             return
         }
 
-        if (this.buffer.context.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024) {
+        if (this.buffer.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024) {
             return this.markCurrentBufferForFlush('buffer_size')
         }
 
@@ -325,12 +307,12 @@
             flushThresholdJitteredMs,
         }
 
-        if (!this.buffer.context.count) {
+        if (!this.buffer.count) {
             status.warn('🚽', `[session-manager] buffer has no items yet`, { logContext })
             return
         }
 
-        const bufferAgeInMemoryMs = now() - this.buffer.context.createdAt
+        const bufferAgeInMemoryMs = now() - this.buffer.createdAt
 
         // check the in-memory age against a larger value than the flush threshold,
         // otherwise we'll flap between reasons for flushing when close to real-time processing
@@ -340,8 +322,8 @@
         logContext['isSessionAgeOverThreshold'] = isSessionAgeOverThreshold
 
         histogramSessionAgeSeconds.observe(bufferAgeInMemoryMs / 1000)
-        histogramSessionSize.observe(this.buffer.context.count)
-        histogramSessionSizeKb.observe(this.buffer.context.sizeEstimate / 1024)
+        histogramSessionSize.observe(this.buffer.count)
+        histogramSessionSizeKb.observe(this.buffer.sizeEstimate / 1024)
 
         if (isSessionAgeOverThreshold) {
             return this.markCurrentBufferForFlush('buffer_age')
@@ -355,7 +337,7 @@
             return
         }
 
-        if (!buffer.context.eventsRange || !buffer.context.count) {
+        if (!buffer.eventsRange || !buffer.count) {
            // Indicates some issue with the buffer so we can close out
            this.buffer = undefined
            return
@@ -363,15 +345,13 @@
 
         // ADD FLUSH METRICS HERE
 
-        const { firstTimestamp, lastTimestamp } = buffer.context.eventsRange
+        const { firstTimestamp, lastTimestamp } = buffer.eventsRange
         const fileName = `${firstTimestamp}-${lastTimestamp}${FLUSH_FILE_EXTENSION}`
 
         counterS3FilesWritten.labels(reason).inc(1)
-        histogramS3LinesWritten.observe(buffer.context.count)
-        histogramS3KbWritten.observe(buffer.context.sizeEstimate / 1024)
+        histogramS3LinesWritten.observe(buffer.count)
+        histogramS3KbWritten.observe(buffer.sizeEstimate / 1024)
 
-        // NOTE: We simplify everything by keeping the files as the same name for S3
-        await new Promise((resolve) => buffer.fileStream.end(resolve))
         await rename(this.file(BUFFER_FILE_NAME), this.file(fileName))
 
         this.buffer = undefined
@@ -480,19 +460,15 @@
         }
     }
 
-    private getOrCreateBuffer(): SessionBuffer {
+    private getOrCreateBuffer(): SessionManagerBufferContext {
         if (!this.buffer) {
             try {
-                const context: SessionManagerBufferContext = {
+                const buffer: SessionManagerBufferContext = {
                     sizeEstimate: 0,
                     count: 0,
                     eventsRange: null,
                     createdAt: now(),
                 }
-                const buffer: SessionBuffer = {
-                    context,
-                    fileStream: this.createFileStreamFor(this.file(BUFFER_FILE_NAME)),
-                }
 
                 this.buffer = buffer
             } catch (error) {
@@ -501,7 +477,7 @@
             }
         }
 
-        return this.buffer as SessionBuffer
+        return this.buffer
     }
 
     protected createFileStreamFor(file: string): WriteStream {
@@ -525,11 +501,6 @@
             this.inProgressUpload = null
         }
 
-        const buffer = this.buffer
-        if (buffer) {
-            await new Promise((resolve) => buffer.fileStream.end(resolve))
-        }
-
         if (await this.isEmpty()) {
             status.info('🧨', '[session-manager] removing empty session directory', {
                 ...this.context,
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts
index 3523609b8af1a..8e5178822255b 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts
@@ -27,7 +27,7 @@ require('@sentry/tracing')
 
 // WARNING: Do not change this - it will essentially reset the consumer
 const KAFKA_CONSUMER_GROUP_ID = 'session-replay-ingester'
-const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000
+const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 60000
 
 // NOTE: To remove once released
 const metricPrefix = 'v3_'
@@ -66,20 +66,13 @@ export interface TeamIDWithConfig {
  * as the persistent volume for both blob data and the metadata around ingestion.
  */
 export class SessionRecordingIngesterV3 {
-    // redisPool: RedisPool
     sessions: Record<string, SessionManagerV3> = {}
-    // sessionHighWaterMarker: OffsetHighWaterMarker
-    // persistentHighWaterMarker: OffsetHighWaterMarker
-    // realtimeManager: RealtimeManager
     // replayEventsIngester: ReplayEventsIngester
     // consoleLogsIngester: ConsoleLogsIngester
     batchConsumer?: BatchConsumer
-    // partitionMetrics: Record<number, PartitionMetrics> = {}
     teamsRefresher: BackgroundRefresher<Record<string, TeamIDWithConfig>>
-    // latestOffsetsRefresher: BackgroundRefresher<Record<number, number | undefined>>
     config: PluginsServerConfig
     topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
-    // totalNumPartitions = 0
 
     private promises: Set<Promise<any>> = new Set()
 
     // if ingestion is lagging on a single partition it is often hard to identify _why_,
@@ -98,7 +91,6 @@
         // NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis
         // We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
         this.config = sessionRecordingConsumerConfig(globalServerConfig)
-        // this.redisPool = createRedisPool(this.config)
 
         // NOTE: This is the only place where we need to use the shared server config
         // TODO: Uncomment when we swap to using this service as the ingester for it
@@ -183,8 +175,13 @@
             sessionsHandled: Object.keys(this.sessions).length,
         })
 
-        // TODO: For all assigned partitions, load up any sessions on disk that we don't already have in memory
         // TODO: Add a timer or something to fire this "handleEachBatch" with an empty batch for quiet partitions
+        const startTime = Date.now()
+        const timeRemaining = () => {
+            // We try to force steps that can be slow to finish before the session timeout
+            const elapsed = Date.now() - startTime
+            return Math.max(KAFKA_CONSUMER_SESSION_TIMEOUT_MS * 0.9 - elapsed, 0)
+        }
 
         await runInstrumentedFunction({
             statsKey: `recordingingester.handleEachBatch`,
@@ -217,12 +214,10 @@
                 },
             })
 
-            // await this.reportPartitionMetrics()
-
             await runInstrumentedFunction({
                 statsKey: `recordingingester.handleEachBatch.ensureSessionsAreLoaded`,
                 func: async () => {
-                    await this.syncSessionsWithDisk()
+                    await this.syncSessionsWithDisk(timeRemaining())
                 },
             })
 
@@ -243,7 +238,7 @@
                 statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`,
                 func: async () => {
                     // TODO: This can time out if it ends up being overloaded - we should have a max limit here
-                    await this.flushAllReadySessions()
+                    await this.flushAllReadySessions(timeRemaining())
                 },
             })
 
@@ -336,10 +331,6 @@
 
         const promiseResults = await Promise.allSettled(this.promises)
 
-        // Finally we clear up redis once we are sure everything else has been handled
-        // await this.redisPool.drain()
-        // await this.redisPool.clear()
-
         status.info('👍', 'session-replay-ingestion - stopped!')
 
         return promiseResults
@@ -350,17 +341,28 @@
         return this.batchConsumer?.isHealthy()
     }
 
-    async flushAllReadySessions(): Promise<void> {
-        const promises: Promise<void>[] = []
+    async flushAllReadySessions(timeLimit: number = KAFKA_CONSUMER_SESSION_TIMEOUT_MS): Promise<void> {
+        const startTime = Date.now()
         const assignedPartitions = this.assignedTopicPartitions.map((x) => x.partition)
+        const sessions = Object.entries(this.sessions)
+        let flushedCount = 0
+
+        // TODO: We could probably parallelize this to some extent
+        for (const [key, sessionManager] of sessions) {
+            if (Date.now() - startTime > timeLimit) {
+                status.warn(
+                    '⚠️',
+                    `session-replay-ingestion - flushing sessions took too long, stopping at ${flushedCount} of ${sessions.length} sessions`
+                )
+                return
+            }
 
-        for (const [key, sessionManager] of Object.entries(this.sessions)) {
             if (!assignedPartitions.includes(sessionManager.context.partition)) {
-                promises.push(this.destroySession(key, sessionManager))
+                await this.destroySession(key, sessionManager)
                 continue
             }
 
-            const flushPromise = sessionManager
+            await sessionManager
                 .flush()
                 .catch((err) => {
                     status.error(
@@ -380,37 +382,47 @@
                         await this.destroySession(key, sessionManager)
                     }
                 })
-            promises.push(flushPromise)
+            flushedCount++
         }
-        await Promise.allSettled(promises)
 
         gaugeSessionsHandled.set(Object.keys(this.sessions).length)
     }
 
-    private async syncSessionsWithDisk(): Promise<void> {
+    private async syncSessionsWithDisk(timeLimit: number = KAFKA_CONSUMER_SESSION_TIMEOUT_MS): Promise<void> {
         // As we may get assigned and reassigned partitions, we want to make sure that we have all sessions loaded into memory
-        await Promise.all(
-            this.assignedTopicPartitions.map(async ({ partition }) => {
-                const keys = await readdir(path.join(this.rootDir, `${partition}`)).catch(() => {
-                    // This happens if there are no files on disk for that partition yet
-                    return []
-                })
+        const startTime = Date.now()
 
-                // TODO: Below regex is a little crude. We should fix it
-                keys.filter((x) => /\d+__[a-zA-Z0-9\-]+/.test(x)).forEach((key) => {
-                    // TODO: Ensure sessionId can only be a uuid
-                    const [teamId, sessionId] = key.split('__')
-
-                    if (!this.sessions[key]) {
-                        this.sessions[key] = new SessionManagerV3(this.config, this.objectStorage.s3, {
-                            teamId: parseInt(teamId),
-                            sessionId,
-                            dir: this.dirForSession(partition, parseInt(teamId), sessionId),
-                            partition,
-                        })
-                    }
-                })
+        for (const { partition } of this.assignedTopicPartitions) {
+            const keys = await readdir(path.join(this.rootDir, `${partition}`)).catch(() => {
+                // This happens if there are no files on disk for that partition yet
+                return []
             })
-        )
+
+            const relatedKeys = keys.filter((x) => /\d+__[a-zA-Z0-9\-]+/.test(x))
+
+            for (const key of relatedKeys) {
+                if (Date.now() - startTime > timeLimit) {
+                    status.warn(
+                        '⚠️',
+                        `session-replay-ingestion - syncing sessions from disk is taking too long, stopping.`
+                    )
+                    return
+                }
+
+                // TODO: Ensure sessionId can only be a uuid
+                const [teamId, sessionId] = key.split('__')
+
+                if (!this.sessions[key]) {
+                    this.sessions[key] = new SessionManagerV3(this.config, this.objectStorage.s3, {
+                        teamId: parseInt(teamId),
+                        sessionId,
+                        dir: this.dirForSession(partition, parseInt(teamId), sessionId),
+                        partition,
+                    })
+
+                    await this.sessions[key].setupPromise
+                }
+            }
+        }
     }
 
     private async destroySession(key: string, sessionManager: SessionManagerV3): Promise<void> {
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts
index 7c307e99a01b4..3b36b4d6ee299 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/session-manager-v3.test.ts
@@ -80,7 +80,7 @@ describe('session-manager', () => {
 
         await sessionManager.add(event)
 
-        expect(sessionManager.buffer?.context).toEqual({
+        expect(sessionManager.buffer).toEqual({
             sizeEstimate: 193,
             count: 1,
             eventsRange: { firstTimestamp: timestamp, lastTimestamp: timestamp + 1000 },
@@ -120,7 +120,7 @@ describe('session-manager', () => {
         await sessionManager.add(eventOne)
         await sessionManager.add(eventTwo)
 
-        sessionManager.buffer!.context.createdAt = now() - flushThreshold - 1
+        sessionManager.buffer!.createdAt = now() - flushThreshold - 1
 
         await sessionManager.flush()
 
@@ -270,7 +270,7 @@ describe('session-manager', () => {
 
         const sm2 = await createSessionManager('session_id_2', 2, 2)
 
-        expect(sm2.buffer?.context).toEqual({
+        expect(sm2.buffer).toEqual({
             count: 1,
             createdAt: expect.any(Number),
             eventsRange: {
@@ -280,10 +280,8 @@ describe('session-manager', () => {
             sizeEstimate: 185,
         })
 
-        expect(sm2.buffer?.context.createdAt).toBeGreaterThanOrEqual(0)
-        expect(sm2.buffer?.context.eventsRange?.firstTimestamp).toBe(sm2.buffer!.context.createdAt)
-        expect(sm2.buffer?.context.eventsRange?.lastTimestamp).toBeGreaterThanOrEqual(
-            sm2.buffer!.context.eventsRange!.firstTimestamp
-        )
+        expect(sm2.buffer?.createdAt).toBeGreaterThanOrEqual(0)
+        expect(sm2.buffer?.eventsRange?.firstTimestamp).toBe(sm2.buffer!.createdAt)
+        expect(sm2.buffer?.eventsRange?.lastTimestamp).toBeGreaterThanOrEqual(sm2.buffer!.eventsRange!.firstTimestamp)
     })
 })
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts
index 6f5738e2ed410..0c391f0ceebab 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts
@@ -203,7 +203,7 @@ describe('ingester', () => {
         })
         await ingester.consume(event)
         expect(ingester.sessions[`1__${sessionId}`]).toBeDefined()
-        ingester.sessions[`1__${sessionId}`].buffer!.context.createdAt = 0
+        ingester.sessions[`1__${sessionId}`].buffer!.createdAt = 0
 
         await ingester.flushAllReadySessions()
 
@@ -222,8 +222,8 @@ describe('ingester', () => {
             createMessage('session_id_2', 1),
         ])
 
-        expect(ingester.sessions[`${team.id}__session_id_1`].buffer?.context.count).toBe(1)
-        expect(ingester.sessions[`${team.id}__session_id_2`].buffer?.context.count).toBe(1)
+        expect(ingester.sessions[`${team.id}__session_id_1`].buffer?.count).toBe(1)
+        expect(ingester.sessions[`${team.id}__session_id_2`].buffer?.count).toBe(1)
 
         let fileContents = await fs.readFile(
             path.join(ingester.sessions[`${team.id}__session_id_1`].context.dir, 'buffer.jsonl'),
@@ -257,7 +257,7 @@ describe('ingester', () => {
     const getSessions = (
         ingester: SessionRecordingIngesterV3
    ): (SessionManagerContext & SessionManagerBufferContext)[] =>
-        Object.values(ingester.sessions).map((x) => ({ ...x.context, ...x.buffer!.context }))
+        Object.values(ingester.sessions).map((x) => ({ ...x.context, ...x.buffer! }))
 
     /**
      * It is really hard to actually do rebalance tests against kafka, so we instead simulate the various methods and ensure the correct logic occurs
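Note on the main buffering change above: each SessionManagerV3 previously kept a long-lived WriteStream per session and handled backpressure by waiting for 'drain' whenever write() returned false; the patch replaces that with one appendFile call per message. A minimal sketch of the before/after pattern, assuming only Node's fs and fs/promises APIs (the function names here are illustrative, not from the patch):

    import { WriteStream } from 'fs'
    import { appendFile } from 'fs/promises'

    // Before: a long-lived stream per session. write() returning false signals
    // backpressure, so the caller must wait for 'drain' before writing more.
    async function writeViaStream(stream: WriteStream, content: string): Promise<void> {
        if (!stream.write(content, 'utf-8')) {
            await new Promise((resolve) => stream.once('drain', resolve))
        }
    }

    // After: open-append-close per message. There is no stream state to track
    // and nothing to end() on flush or stop, at the cost of an open/close
    // syscall pair per append.
    async function writeViaAppend(file: string, content: string): Promise<void> {
        await appendFile(file, content, 'utf-8')
    }

This is also why markCurrentBufferForFlush() and stop() lose their fileStream.end() steps and why the writeStreamBlocked counter is deleted: there is no longer a stream to block on.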
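Note on the buffer shape: the SessionBuffer wrapper (context plus fileStream) collapses into the bare, serializable SessionManagerBufferContext, which syncMetadata() persists after every add. A sketch of that persistence step under the same assumptions (the standalone helper is illustrative; in the patch this is a method on SessionManagerV3):

    import { unlink, writeFile } from 'fs/promises'

    type SessionManagerBufferContext = {
        sizeEstimate: number
        count: number
        eventsRange: { firstTimestamp: number; lastTimestamp: number } | null
        createdAt: number
    }

    // Persist the context as JSON next to buffer.jsonl, or remove the metadata
    // file once the buffer has been dropped after a flush.
    async function syncMetadata(metadataFile: string, buffer?: SessionManagerBufferContext): Promise<void> {
        if (buffer) {
            await writeFile(metadataFile, JSON.stringify(buffer), 'utf-8')
        } else {
            await unlink(metadataFile)
        }
    }

Because the context no longer carries a WriteStream, it can be serialized and restored from disk verbatim, which is what the "started new manager from existing file" path relies on.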
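Note on the timeout handling: the Kafka session timeout doubles from 30s to 60s, and handleEachBatch now threads a timeRemaining() budget (90% of the session timeout) into syncSessionsWithDisk and flushAllReadySessions, so each slow step stops early instead of overrunning the consumer session and triggering a rebalance. A self-contained sketch of the pattern, with illustrative names:

    const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 60000

    // Create a countdown over ~90% of the session timeout; each slow step is
    // handed whatever is left at the moment it starts.
    function startBudget(totalMs: number = KAFKA_CONSUMER_SESSION_TIMEOUT_MS * 0.9): () => number {
        const startTime = Date.now()
        return () => Math.max(totalMs - (Date.now() - startTime), 0)
    }

    // Process items until the budget runs out; leftovers are handled on the
    // next batch rather than risking the consumer being kicked from the group.
    async function forEachWithinBudget<T>(items: T[], timeLimitMs: number, fn: (item: T) => Promise<void>): Promise<void> {
        const start = Date.now()
        for (const item of items) {
            if (Date.now() - start > timeLimitMs) {
                return
            }
            await fn(item)
        }
    }

Switching flushAllReadySessions from Promise.allSettled over every session to a sequential loop is what makes the early exit (and the accurate flushedCount in the warning) possible; the TODO in the patch notes it could be partially re-parallelized later.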
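Note on syncSessionsWithDisk: sessions found on disk are now loaded sequentially and each manager's setupPromise is awaited, so a restored session's metadata is fully read back before the batch proceeds. The layout it scans is a directory per session, named `<teamId>__<sessionId>`, under `<rootDir>/<partition>`. A sketch of that key parsing; the anchored regex below is one possible tightening of the patch's unanchored one (its own TODO calls the regex crude), and the helper name is illustrative:

    import { readdir } from 'fs/promises'
    import path from 'path'

    type SessionKey = { teamId: number; sessionId: string }

    // List session directories for one partition, tolerating a partition
    // directory that does not exist yet (no files written so far).
    async function listSessionDirs(rootDir: string, partition: number): Promise<SessionKey[]> {
        const entries = await readdir(path.join(rootDir, `${partition}`)).catch(() => [] as string[])
        return entries
            .filter((name) => /^\d+__[a-zA-Z0-9-]+$/.test(name)) // e.g. "2__some-session-uuid"
            .map((name) => {
                const [teamId, sessionId] = name.split('__')
                return { teamId: parseInt(teamId), sessionId }
            })
    }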