diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 3e8a41d7cbf4c..7da306e9cad20 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -21,7 +21,6 @@ import { createRedisPool } from '../../../utils/db/redis' import { status } from '../../../utils/status' import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager' import { ObjectStorage } from '../../services/object_storage' -import { runInstrumentedFunction } from '../../utils' import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics' import { eventDroppedCounter } from '../metrics' import { ConsoleLogsIngester } from './services/console-logs-ingester' @@ -360,91 +359,54 @@ export class SessionRecordingIngester { }) } - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch`, - sendTimeoutGuardToSentry: false, - func: async () => { - histogramKafkaBatchSize.observe(messages.length) - histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024) - - let recordingMessages: IncomingRecordingMessage[] - - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.parseKafkaMessages`, - func: async () => { - const { sessions, partitionStats } = await parseKafkaBatch( - messages, - (token) => - this.teamsRefresher.get().then((teams) => ({ - teamId: teams[token]?.teamId || null, - consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true, - })), - this.sharedClusterProducerWrapper - ) - recordingMessages = sessions - for (const partitionStat of partitionStats) { - const metrics = this.partitionMetrics[partitionStat.partition] ?? {} - metrics.lastMessageOffset = partitionStat.offset - if (partitionStat.timestamp) { - // Could be empty on Kafka versions before KIP-32 - metrics.lastMessageTimestamp = partitionStat.timestamp - } - this.partitionMetrics[partitionStat.partition] = metrics - } - }, - }) - heartbeat() + histogramKafkaBatchSize.observe(messages.length) + histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024) + + const { sessions: recordingMessages, partitionStats } = await parseKafkaBatch( + messages, + (token) => + this.teamsRefresher.get().then((teams) => ({ + teamId: teams[token]?.teamId || null, + consoleLogIngestionEnabled: teams[token]?.consoleLogIngestionEnabled ?? true, + })), + this.sharedClusterProducerWrapper + ) - await this.reportPartitionMetrics() + for (const partitionStat of partitionStats) { + const metrics = this.partitionMetrics[partitionStat.partition] ?? {} + metrics.lastMessageOffset = partitionStat.offset + if (partitionStat.timestamp) { + // Could be empty on Kafka versions before KIP-32 + metrics.lastMessageTimestamp = partitionStat.timestamp + } + this.partitionMetrics[partitionStat.partition] = metrics + } - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.consumeBatch`, - func: async () => { - if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) { - await Promise.all(recordingMessages.map((x) => this.consume(x))) - } else { - for (const message of recordingMessages) { - await this.consume(message) - } - } - }, - }) + heartbeat() - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`, - func: async () => { - await this.flushAllReadySessions(heartbeat) - }, - }) + await this.reportPartitionMetrics() - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.commitAllOffsets`, - func: async () => { - await this.commitAllOffsets(this.partitionMetrics, Object.values(this.sessions)) - }, - }) + if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) { + await Promise.all(recordingMessages.map((x) => this.consume(x))) + } else { + for (const message of recordingMessages) { + await this.consume(message) + } + } - if (this.replayEventsIngester) { - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`, - func: async () => { - await this.replayEventsIngester!.consumeBatch(recordingMessages) - }, - }) - heartbeat() - } + await this.flushAllReadySessions(heartbeat) - if (this.consoleLogsIngester) { - await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.consumeConsoleLogEvents`, - func: async () => { - await this.consoleLogsIngester!.consumeBatch(recordingMessages) - }, - }) - heartbeat() - } - }, - }) + await this.commitAllOffsets(this.partitionMetrics, Object.values(this.sessions)) + + if (this.replayEventsIngester) { + await this.replayEventsIngester!.consumeBatch(recordingMessages) + heartbeat() + } + + if (this.consoleLogsIngester) { + await this.consoleLogsIngester!.consumeBatch(recordingMessages) + heartbeat() + } } public async start(): Promise { @@ -687,38 +649,29 @@ export class SessionRecordingIngester { gaugeSessionsHandled.remove() const startTime = Date.now() - await runInstrumentedFunction({ - statsKey: `recordingingester.onRevokePartitions.revokeSessions`, - timeout: SHUTDOWN_FLUSH_TIMEOUT_MS, // same as the partition lock - func: async () => { - if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) { - // Extend our claim on these partitions to give us time to flush - status.info( - '🔁', - `blob_ingester_consumer - flushing ${sessionsToDrop.length} sessions on revoke...` - ) - - const sortedSessions = sessionsToDrop.sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity) - - // Flush all the sessions we are supposed to drop - until a timeout - await allSettledWithConcurrency( - this.config.SESSION_RECORDING_MAX_PARALLEL_FLUSHES, - sortedSessions, - async (sessionManager, ctx) => { - if (startTime + SHUTDOWN_FLUSH_TIMEOUT_MS < Date.now()) { - return ctx.break() - } - - await sessionManager.flush('partition_shutdown') - } - ) + if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) { + // Extend our claim on these partitions to give us time to flush + status.info('🔁', `blob_ingester_consumer - flushing ${sessionsToDrop.length} sessions on revoke...`) + + const sortedSessions = sessionsToDrop.sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity) + + // Flush all the sessions we are supposed to drop - until a timeout + await allSettledWithConcurrency( + this.config.SESSION_RECORDING_MAX_PARALLEL_FLUSHES, + sortedSessions, + async (sessionManager, ctx) => { + if (startTime + SHUTDOWN_FLUSH_TIMEOUT_MS < Date.now()) { + return ctx.break() + } - await this.commitAllOffsets(partitionsToDrop, sessionsToDrop) + await sessionManager.flush('partition_shutdown') } + ) - await Promise.allSettled(sessionsToDrop.map((x) => x.destroy())) - }, - }) + await this.commitAllOffsets(partitionsToDrop, sessionsToDrop) + } + + await Promise.allSettled(sessionsToDrop.map((x) => x.destroy())) } async flushAllReadySessions(heartbeat: () => void): Promise {