fix: no swallowing flush errors (#27619)
pauldambra authored Jan 17, 2025
1 parent e65f779 commit 6eeae8e
Showing 3 changed files with 46 additions and 8 deletions.
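Why this commit exists: `flushAllReadySessions` previously awaited its concurrency-limited allSettled-style helper and discarded the results, so a failed session flush (for example an S3 upload error) was swallowed and the caller carried on as if every flush had succeeded. The fix, in the ingester diff below, keeps the results, collects the per-session errors, and rethrows an aggregate. A minimal sketch of the pattern, with the standard Promise.allSettled standing in for PostHog's allSettledWithConcurrency:

// Sketch only: Promise.allSettled stands in for the concurrency-limited helper.
async function flushAll(flushes: Array<() => Promise<void>>): Promise<void> {
    const results = await Promise.allSettled(flushes.map((flush) => flush()))
    // Before the fix, the settled results were discarded here, silently swallowing rejections.
    const errors = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected')
    if (errors.length) {
        throw new Error(`Failed to flush sessions. With ${errors.length} errors out of ${results.length} sessions.`)
    }
}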
@@ -306,7 +306,7 @@ export class SessionManager {
private async _flush(
reason: 'buffer_size' | 'buffer_age' | 'buffer_age_realtime' | 'partition_shutdown'
): Promise<void> {
-// NOTE: The below checks don't need to throw really but we do so to help debug what might be blocking things
+// NOTE: The below checks don't need to throw really, but we do so to help debug what might be blocking things
if (this.flushBuffer) {
this.debugLog('🚽', '[session-manager] flush called but we already have a flush buffer', {
...this.logContext(),
@@ -337,7 +337,7 @@ export class SessionManager {
this.flushBuffer = this.buffer
this.buffer = this.createBuffer()
this.stopRealtime()
-// We don't want to keep writing unecessarily...
+// We don't want to keep writing unnecessarily...
const { fileStream, file, count, eventsRange, sizeEstimate } = this.flushBuffer

if (count === 0) {
@@ -725,7 +725,7 @@ export class SessionRecordingIngester {
const sessions = Object.entries(this.sessions)

// NOTE: We want to avoid flushing too many sessions at once as it can cause a lot of disk backpressure stalling the consumer
-await allSettledWithConcurrency(
+const results = await allSettledWithConcurrency(
this.config.SESSION_RECORDING_MAX_PARALLEL_FLUSHES,
sessions,
async ([key, sessionManager], ctx) => {
@@ -763,8 +763,9 @@
}
)
captureException(err, {
-tags: { session_id: sessionManager.sessionId },
+tags: { session_id: sessionManager.sessionId, error_context: 'failed-on-flush' },
})
+throw err
})
.then(async () => {
// If the SessionManager is done (flushed and with no more queued events) then we remove it to free up memory
@@ -774,6 +775,13 @@
})
}
)
+const errors = results.filter((r) => !!r.error).map((r) => r.error)
+if (errors.length) {
+    status.error('🌶️', 'blob_ingester_consumer - failed to flush sessions', { errors })
+    throw new Error(
+        'Failed to flush sessions. With ' + errors.length + ' errors out of ' + results.length + ' sessions.'
+    )
+}

gaugeSessionsHandled.set(Object.keys(this.sessions).length)
gaugeRealtimeSessions.set(
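`allSettledWithConcurrency` is a helper from elsewhere in the PostHog codebase, so its signature is not shown in this diff. From its usage it evidently runs tasks under a concurrency cap (the comment above cites disk backpressure) and, like Promise.allSettled, never rejects itself; instead each result carries an optional error field, which the new code inspects with `results.filter((r) => !!r.error)`. A hypothetical re-implementation consistent with that shape, passing the item index where the real helper passes a `ctx` object:

// Hypothetical sketch; not PostHog's actual implementation.
type SettledResult<R> = { result?: R; error?: unknown }

async function allSettledWithConcurrency<T, R>(
    concurrency: number,
    items: T[],
    task: (item: T, index: number) => Promise<R>
): Promise<SettledResult<R>[]> {
    const results = new Array<SettledResult<R>>(items.length)
    let next = 0
    // Spawn up to `concurrency` workers that pull items off a shared cursor.
    const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
        while (next < items.length) {
            const i = next++
            try {
                results[i] = { result: await task(items[i], i) }
            } catch (error) {
                // Record the failure rather than rejecting, mirroring Promise.allSettled.
                results[i] = { error }
            }
        }
    })
    await Promise.all(workers)
    return results
}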
@@ -41,6 +41,22 @@ const mockConsumer = {
getMetadata: jest.fn(),
}

+// Mock the Upload class
+jest.mock('@aws-sdk/lib-storage', () => {
+    return {
+        Upload: jest.fn().mockImplementation(({ params }) => {
+            const { Key } = params
+            if (Key.includes('throw')) {
+                throw new Error('Mocked error for key: ' + Key)
+            }
+            return {
+                done: jest.fn().mockResolvedValue(undefined),
+                abort: jest.fn().mockResolvedValue(undefined),
+            }
+        }),
+    }
+})
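The mock keys failure off the S3 object key: any `Upload` whose `Key` contains the string 'throw' fails at construction, while every other upload resolves normally. A test can therefore force an upload failure for a single session simply by choosing a session id containing 'throw', which is exactly what the new test further down does with `sid1-throw`.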

jest.mock('../../../../src/kafka/batch-consumer', () => {
return {
startBatchConsumer: jest.fn(() =>
@@ -98,7 +114,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
await redisConn.del(CAPTURE_OVERFLOW_REDIS_KEY)
await deleteKeys(hub)

-ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, consumeOverflow, redisConn)
+ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage!, consumeOverflow, redisConn)
await ingester.start()

mockConsumer.assignments.mockImplementation(() => [createTP(0, consumedTopic), createTP(1, consumedTopic)])
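A note on the repeated `hub.objectStorage!` changes in this file: the non-null assertion tells TypeScript that the storage client is defined at the call site, presumably because `hub.objectStorage` is typed as optional while the ingester's constructor expects a concrete client.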
@@ -139,6 +155,20 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
)
}

+it('when there is an S3 error', async () => {
+    await ingester.consume(createIncomingRecordingMessage({ team_id: 2, session_id: 'sid1-throw' }))
+    await ingester.consume(createIncomingRecordingMessage({ team_id: 2, session_id: 'sid2' }))
+    ingester.partitionMetrics[1] = { lastMessageTimestamp: 1000000, offsetLag: 0 }
+
+    expect(Object.keys(ingester.sessions).length).toBe(2)
+    expect(ingester.sessions['2-sid1-throw']).toBeDefined()
+    expect(ingester.sessions['2-sid2']).toBeDefined()
+
+    await expect(() => ingester.flushAllReadySessions(noop)).rejects.toThrow(
+        'Failed to flush sessions. With 1 errors out of 2 sessions.'
+    )
+})
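The assertion pins the exact aggregate message: of the two consumed sessions only `sid1-throw` trips the mocked upload failure, so `flushAllReadySessions` now rejects with 1 error out of 2 sessions where it would previously have resolved despite the lost flush.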

// disconnecting a producer is not safe to call multiple times
// in order to let us test stopping the ingester elsewhere
// in most tests we automatically stop the ingester during teardown
@@ -163,7 +193,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
const ingester = new SessionRecordingIngester(
config,
hub.postgres,
-hub.objectStorage,
+hub.objectStorage!,
consumeOverflow,
undefined
)
@@ -178,7 +208,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
const ingester = new SessionRecordingIngester(
config,
hub.postgres,
-hub.objectStorage,
+hub.objectStorage!,
consumeOverflow,
undefined
)
@@ -468,7 +498,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
otherIngester = new SessionRecordingIngester(
config,
hub.postgres,
-hub.objectStorage,
+hub.objectStorage!,
consumeOverflow,
undefined
)