diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-manager.ts index de9027cee4058..f7a10a6a63593 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-manager.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-manager.ts @@ -50,5 +50,11 @@ export class OverflowManager { // The zset value is a timestamp in seconds. const expiration = (now ?? Date.now()) / 1000 + this.cooldownSeconds await this.redisClient.zadd(this.redisKey, 'NX', expiration, key) + + // Cleanup old entries with values expired more than one hour ago. + // We run the cleanup here because we assume this will only run a dozen times per day per region. + // If this code path becomes too hot, it should move to a singleton loop. + const expired = (now ?? Date.now()) / 1000 - 3600 + await this.redisClient.zremrangebyscore(this.redisKey, 0, expired) } } diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index e4ff5f8522b20..730fe28f481ac 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -611,6 +611,22 @@ describe('ingester', () => { await ingestBurst(10, 150_000, 150_000) expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0) }) + + it('should cleanup older entries when triggering', async () => { + await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 7000, 'expired:session') + await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 1000, 'not_expired:session') + expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([ + 'expired:session', + 'not_expired:session', + ]) + + await ingestBurst(10, 150_000, 10) + expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1) + expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([ + 'not_expired:session', + `${team.id}:sid1`, + ]) + }) }) describe('lag reporting', () => {