Skip to content

Commit

Permalink
fix: does run instrumented function leak promises (#26069)
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra authored Nov 11, 2024
1 parent babbe33 commit 36d37c6
Showing 1 changed file with 62 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import { createRedisPool } from '../../../utils/db/redis'
import { status } from '../../../utils/status'
import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
import { ObjectStorage } from '../../services/object_storage'
import { runInstrumentedFunction } from '../../utils'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { eventDroppedCounter } from '../metrics'
import { ConsoleLogsIngester } from './services/console-logs-ingester'
Expand Down Expand Up @@ -360,91 +359,54 @@ export class SessionRecordingIngester {
})
}

await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch`,
sendTimeoutGuardToSentry: false,
func: async () => {
histogramKafkaBatchSize.observe(messages.length)
histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024)

let recordingMessages: IncomingRecordingMessage[]

await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.parseKafkaMessages`,
func: async () => {
const { sessions, partitionStats } = await parseKafkaBatch(
messages,
(token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
})),
this.sharedClusterProducerWrapper
)
recordingMessages = sessions
for (const partitionStat of partitionStats) {
const metrics = this.partitionMetrics[partitionStat.partition] ?? {}
metrics.lastMessageOffset = partitionStat.offset
if (partitionStat.timestamp) {
// Could be empty on Kafka versions before KIP-32
metrics.lastMessageTimestamp = partitionStat.timestamp
}
this.partitionMetrics[partitionStat.partition] = metrics
}
},
})
heartbeat()
histogramKafkaBatchSize.observe(messages.length)
histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024)

const { sessions: recordingMessages, partitionStats } = await parseKafkaBatch(
messages,
(token) =>
this.teamsRefresher.get().then((teams) => ({
teamId: teams[token]?.teamId || null,
consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true,
})),
this.sharedClusterProducerWrapper
)

await this.reportPartitionMetrics()
for (const partitionStat of partitionStats) {
const metrics = this.partitionMetrics[partitionStat.partition] ?? {}
metrics.lastMessageOffset = partitionStat.offset
if (partitionStat.timestamp) {
// Could be empty on Kafka versions before KIP-32
metrics.lastMessageTimestamp = partitionStat.timestamp
}
this.partitionMetrics[partitionStat.partition] = metrics
}

await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeBatch`,
func: async () => {
if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) {
await Promise.all(recordingMessages.map((x) => this.consume(x)))
} else {
for (const message of recordingMessages) {
await this.consume(message)
}
}
},
})
heartbeat()

await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`,
func: async () => {
await this.flushAllReadySessions(heartbeat)
},
})
await this.reportPartitionMetrics()

await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.commitAllOffsets`,
func: async () => {
await this.commitAllOffsets(this.partitionMetrics, Object.values(this.sessions))
},
})
if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) {
await Promise.all(recordingMessages.map((x) => this.consume(x)))
} else {
for (const message of recordingMessages) {
await this.consume(message)
}
}

if (this.replayEventsIngester) {
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`,
func: async () => {
await this.replayEventsIngester!.consumeBatch(recordingMessages)
},
})
heartbeat()
}
await this.flushAllReadySessions(heartbeat)

if (this.consoleLogsIngester) {
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeConsoleLogEvents`,
func: async () => {
await this.consoleLogsIngester!.consumeBatch(recordingMessages)
},
})
heartbeat()
}
},
})
await this.commitAllOffsets(this.partitionMetrics, Object.values(this.sessions))

if (this.replayEventsIngester) {
await this.replayEventsIngester!.consumeBatch(recordingMessages)
heartbeat()
}

if (this.consoleLogsIngester) {
await this.consoleLogsIngester!.consumeBatch(recordingMessages)
heartbeat()
}
}

public async start(): Promise<void> {
Expand Down Expand Up @@ -687,38 +649,29 @@ export class SessionRecordingIngester {
gaugeSessionsHandled.remove()

const startTime = Date.now()
await runInstrumentedFunction({
statsKey: `recordingingester.onRevokePartitions.revokeSessions`,
timeout: SHUTDOWN_FLUSH_TIMEOUT_MS, // same as the partition lock
func: async () => {
if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
// Extend our claim on these partitions to give us time to flush
status.info(
'🔁',
`blob_ingester_consumer - flushing ${sessionsToDrop.length} sessions on revoke...`
)

const sortedSessions = sessionsToDrop.sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity)

// Flush all the sessions we are supposed to drop - until a timeout
await allSettledWithConcurrency(
this.config.SESSION_RECORDING_MAX_PARALLEL_FLUSHES,
sortedSessions,
async (sessionManager, ctx) => {
if (startTime + SHUTDOWN_FLUSH_TIMEOUT_MS < Date.now()) {
return ctx.break()
}

await sessionManager.flush('partition_shutdown')
}
)
if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
// Extend our claim on these partitions to give us time to flush
status.info('🔁', `blob_ingester_consumer - flushing ${sessionsToDrop.length} sessions on revoke...`)

const sortedSessions = sessionsToDrop.sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity)

// Flush all the sessions we are supposed to drop - until a timeout
await allSettledWithConcurrency(
this.config.SESSION_RECORDING_MAX_PARALLEL_FLUSHES,
sortedSessions,
async (sessionManager, ctx) => {
if (startTime + SHUTDOWN_FLUSH_TIMEOUT_MS < Date.now()) {
return ctx.break()
}

await this.commitAllOffsets(partitionsToDrop, sessionsToDrop)
await sessionManager.flush('partition_shutdown')
}
)

await Promise.allSettled(sessionsToDrop.map((x) => x.destroy()))
},
})
await this.commitAllOffsets(partitionsToDrop, sessionsToDrop)
}

await Promise.allSettled(sessionsToDrop.map((x) => x.destroy()))
}

async flushAllReadySessions(heartbeat: () => void): Promise<void> {
Expand Down

0 comments on commit 36d37c6

Please sign in to comment.