Skip to content

Commit

Permalink
fix: Added handling for v3 ingester metadata.json file being corrupted
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Feb 27, 2024
1 parent c09812e commit fbe120f
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const metricPrefix = 'v3_'
/** Extension shared by the session buffer files (JSON Lines). */
export const FILE_EXTENSION = '.jsonl'
/** File name of the on-disk buffer: `buffer.jsonl`. */
export const BUFFER_FILE_NAME = 'buffer' + FILE_EXTENSION
/** Extension `.flush.jsonl`, built on top of FILE_EXTENSION. */
export const FLUSH_FILE_EXTENSION = '.flush' + FILE_EXTENSION
/** File name of the JSON file holding the buffer's metadata. */
export const METADATA_FILE_NAME = 'metadata.json'

const counterS3FilesWritten = new Counter({
name: metricPrefix + 'recording_s3_files_written',
Expand All @@ -37,6 +38,11 @@ const counterS3WriteErrored = new Counter({
help: 'Indicates that we failed to flush to S3 without recovering',
})

// Incremented by SessionManagerV3.setup when an existing buffer could not be restored
// (neither from its metadata file nor from the stat-based fallback) and was deleted.
const bufferLoadFailedCounter = new Counter({
    help: 'Indicates that we failed to load the file from disk',
    name: `${metricPrefix}recording_load_from_file_failed`,
})

const histogramS3LinesWritten = new Histogram({
name: metricPrefix + 'recording_s3_lines_written_histogram',
help: 'The number of lines in a file we send to s3',
Expand Down Expand Up @@ -133,39 +139,87 @@ export class SessionManagerV3 {
/**
 * Creates the session directory and, when a buffer file already exists on disk,
 * restores `this.buffer` from it.
 *
 * Flow, as shown by the code below:
 * 1. `mkdir` the context directory (recursive).
 * 2. If `stat` on the buffer file fails, log "started new manager" and return without
 *    assigning `this.buffer`.
 * 3. Otherwise read the metadata file and `JSON.parse` it into `context`.
 * 4. If that throws, report the error and derive a best-effort context from the buffer
 *    file's `stat` result.
 * 5. If `context` is still falsy, unlink both files, increment `bufferLoadFailedCounter`,
 *    report an exception and return.
 * 6. Otherwise assign `this.buffer` (context + file stream for the buffer file) and log.
 *
 * NOTE(review): this listing appears to be a rendered diff without +/- markers, so lines
 * of the previous and the new implementation are interleaved. The spans flagged below look
 * like the removed side; confirm against the commit before treating this as runnable code.
 */
private async setup(): Promise<void> {
await mkdir(this.context.dir, { recursive: true })

// Existence probe: any `stat` failure is treated as "no buffer file".
const bufferFileExists = await stat(this.file(BUFFER_FILE_NAME))
.then(() => true)
.catch(() => false)

// Raw metadata text is kept so it can be attached to the exception report further down.
let metadataFileContent: string | undefined
let context: SessionManagerBufferContext | undefined

if (!bufferFileExists) {
// Nothing to restore: this.buffer is left untouched.
status.info('📦', '[session-manager] started new manager', {
...this.context,
...(this.buffer?.context ?? {}),
})
return
}

try {
// NOTE(review): from here to the closing brace of `if (fileExists)` looks like the previous
// implementation (removed side of the diff); the two statements after it replace it — confirm.
const fileExists = await stat(this.file('metadata.json')).then(
() => true,
() => false
)
if (fileExists) {
const bufferMetadata: SessionManagerBufferContext = JSON.parse(
await readFile(this.file('metadata.json'), 'utf-8')
)
this.buffer = {
context: bufferMetadata,
fileStream: this.createFileStreamFor(path.join(this.context.dir, BUFFER_FILE_NAME)),
}
}
// A missing file makes readFile reject and invalid JSON makes JSON.parse throw; both land in the catch.
// NOTE(review): the parsed value is assigned without any shape validation, and a falsy result
// (e.g. a file containing `null`) skips the stat-based fallback and goes straight to deletion.
metadataFileContent = await readFile(this.file(METADATA_FILE_NAME), 'utf-8')
context = JSON.parse(metadataFileContent)
} catch (error) {
// Indicates no buffer metadata file or it's corrupted
// NOTE(review): the next line looks like the previous log call, superseded by the one after it — confirm.
status.error('🧨', '[session-manager] failed to read buffer metadata', {
status.error('🧨', '[session-manager] failed to read buffer metadata.json', {
...this.context,
error,
})

this.captureMessage('Failed to read buffer metadata.json', { error })

// NOTE: This is not ideal... we fall back to stat()-ing the buffer.jsonl and deriving metadata from that as best as possible
// If that still fails then we have to bail out and drop the buffer.jsonl (data loss...)

try {
const stats = await stat(this.file(BUFFER_FILE_NAME))

// Approximate context built only from file stats; the file content is never read here.
context = {
sizeEstimate: stats.size,
count: 1, // We can't afford to load the whole file into memory so we assume 1 line
eventsRange: {
firstTimestamp: Math.round(stats.birthtimeMs),
// This is really less than ideal but we don't have much choice
lastTimestamp: Date.now(),
},
createdAt: Math.round(stats.birthtimeMs),
}
} catch (error) {
// This `error` shadows the outer catch binding; context stays undefined so the deletion path below runs.
status.error('🧨', '[session-manager] failed to determine metadata from buffer file', {
...this.context,
error,
})
}
}

if (!context) {
// Neither the metadata file nor the stat-based fallback produced a usable context:
// remove both files (unlink failures are ignored), count the failure and report it.
await unlink(this.file(METADATA_FILE_NAME)).catch(() => null)
await unlink(this.file(BUFFER_FILE_NAME)).catch(() => null)

bufferLoadFailedCounter.inc()

// metadataFileContent is undefined here when the readFile itself failed.
this.captureException(new Error('Failed to read buffer metadata. Resorted to hard deletion'), {
metadataFileContent,
})

return
}

// Resume from the existing buffer file using the restored (or approximated) context.
this.buffer = {
context,
fileStream: this.createFileStreamFor(path.join(this.context.dir, BUFFER_FILE_NAME)),
}

// NOTE(review): the next line looks like the previous log call, superseded by the one after it — confirm.
status.info('📦', '[session-manager] started new manager', {
status.info('📦', '[session-manager] started new manager from existing file', {
...this.context,
...(this.buffer?.context ?? {}),
})
}

/**
 * Brings the metadata file in line with the in-memory buffer: writes `this.buffer.context`
 * as JSON when a buffer exists, otherwise unlinks the metadata file.
 *
 * NOTE(review): this listing appears to be a rendered diff without +/- markers; in each branch
 * the call using the literal 'metadata.json' looks like the previous line and the call using
 * METADATA_FILE_NAME its replacement — confirm against the commit.
 */
private async syncMetadata(): Promise<void> {
if (this.buffer) {
// Persist the current context so it can be read back from disk later.
await writeFile(this.file('metadata.json'), JSON.stringify(this.buffer?.context), 'utf-8')
await writeFile(this.file(METADATA_FILE_NAME), JSON.stringify(this.buffer?.context), 'utf-8')
} else {
// No buffer: remove the metadata file. The unlink is not guarded here, so a rejection propagates to the caller.
await unlink(this.file('metadata.json'))
await unlink(this.file(METADATA_FILE_NAME))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,39 @@ describe('session-manager', () => {
'{"window_id":"window_id_1","data":[{"timestamp":170000000,"type":4,"data":{"href":"http://localhost:3001/"}}]}\n'
)
})

// Regression test: a session manager started over an existing buffer whose metadata.json
// is not valid JSON must still come up with a buffer context instead of failing.
it('handles a corrupted metadata.json file', async () => {
    const writer = await createSessionManager('session_id_2', 2, 2)

    // Buffer two events one second apart, then stop the first manager before touching its files.
    await writer.add(
        createIncomingRecordingMessage({
            events: [
                { timestamp: 170000000, type: 4, data: { href: 'http://localhost:3001/' } },
                { timestamp: 170000000 + 1000, type: 4, data: { href: 'http://localhost:3001/' } },
            ],
        })
    )
    await writer.stop()

    // Replace the metadata with content that JSON.parse rejects.
    await fs.writeFile(`${writer.context.dir}/metadata.json`, 'CORRUPTEDDD', 'utf-8')

    // A second manager for the same session has to recover from the broken metadata.
    const recovered = await createSessionManager('session_id_2', 2, 2)

    // Expected shape: a single assumed line, numeric timestamps and a fixed size estimate.
    expect(recovered.buffer?.context).toEqual({
        sizeEstimate: 185,
        count: 1,
        eventsRange: {
            firstTimestamp: expect.any(Number),
            lastTimestamp: expect.any(Number),
        },
        createdAt: expect.any(Number),
    })

    // The previous expectation already failed if the buffer were missing, so `!` is safe here.
    const { createdAt, eventsRange } = recovered.buffer!.context

    expect(createdAt).toBeGreaterThanOrEqual(0)
    expect(eventsRange?.firstTimestamp).toBe(createdAt)
    expect(eventsRange?.lastTimestamp).toBeGreaterThanOrEqual(eventsRange!.firstTimestamp)
})
})

0 comments on commit fbe120f

Please sign in to comment.