Skip to content

Commit

Permalink
feat: V3 ingester - perf improvements (#20587)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Feb 27, 2024
1 parent 29f423c commit 12e01a0
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Upload } from '@aws-sdk/lib-storage'
import { captureException, captureMessage } from '@sentry/node'
import { createReadStream, createWriteStream, WriteStream } from 'fs'
import { mkdir, readdir, readFile, rename, rmdir, stat, unlink, writeFile } from 'fs/promises'
import { appendFile, mkdir, readdir, readFile, rename, rmdir, stat, unlink, writeFile } from 'fs/promises'
import path from 'path'
import { Counter, Histogram } from 'prom-client'
import { PassThrough } from 'stream'
Expand Down Expand Up @@ -79,11 +79,6 @@ const histogramSessionSize = new Histogram({
buckets: BUCKETS_LINES_WRITTEN,
})

const writeStreamBlocked = new Counter({
name: metricPrefix + 'recording_blob_ingestion_write_stream_blocked',
help: 'Number of times we get blocked by the stream backpressure',
})

const histogramBackpressureBlockedSeconds = new Histogram({
name: metricPrefix + 'recording_blob_ingestion_backpressure_blocked_seconds',
help: 'The time taken to flush a session in seconds',
Expand All @@ -100,11 +95,6 @@ export type SessionManagerBufferContext = {
createdAt: number
}

export type SessionBuffer = {
context: SessionManagerBufferContext
fileStream: WriteStream
}

// Context that is updated and persisted to disk so must be serializable
export type SessionManagerContext = {
dir: string
Expand All @@ -114,7 +104,7 @@ export type SessionManagerContext = {
}

export class SessionManagerV3 {
buffer?: SessionBuffer
buffer?: SessionManagerBufferContext
flushPromise?: Promise<void>
destroying = false
inProgressUpload: Upload | null = null
Expand Down Expand Up @@ -149,7 +139,7 @@ export class SessionManagerV3 {
if (!bufferFileExists) {
status.info('📦', '[session-manager] started new manager', {
...this.context,
...(this.buffer?.context ?? {}),
...(this.buffer ?? {}),
})
return
}
Expand Down Expand Up @@ -204,20 +194,17 @@ export class SessionManagerV3 {
return
}

this.buffer = {
context,
fileStream: this.createFileStreamFor(path.join(this.context.dir, BUFFER_FILE_NAME)),
}
this.buffer = context

status.info('📦', '[session-manager] started new manager from existing file', {
...this.context,
...(this.buffer?.context ?? {}),
...(this.buffer ?? {}),
})
}

private async syncMetadata(): Promise<void> {
if (this.buffer) {
await writeFile(this.file(METADATA_FILE_NAME), JSON.stringify(this.buffer?.context), 'utf-8')
await writeFile(this.file(METADATA_FILE_NAME), JSON.stringify(this.buffer), 'utf-8')
} else {
await unlink(this.file(METADATA_FILE_NAME))
}
Expand Down Expand Up @@ -262,23 +249,18 @@ export class SessionManagerV3 {
return
}

buffer.context.eventsRange = {
firstTimestamp: minDefined(start, buffer.context.eventsRange?.firstTimestamp) ?? start,
lastTimestamp: maxDefined(end, buffer.context.eventsRange?.lastTimestamp) ?? end,
buffer.eventsRange = {
firstTimestamp: minDefined(start, buffer.eventsRange?.firstTimestamp) ?? start,
lastTimestamp: maxDefined(end, buffer.eventsRange?.lastTimestamp) ?? end,
}

const content = JSON.stringify(messageData) + '\n'
buffer.context.count += 1
buffer.context.sizeEstimate += content.length

if (!buffer.fileStream.write(content, 'utf-8')) {
writeStreamBlocked.inc()

const stopTimer = histogramBackpressureBlockedSeconds.startTimer()
await new Promise((r) => buffer.fileStream.once('drain', r))
stopTimer()
}
buffer.count += 1
buffer.sizeEstimate += content.length

const stopTimer = histogramBackpressureBlockedSeconds.startTimer()
await appendFile(this.file(BUFFER_FILE_NAME), content, 'utf-8')
stopTimer()
await this.syncMetadata()
} catch (error) {
this.captureException(error, { message })
Expand All @@ -287,7 +269,7 @@ export class SessionManagerV3 {
}

public async isEmpty(): Promise<boolean> {
return !this.buffer?.context.count && !(await this.getFlushFiles()).length
return !this.buffer?.count && !(await this.getFlushFiles()).length
}

public async flush(force = false): Promise<void> {
Expand All @@ -312,7 +294,7 @@ export class SessionManagerV3 {
return
}

if (this.buffer.context.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024) {
if (this.buffer.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024) {
return this.markCurrentBufferForFlush('buffer_size')
}

Expand All @@ -325,12 +307,12 @@ export class SessionManagerV3 {
flushThresholdJitteredMs,
}

if (!this.buffer.context.count) {
if (!this.buffer.count) {
status.warn('🚽', `[session-manager] buffer has no items yet`, { logContext })
return
}

const bufferAgeInMemoryMs = now() - this.buffer.context.createdAt
const bufferAgeInMemoryMs = now() - this.buffer.createdAt

// check the in-memory age against a larger value than the flush threshold,
// otherwise we'll flap between reasons for flushing when close to real-time processing
Expand All @@ -340,8 +322,8 @@ export class SessionManagerV3 {
logContext['isSessionAgeOverThreshold'] = isSessionAgeOverThreshold

histogramSessionAgeSeconds.observe(bufferAgeInMemoryMs / 1000)
histogramSessionSize.observe(this.buffer.context.count)
histogramSessionSizeKb.observe(this.buffer.context.sizeEstimate / 1024)
histogramSessionSize.observe(this.buffer.count)
histogramSessionSizeKb.observe(this.buffer.sizeEstimate / 1024)

if (isSessionAgeOverThreshold) {
return this.markCurrentBufferForFlush('buffer_age')
Expand All @@ -355,23 +337,21 @@ export class SessionManagerV3 {
return
}

if (!buffer.context.eventsRange || !buffer.context.count) {
if (!buffer.eventsRange || !buffer.count) {
// Indicates some issue with the buffer so we can close out
this.buffer = undefined
return
}

// ADD FLUSH METRICS HERE

const { firstTimestamp, lastTimestamp } = buffer.context.eventsRange
const { firstTimestamp, lastTimestamp } = buffer.eventsRange
const fileName = `${firstTimestamp}-${lastTimestamp}${FLUSH_FILE_EXTENSION}`

counterS3FilesWritten.labels(reason).inc(1)
histogramS3LinesWritten.observe(buffer.context.count)
histogramS3KbWritten.observe(buffer.context.sizeEstimate / 1024)
histogramS3LinesWritten.observe(buffer.count)
histogramS3KbWritten.observe(buffer.sizeEstimate / 1024)

// NOTE: We simplify everything by keeping the files as the same name for S3
await new Promise((resolve) => buffer.fileStream.end(resolve))
await rename(this.file(BUFFER_FILE_NAME), this.file(fileName))
this.buffer = undefined

Expand Down Expand Up @@ -480,19 +460,15 @@ export class SessionManagerV3 {
}
}

private getOrCreateBuffer(): SessionBuffer {
private getOrCreateBuffer(): SessionManagerBufferContext {
if (!this.buffer) {
try {
const context: SessionManagerBufferContext = {
const buffer: SessionManagerBufferContext = {
sizeEstimate: 0,
count: 0,
eventsRange: null,
createdAt: now(),
}
const buffer: SessionBuffer = {
context,
fileStream: this.createFileStreamFor(this.file(BUFFER_FILE_NAME)),
}

this.buffer = buffer
} catch (error) {
Expand All @@ -501,7 +477,7 @@ export class SessionManagerV3 {
}
}

return this.buffer as SessionBuffer
return this.buffer
}

protected createFileStreamFor(file: string): WriteStream {
Expand All @@ -525,11 +501,6 @@ export class SessionManagerV3 {
this.inProgressUpload = null
}

const buffer = this.buffer
if (buffer) {
await new Promise((resolve) => buffer.fileStream.end(resolve))
}

if (await this.isEmpty()) {
status.info('🧨', '[session-manager] removing empty session directory', {
...this.context,
Expand Down
Loading

0 comments on commit 12e01a0

Please sign in to comment.