From 0f84eb32cb435d340787a1ff3a3d40349be8ed21 Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Tue, 26 Mar 2024 11:28:51 +0100
Subject: [PATCH] add consumer integration test

---
 .../session-recordings-consumer.test.ts           | 58 ++++++++++++++++++-
 1 file changed, 55 insertions(+), 3 deletions(-)

diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
index 7101528ed26ec..e4ff5f8522b20 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
@@ -1,6 +1,7 @@
 import { randomUUID } from 'crypto'
+import { Redis } from 'ioredis'
 import { mkdirSync, readdirSync, rmSync } from 'node:fs'
-import { TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
+import { Message, TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
 import path from 'path'
 
 import { waitForExpect } from '../../../../functional_tests/expectations'
@@ -12,10 +13,14 @@ import { getFirstTeam, resetTestDatabase } from '../../../helpers/sql'
 import { createIncomingRecordingMessage, createKafkaMessage, createTP } from './fixtures'
 
 const SESSION_RECORDING_REDIS_PREFIX = '@posthog-tests/replay/'
+const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'
 
 const config: PluginsServerConfig = {
     ...defaultConfig,
     SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: true,
+    SESSION_RECORDING_OVERFLOW_ENABLED: true,
+    SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: 1_000_000, // 1MB burst
+    SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE: 1_000, // 1kB/s replenish
     SESSION_RECORDING_REDIS_PREFIX,
 }
 
@@ -68,6 +73,7 @@ describe('ingester', () => {
     let teamToken = ''
     let mockOffsets: Record<number, number> = {}
     let mockCommittedOffsets: Record<number, number> = {}
+    let redisConn: Redis
 
     beforeAll(async () => {
         mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true })
@@ -103,9 +109,12 @@
         ;[hub, closeHub] = await createHub()
         team = await getFirstTeam(hub)
         teamToken = team.api_token
+        redisConn = await hub.redisPool.acquire(0)
+        await redisConn.del(CAPTURE_OVERFLOW_REDIS_KEY)
+
         await deleteKeysWithPrefix(hub)
 
-        ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
+        ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, redisConn)
         await ingester.start()
 
         mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
@@ -113,6 +122,8 @@
 
     afterEach(async () => {
         jest.setTimeout(10000)
+        await redisConn.del(CAPTURE_OVERFLOW_REDIS_KEY)
+        await hub.redisPool.release(redisConn)
         await deleteKeysWithPrefix(hub)
         await ingester.stop()
         await closeHub()
@@ -128,7 +139,7 @@
         await ingester.commitAllOffsets(ingester.partitionMetrics, Object.values(ingester.sessions))
     }
 
-    const createMessage = (session_id: string, partition = 1) => {
+    const createMessage = (session_id: string, partition = 1, messageOverrides: Partial<Message> = {}) => {
         mockOffsets[partition] = mockOffsets[partition] ?? 0
         mockOffsets[partition]++
 
@@ -137,6 +148,7 @@
             {
                 partition,
                 offset: mockOffsets[partition],
+                ...messageOverrides,
             },
             {
                 $session_id: session_id,
@@ -561,6 +573,46 @@
         })
     })
 
+    describe('overflow detection', () => {
+        const ingestBurst = async (count: number, size_bytes: number, timestamp_delta: number) => {
+            const first_timestamp = Date.now() - 2 * timestamp_delta * count
+
+            // Because messages from the same batch are reduced into a single one, we call handleEachBatch
+            // with individual messages to have better control over the message timestamps
+            for (let n = 0; n < count; n++) {
+                const message = createMessage('sid1', 1, {
+                    size: size_bytes,
+                    timestamp: first_timestamp + n * timestamp_delta,
+                })
+                await ingester.handleEachBatch([message], noop)
+            }
+        }
+
+        it('should not trigger overflow if under threshold', async () => {
+            await ingestBurst(10, 100, 10)
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+        })
+
+        it('should trigger overflow during bursts', async () => {
+            const expected_expiration = Math.floor(Date.now() / 1000) + 24 * 3600 // 24 hours from now, in seconds
+            await ingestBurst(10, 150_000, 10)
+
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
+            expect(
+                await redisConn.zrangebyscore(
+                    CAPTURE_OVERFLOW_REDIS_KEY,
+                    expected_expiration - 10,
+                    expected_expiration + 10
+                )
+            ).toEqual([`${team.id}:sid1`])
+        })
+
+        it('should not trigger overflow during backfills', async () => {
+            await ingestBurst(10, 150_000, 150_000)
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+        })
+    })
+
     describe('lag reporting', () => {
         it('should return the latest offsets', async () => {
             mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {