diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts index dbdd432aa28ea..e3e528ee09447 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts @@ -107,7 +107,7 @@ export class SessionRecordingIngesterV2 { recordingConsumerConfig: PluginsServerConfig topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS - private promises: Promise[] = [] + private promises: Set> = new Set() constructor( private serverConfig: PluginsServerConfig, @@ -182,8 +182,8 @@ export class SessionRecordingIngesterV2 { * Helper to handle graceful shutdowns. Every time we do some work we add a promise to this array and remove it when finished. * That way when shutting down we can wait for all promises to finish before exiting. */ - this.promises.push(promise) - promise.finally(() => (this.promises = this.promises.filter((p) => p !== promise))) + this.promises.add(promise) + promise.finally(() => this.promises.delete(promise)) return promise } @@ -553,16 +553,6 @@ export class SessionRecordingIngesterV2 { * As a result, we need to drop all sessions currently managed for the revoked partitions */ - /** - * IDEA - * - * 1. Pull out all sessions we are revoking. - * 2. Reset all the relevant metrics - that way we don't have to worry about races - * 3. Lock all the partitions we are revoking - gives us N seconds to deal with them - * 4. Flush all the sessions (probably with a timeout) - * 5. We can safely destroy them as they aren't pat of the sessions map - */ - const revokedPartitions = topicPartitions.map((x) => x.partition) if (!revokedPartitions.length) { return