add consumer integration test
xvello committed Mar 26, 2024
1 parent ccf13c3 commit 0f84eb3
Showing 1 changed file with 55 additions and 3 deletions.
@@ -1,6 +1,7 @@
import { randomUUID } from 'crypto'
+import { Redis } from 'ioredis'
import { mkdirSync, readdirSync, rmSync } from 'node:fs'
-import { TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
+import { Message, TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
import path from 'path'

import { waitForExpect } from '../../../../functional_tests/expectations'
@@ -12,10 +13,14 @@ import { getFirstTeam, resetTestDatabase } from '../../../helpers/sql'
import { createIncomingRecordingMessage, createKafkaMessage, createTP } from './fixtures'

const SESSION_RECORDING_REDIS_PREFIX = '@posthog-tests/replay/'
+const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'

const config: PluginsServerConfig = {
...defaultConfig,
SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: true,
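+// Overflow handling below appears to implement a per-session token bucket: message bytes drain
+// the bucket, it refills at the replenish rate, and sessions that empty it are flagged in Redis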
+SESSION_RECORDING_OVERFLOW_ENABLED: true,
+SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: 1_000_000, // 1MB burst
+SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE: 1_000, // 1kB/s replenish
SESSION_RECORDING_REDIS_PREFIX,
}

@@ -68,6 +73,7 @@ describe('ingester', () => {
let teamToken = ''
let mockOffsets: Record<number, number> = {}
let mockCommittedOffsets: Record<number, number> = {}
+let redisConn: Redis

beforeAll(async () => {
mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true })
@@ -103,16 +109,21 @@ describe('ingester', () => {
;[hub, closeHub] = await createHub()
team = await getFirstTeam(hub)
teamToken = team.api_token
+redisConn = await hub.redisPool.acquire(0)
+await redisConn.del(CAPTURE_OVERFLOW_REDIS_KEY)

await deleteKeysWithPrefix(hub)

-ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
+ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, redisConn)
await ingester.start()

mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
})

afterEach(async () => {
+jest.setTimeout(10000)
+await redisConn.del(CAPTURE_OVERFLOW_REDIS_KEY)
+await hub.redisPool.release(redisConn)
await deleteKeysWithPrefix(hub)
await ingester.stop()
await closeHub()
@@ -128,7 +139,7 @@ describe('ingester', () => {
await ingester.commitAllOffsets(ingester.partitionMetrics, Object.values(ingester.sessions))
}

-const createMessage = (session_id: string, partition = 1) => {
+const createMessage = (session_id: string, partition = 1, messageOverrides: Partial<Message> = {}) => {
mockOffsets[partition] = mockOffsets[partition] ?? 0
mockOffsets[partition]++

@@ -137,6 +148,7 @@
{
partition,
offset: mockOffsets[partition],
+...messageOverrides,
},
{
$session_id: session_id,
@@ -561,6 +573,46 @@ describe('ingester', () => {
})
})

+describe('overflow detection', () => {
+const ingestBurst = async (count: number, size_bytes: number, timestamp_delta: number) => {
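+// Back-date the burst so that every synthetic timestamp lands safely in the past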
+const first_timestamp = Date.now() - 2 * timestamp_delta * count
+
+// Because messages from the same batch are reduced into a single one, we call handleEachBatch
+// with individual messages to have better control over the message timestamps
+for (let n = 0; n < count; n++) {
+const message = createMessage('sid1', 1, {
+size: size_bytes,
+timestamp: first_timestamp + n * timestamp_delta,
+})
+await ingester.handleEachBatch([message], noop)
+}
+}
+
+it('should not trigger overflow if under threshold', async () => {
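+// 10 messages × 100 B = 1 kB, far below the 1 MB bucket capacity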
+await ingestBurst(10, 100, 10)
+expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+})
+
+it('should trigger overflow during bursts', async () => {
+const expected_expiration = Math.floor(Date.now() / 1000) + 24 * 3600 // 24 hours from now, in seconds
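+// 10 messages × 150 kB = 1.5 MB, enough to drain the 1 MB bucket and flag the session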
+await ingestBurst(10, 150_000, 10)
+
+expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
+expect(
+await redisConn.zrangebyscore(
+CAPTURE_OVERFLOW_REDIS_KEY,
+expected_expiration - 10,
+expected_expiration + 10
+)
+).toEqual([`${team.id}:sid1`])
+})
+
+it('should not trigger overflow during backfills', async () => {
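+// Same 1.5 MB total, but with 150 s between messages the 1 kB/s replenish rate restores
+// ~150 kB per gap, so the bucket never empties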
+await ingestBurst(10, 150_000, 150_000)
+expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+})
+})
+
describe('lag reporting', () => {
it('should return the latest offsets', async () => {
mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {