diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts index 939df4cf80f0f..dbdd432aa28ea 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts @@ -107,6 +107,8 @@ export class SessionRecordingIngesterV2 { recordingConsumerConfig: PluginsServerConfig topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS + private promises: Promise[] = [] + constructor( private serverConfig: PluginsServerConfig, private postgres: PostgresRouter, @@ -140,21 +142,21 @@ export class SessionRecordingIngesterV2 { this.offsetsRefresher = new BackgroundRefresher(async () => { const results = await Promise.all( - Object.keys(this.partitionAssignments).map(async (partition) => { + this.assignedTopicPartitions.map(async ({ partition }) => { return new Promise<[number, number]>((resolve, reject) => { if (!this.batchConsumer) { return reject('Not connected') } this.batchConsumer.consumer.queryWatermarkOffsets( KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, - parseInt(partition), + partition, (err, offsets) => { if (err) { status.error('🔥', 'Failed to query kafka watermark offsets', err) return reject() } - resolve([parseInt(partition), offsets.highOffset]) + resolve([partition, offsets.highOffset]) } ) }) @@ -168,6 +170,24 @@ export class SessionRecordingIngesterV2 { }, 5000) } + private get assignedTopicPartitions(): TopicPartition[] { + return Object.keys(this.partitionAssignments).map((partition) => ({ + partition: parseInt(partition), + topic: this.topic, + })) + } + + private scheduleWork(promise: Promise): Promise { + /** + * Helper to handle graceful shutdowns. Every time we do some work we add a promise to this array and remove it when finished. + * That way when shutting down we can wait for all promises to finish before exiting. + */ + this.promises.push(promise) + promise.finally(() => (this.promises = this.promises.filter((p) => p !== promise))) + + return promise + } + public async consume(event: IncomingRecordingMessage, sentrySpan?: Sentry.Span): Promise { // we have to reset this counter once we're consuming messages since then we know we're not re-balancing // otherwise the consumer continues to report however many sessions were revoked at the last re-balance forever @@ -426,12 +446,7 @@ export class SessionRecordingIngesterV2 { if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) { this.partitionLockInterval = setInterval(async () => { - await this.partitionLocker.claim( - Object.keys(this.partitionAssignments).map((partition) => ({ - partition: parseInt(partition), - topic: this.topic, - })) - ) + await this.partitionLocker.claim(this.assignedTopicPartitions) }, PARTITION_LOCK_INTERVAL_MS) } @@ -478,7 +493,7 @@ export class SessionRecordingIngesterV2 { } if (err.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) { - return this.onRevokePartitions(topicPartitions) + return this.scheduleWork(this.onRevokePartitions(topicPartitions)) } // We had a "real" error @@ -510,23 +525,12 @@ export class SessionRecordingIngesterV2 { await this.batchConsumer?.stop() // Simulate a revoke command to try and flush all sessions - // The rebalance event should have done this but we do it again as an extra precaution and to await the flushes - await this.onRevokePartitions( - Object.keys(this.partitionAssignments).map((partition) => ({ - partition: parseInt(partition), - topic: this.topic, - })) as TopicPartition[] - ) + // There is a race between the revoke callback and this function - Either way one of them gets there and covers the revocations + void this.scheduleWork(this.onRevokePartitions(this.assignedTopicPartitions)) await this.realtimeManager.unsubscribe() await this.replayEventsIngester.stop() - - // This is inefficient but currently necessary due to new instances restarting from the committed offset point - await this.destroySessions(Object.entries(this.sessions)) - - this.sessions = {} - - gaugeRealtimeSessions.reset() + await Promise.allSettled(this.promises) } public isHealthy() { @@ -549,54 +553,68 @@ export class SessionRecordingIngesterV2 { * As a result, we need to drop all sessions currently managed for the revoked partitions */ + /** + * IDEA + * + * 1. Pull out all sessions we are revoking. + * 2. Reset all the relevant metrics - that way we don't have to worry about races + * 3. Lock all the partitions we are revoking - gives us N seconds to deal with them + * 4. Flush all the sessions (probably with a timeout) + * 5. We can safely destroy them as they aren't pat of the sessions map + */ + const revokedPartitions = topicPartitions.map((x) => x.partition) if (!revokedPartitions.length) { return } - const sessionsToDrop = Object.entries(this.sessions).filter(([_, sessionManager]) => - revokedPartitions.includes(sessionManager.partition) - ) + const sessionsToDrop: SessionManager[] = [] + + // First we pull out all sessions that are being dropped. This way if we get reassigned and start consuming, we don't accidentally destroy them + Object.entries(this.sessions).forEach(([key, sessionManager]) => { + if (revokedPartitions.includes(sessionManager.partition)) { + sessionsToDrop.push(sessionManager) + delete this.sessions[key] + } + }) + + // Reset all metrics for the revoked partitions + topicPartitions.forEach((topicPartition: TopicPartition) => { + const partition = topicPartition.partition + + delete this.partitionAssignments[partition] + gaugeLag.remove({ partition }) + gaugeLagMilliseconds.remove({ partition }) + gaugeOffsetCommitted.remove({ partition }) + gaugeOffsetCommitFailed.remove({ partition }) + this.offsetHighWaterMarker.revoke(topicPartition) + }) gaugeSessionsRevoked.set(sessionsToDrop.length) gaugeSessionsHandled.remove() - // Attempt to flush all sessions - // TODO: Improve this to - // - work from oldest to newest - // - have some sort of timeout so we don't get stuck here forever if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) { + // Extend our claim on these partitions to give us time to flush + await this.partitionLocker.claim(topicPartitions) status.info('🔁', `blob_ingester_consumer - flushing ${sessionsToDrop.length} sessions on revoke...`) + // Flush all the sessions we are supposed to drop await runInstrumentedFunction({ statsKey: `recordingingester.onRevokePartitions.flushSessions`, logExecutionTime: true, func: async () => { await Promise.allSettled( sessionsToDrop - .map(([_, x]) => x) .sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity) .map((x) => x.flush('partition_shutdown')) ) }, }) - } - - topicPartitions.forEach((topicPartition: TopicPartition) => { - const partition = topicPartition.partition - delete this.partitionAssignments[partition] - gaugeLag.remove({ partition }) - gaugeLagMilliseconds.remove({ partition }) - gaugeOffsetCommitted.remove({ partition }) - gaugeOffsetCommitFailed.remove({ partition }) - this.offsetHighWaterMarker.revoke(topicPartition) - }) - - if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) { await this.partitionLocker.release(topicPartitions) } - await this.destroySessions(sessionsToDrop) + + await Promise.allSettled(sessionsToDrop.map((x) => x.destroy())) await this.offsetsRefresher.refresh() }