From 7a3f2785093cc9fa8c8dbe9298ea997d9f3fb00c Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 19 Sep 2023 08:44:05 +0200 Subject: [PATCH] Fix tests --- .../session-recording/services/realtime-manager.ts | 1 + .../session-recording/session-recordings-consumer-v2.ts | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts index 86839a31af5f3..2643d3e267c5f 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts @@ -63,6 +63,7 @@ export class RealtimeManager extends EventEmitter { public async unsubscribe(): Promise { await this.pubsubRedis?.unsubscribe(Keys.realtimeSubscriptions()) + await this.pubsubRedis?.quit() } private async run(description: string, fn: (client: Redis) => Promise): Promise { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts index 12799ebe0dc22..156fbc67ff646 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts @@ -546,7 +546,10 @@ export class SessionRecordingIngesterV2 { if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) { status.info('🔁', `blob_ingester_consumer - flushing ${sessionsToDrop.length} sessions on revoke...`) await Promise.allSettled( - sessionsToDrop.map(([_, sessionManager]) => sessionManager.flush('partition_shutdown')) + sessionsToDrop + .map(([_, x]) => x) + .sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity) + .map((x) => x.flush('partition_shutdown')) ) }