diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts
index def72eea474bb..a6ee4e91a9b15 100644
--- a/plugin-server/src/config/config.ts
+++ b/plugin-server/src/config/config.ts
@@ -134,6 +134,7 @@ export function getDefaultConfig(): PluginsServerConfig {
         RUSTY_HOOK_FOR_TEAMS: '',
         RUSTY_HOOK_ROLLOUT_PERCENTAGE: 0,
         RUSTY_HOOK_URL: '',
+        CAPTURE_CONFIG_REDIS_HOST: null,
 
         STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
         STARTUP_PROFILE_CPU: false,
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-detection.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-detection.ts
deleted file mode 100644
index 8b478b781bc95..0000000000000
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-detection.ts
+++ /dev/null
@@ -1,45 +0,0 @@
-import LRUCache from 'lru-cache'
-import { Gauge } from 'prom-client'
-
-import { Limiter } from '../../../../utils/token-bucket'
-
-export enum OverflowState {
-    Okay,
-    Triggered, // Recently triggered the overflow detection
-    Cooldown, // Already triggered the overflow detection earlier than cooldownSeconds
-}
-
-export const overflowTriggeredGauge = new Gauge({
-    name: 'overflow_detection_triggered_total',
-    help: 'Number of entities that triggered overflow detection.',
-})
-
-/**
- * OverflowDetection handles consumer-side detection of hot partitions by
- * accounting for data volumes per entity (a session_id, a distinct_id...).
- *
- * The first time that the observed spike crosses the thresholds set via burstCapacity
- * and replenishRate, observe returns Triggered. Subsequent calls will return Cooldown
- * until cooldownSeconds is reached.
- */
-export class OverflowDetection {
-    private limiter: Limiter
-    private triggered: LRUCache<string, boolean>
-
-    constructor(burstCapacity: number, replenishRate: number, cooldownSeconds: number) {
-        this.limiter = new Limiter(burstCapacity, replenishRate)
-        this.triggered = new LRUCache({ max: 1_000_000, maxAge: cooldownSeconds * 1000 })
-    }
-
-    public observe(key: string, quantity: number, now?: number): OverflowState {
-        if (this.triggered.has(key)) {
-            return OverflowState.Cooldown
-        }
-        if (this.limiter.consume(key, quantity, now)) {
-            return OverflowState.Okay
-        }
-        this.triggered.set(key, true)
-        overflowTriggeredGauge.inc(1)
-        return OverflowState.Triggered
-    }
-}
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-manager.ts
new file mode 100644
index 0000000000000..f7a10a6a63593
--- /dev/null
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-manager.ts
@@ -0,0 +1,60 @@
+import { Redis } from 'ioredis'
+import LRUCache from 'lru-cache'
+import { Gauge } from 'prom-client'
+
+import { Limiter } from '../../../../utils/token-bucket'
+
+export const overflowTriggeredGauge = new Gauge({
+    name: 'overflow_detection_triggered_total',
+    help: 'Number of entities that triggered overflow detection.',
+})
+
+/**
+ * OverflowManager handles consumer-side detection of hot partitions by
+ * accounting for data volumes per entity (a session_id, a distinct_id...)
+ * and maintains the Redis set that capture reads to route messages.
+ *
+ * The first time that the observed spike crosses the thresholds set via burstCapacity
+ * and replenishRate, the key is added to Redis and the metrics incremented; subsequent
+ * calls return early until cooldownSeconds is reached.
+ */
+export class OverflowManager {
+    private limiter: Limiter
+    private triggered: LRUCache<string, boolean>
+
+    constructor(
+        burstCapacity: number,
+        replenishRate: number,
+        private cooldownSeconds: number,
+        private redisKey: string,
+        private redisClient: Redis
+    ) {
+        this.limiter = new Limiter(burstCapacity, replenishRate)
+        this.triggered = new LRUCache({ max: 1_000_000, maxAge: cooldownSeconds * 1000 })
+    }
+
+    public async observe(key: string, quantity: number, now?: number): Promise<void> {
+        if (this.triggered.has(key)) {
+            // Cooldown state, return early
+            return
+        }
+        if (this.limiter.consume(key, quantity, now)) {
+            // Not triggering overflow, return early
+            return
+        }
+        this.triggered.set(key, true)
+        overflowTriggeredGauge.inc(1)
+
+        // Set the `NX` argument to avoid updating existing entries: if a session already triggered overflow,
+        // its cooldown will not be extended after we restart the consumers.
+        // The zset score is a timestamp in seconds.
+        const expiration = (now ?? Date.now()) / 1000 + this.cooldownSeconds
+        await this.redisClient.zadd(this.redisKey, 'NX', expiration, key)
+
+        // Clean up old entries that expired more than one hour ago.
+        // We run the cleanup here because we assume this will only run a dozen times per day per region.
+        // If this code path becomes too hot, it should move to a singleton loop.
+        const expired = (now ?? Date.now()) / 1000 - 3600
+        await this.redisClient.zremrangebyscore(this.redisKey, 0, expired)
+    }
+}
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
index 491044652d80f..2e84d7826c002 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -1,5 +1,6 @@
 import { captureException } from '@sentry/node'
 import crypto from 'crypto'
+import { Redis } from 'ioredis'
 import { mkdirSync, rmSync } from 'node:fs'
 import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
 import { Counter, Gauge, Histogram } from 'prom-client'
@@ -20,7 +21,7 @@ import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
 import { eventDroppedCounter } from '../metrics'
 import { ConsoleLogsIngester } from './services/console-logs-ingester'
 import { OffsetHighWaterMarker } from './services/offset-high-water-marker'
-import { OverflowDetection } from './services/overflow-detection'
+import { OverflowManager } from './services/overflow-manager'
 import { RealtimeManager } from './services/realtime-manager'
 import { ReplayEventsIngester } from './services/replay-events-ingester'
 import { BUCKETS_KB_WRITTEN, SessionManager } from './services/session-manager'
@@ -42,6 +43,7 @@ require('@sentry/tracing')
 const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob'
 const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000
 const SHUTDOWN_FLUSH_TIMEOUT_MS = 30000
+const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'
 
 const gaugeSessionsHandled = new Gauge({
     name: 'recording_blob_ingestion_session_manager_count',
@@ -129,7 +131,7 @@ export class SessionRecordingIngester {
     sessionHighWaterMarker: OffsetHighWaterMarker
     persistentHighWaterMarker: OffsetHighWaterMarker
     realtimeManager: RealtimeManager
-    overflowDetection?: OverflowDetection
+    overflowDetection?: OverflowManager
     replayEventsIngester?: ReplayEventsIngester
     consoleLogsIngester?: ConsoleLogsIngester
     batchConsumer?: BatchConsumer
@@ -149,7 +151,8 @@ export class SessionRecordingIngester {
     constructor(
         private globalServerConfig: PluginsServerConfig,
         private postgres: PostgresRouter,
-        private objectStorage: ObjectStorage
+        private objectStorage: ObjectStorage,
+        captureRedis: Redis | undefined
     ) {
         this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION
             ? parseInt(globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION)
             : undefined
@@ -162,11 +165,13 @@ export class SessionRecordingIngester {
 
         this.realtimeManager = new RealtimeManager(this.redisPool, this.config)
 
-        if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED) {
-            this.overflowDetection = new OverflowDetection(
+        if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis) {
+            this.overflowDetection = new OverflowManager(
                 globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY,
                 globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE,
-                24 * 3600 // One day
+                24 * 3600, // One day
+                CAPTURE_OVERFLOW_REDIS_KEY,
+                captureRedis
             )
         }
 
@@ -250,6 +255,8 @@ export class SessionRecordingIngester {
         const { team_id, session_id } = event
         const key = `${team_id}-${session_id}`
+        // TODO: use this for session key too if it's safe to do so
+        const overflowKey = `${team_id}:${session_id}`
         const { partition, highOffset } = event.metadata
 
         if (this.debugPartition === partition) {
@@ -285,9 +292,6 @@ export class SessionRecordingIngester {
             return
         }
 
-        // TODO: update Redis if this triggers
-        this.overflowDetection?.observe(key, event.metadata.rawSize, event.metadata.timestamp)
-
         if (!this.sessions[key]) {
             const { partition, topic } = event.metadata
 
@@ -304,7 +308,10 @@ export class SessionRecordingIngester {
             )
         }
 
-        await this.sessions[key]?.add(event)
+        await Promise.allSettled([
+            this.sessions[key]?.add(event),
+            this.overflowDetection?.observe(overflowKey, event.metadata.rawSize, event.metadata.timestamp),
+        ])
     }
 
     public async handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void> {
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index f3a4362db6f83..8c910e1857b06 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -18,7 +18,7 @@ import { cancelAllScheduledJobs } from '../utils/node-schedule'
 import { PeriodicTask } from '../utils/periodic-task'
 import { PubSub } from '../utils/pubsub'
 import { status } from '../utils/status'
-import { delay } from '../utils/utils'
+import { createRedisClient, delay } from '../utils/utils'
 import { AppMetrics } from '../worker/ingestion/app-metrics'
 import { OrganizationManager } from '../worker/ingestion/organization-manager'
 import { DeferredPersonOverrideWorker, FlatPersonOverrideWriter } from '../worker/ingestion/person-state'
@@ -243,6 +243,12 @@
     // be super lightweight and ideally not do any IO.
     const healthChecks: { [service: string]: () => Promise<boolean> | boolean } = {}
 
+    // Creating a dedicated single-connection redis client to this Redis, as it's not relevant for hobby deploys
+    // and cloud deploys don't have concurrent uses. We should abstract multi-Redis into a router util.
+    const captureRedis = serverConfig.CAPTURE_CONFIG_REDIS_HOST
+        ? await createRedisClient(serverConfig.CAPTURE_CONFIG_REDIS_HOST)
+        : undefined
+
     try {
         // Based on the mode the plugin server was started, we start a number of
         // different services. Mostly this is reasonably obvious from the name.
@@ -440,7 +446,7 @@
                 throw new Error("Can't start session recording blob ingestion without object storage")
             }
             // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
-            const ingester = new SessionRecordingIngester(serverConfig, postgres, s3)
+            const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, captureRedis)
             await ingester.start()
 
             const batchConsumer = ingester.batchConsumer
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 114547cfe605f..36bbbc3752ece 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -207,6 +207,7 @@ export interface PluginsServerConfig {
     RUSTY_HOOK_URL: string
     SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP: boolean
     PIPELINE_STEP_STALLED_LOG_TIMEOUT: number
+    CAPTURE_CONFIG_REDIS_HOST: string | null // Redis cluster to use to coordinate with capture (overflow, routing)
 
     // dump profiles to disk, covering the first N seconds of runtime
     STARTUP_PROFILE_DURATION_SECONDS: number
diff --git a/plugin-server/src/utils/utils.ts b/plugin-server/src/utils/utils.ts
index 87fbe9e9e0640..a49a5161b4b3a 100644
--- a/plugin-server/src/utils/utils.ts
+++ b/plugin-server/src/utils/utils.ts
@@ -339,8 +339,12 @@ export async function createRedis(serverConfig: PluginsServerConfig): Promise<Redis> {
           }
         : undefined
 
-    const redis = new Redis(serverConfig.REDIS_URL, {
-        ...params,
+    return createRedisClient(serverConfig.REDIS_URL, params)
+}
+
+export async function createRedisClient(url: string, options?: RedisOptions): Promise<Redis> {
+    const redis = new Redis(url, {
+        ...options,
         maxRetriesPerRequest: -1,
     })
     let errorCounter = 0
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/overflow-manager.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/overflow-manager.test.ts
new file mode 100644
index 0000000000000..875a7157dcf12
--- /dev/null
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/overflow-manager.test.ts
@@ -0,0 +1,104 @@
+import { Redis } from 'ioredis'
+
+import { OverflowManager } from '../../../../../src/main/ingestion-queues/session-recording/services/overflow-manager'
+import { Hub } from '../../../../../src/types'
+import { createHub } from '../../../../../src/utils/db/hub'
+
+jest.mock('../../../../../src/utils/status')
+jest.mock('../../../../../src/kafka/producer')
+
+const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'
+
+describe('overflow manager', () => {
+    let hub: Hub
+    let closeHub: () => Promise<void>
+    let redis: Redis
+    let overflowManager: OverflowManager
+
+    beforeAll(async () => {
+        ;[hub, closeHub] = await createHub()
+        redis = await hub.redisPool.acquire()
+    })
+    beforeEach(async () => {
+        await redis.del(CAPTURE_OVERFLOW_REDIS_KEY)
+        overflowManager = new OverflowManager(10, 1, 3600, CAPTURE_OVERFLOW_REDIS_KEY, redis)
+    })
+
+    afterAll(async () => {
+        await redis.flushdb()
+        await hub.redisPool.release(redis)
+        await closeHub?.()
+    })
+
+    test('it does not trigger if several keys are under threshold', async () => {
+        await overflowManager.observe('key1', 8)
+        await overflowManager.observe('key2', 8)
+        await overflowManager.observe('key3', 8)
+
+        expect(await redis.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+    })
+
+    test('it triggers for hot keys', async () => {
+        await overflowManager.observe('key1', 4)
+        await overflowManager.observe('key1', 4)
+        await overflowManager.observe('key2', 8)
+        expect(await redis.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+
+        await overflowManager.observe('key1', 4)
+        expect(await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual(['key1'])
+    })
+
+    test('it does not trigger twice when cooling down', async () => {
+        await overflowManager.observe('key1', 11)
+        expect(await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual(['key1'])
+
+        // Delete the key to confirm that OverflowManager is in cooldown for key1 and does not re-create it
+        await redis.del(CAPTURE_OVERFLOW_REDIS_KEY)
+        await overflowManager.observe('key1', 11)
+        expect(await redis.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+
+        // But it triggers for key2
+        await overflowManager.observe('key2', 11)
+        expect(await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual(['key2'])
+    })
+
+    test('it does not update existing values', async () => {
+        const timestamp = 1711280335000
+        const oldTimestamp = timestamp / 1000 - 200
+        await redis.zadd(CAPTURE_OVERFLOW_REDIS_KEY, oldTimestamp, 'key1')
+
+        await overflowManager.observe('key1', 11, timestamp)
+        expect(await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1, 'WITHSCORES')).toEqual([
+            'key1',
+            oldTimestamp.toString(),
+        ])
+    })
+
+    test('it sets the expected expiration on new values', async () => {
+        const timestamp = 1711280335000
+        const oldTimestamp = timestamp / 1000 - 200
+        await redis.zadd(CAPTURE_OVERFLOW_REDIS_KEY, oldTimestamp, 'key1')
+
+        const expectedExpiration = timestamp / 1000 + 3600
+        await overflowManager.observe('key2', 11, timestamp)
+        expect(await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1, 'WITHSCORES')).toEqual([
+            'key1',
+            oldTimestamp.toString(),
+            'key2',
+            expectedExpiration.toString(),
+        ])
+    })
+
+    test('it removes old values when adding one', async () => {
+        const timestamp = 1711280335000
+        const oldTimestamp = timestamp / 1000 - 8000
+        await redis.zadd(CAPTURE_OVERFLOW_REDIS_KEY, oldTimestamp, 'key1')
+
+        const expectedExpiration = timestamp / 1000 + 3600
+        await overflowManager.observe('key2', 11, timestamp)
+        expect(await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1, 'WITHSCORES')).toEqual([
+            'key2',
+            expectedExpiration.toString(),
+        ])
+    })
+})
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
index 18dc39c7e5b2e..730fe28f481ac 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
@@ -1,6 +1,7 @@
 import { randomUUID } from 'crypto'
+import { Redis } from 'ioredis'
 import { mkdirSync, readdirSync, rmSync } from 'node:fs'
-import { TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
+import { Message, TopicPartition, TopicPartitionOffset } from 'node-rdkafka'
 import path from 'path'
 
 import { waitForExpect } from '../../../../functional_tests/expectations'
@@ -12,10 +13,14 @@ import { getFirstTeam, resetTestDatabase } from '../../../helpers/sql'
 import { createIncomingRecordingMessage, createKafkaMessage, createTP } from './fixtures'
 
 const SESSION_RECORDING_REDIS_PREFIX = '@posthog-tests/replay/'
+const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'
 
 const config: PluginsServerConfig = {
     ...defaultConfig,
     SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: true,
+    SESSION_RECORDING_OVERFLOW_ENABLED: true,
+    SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: 1_000_000, // 1MB burst
+    SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE: 1_000, // 1kB/s replenish
     SESSION_RECORDING_REDIS_PREFIX,
 }
 
@@ -68,6 +73,7 @@ describe('ingester', () => {
     let teamToken = ''
     let mockOffsets: Record<number, number> = {}
    let mockCommittedOffsets: Record<number, number> = {}
+    let redisConn: Redis
 
     beforeAll(async () => {
         mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true })
@@ -103,9 +109,12 @@ describe('ingester', () => {
         ;[hub, closeHub] = await createHub()
         team = await getFirstTeam(hub)
         teamToken = team.api_token
+        redisConn = await hub.redisPool.acquire(0)
+        await redisConn.del(CAPTURE_OVERFLOW_REDIS_KEY)
+
         await deleteKeysWithPrefix(hub)
 
-        ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage)
+        ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, redisConn)
         await ingester.start()
 
         mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
     })
 
     afterEach(async () => {
         jest.setTimeout(10000)
+        await redisConn.del(CAPTURE_OVERFLOW_REDIS_KEY)
+        await hub.redisPool.release(redisConn)
         await deleteKeysWithPrefix(hub)
         await ingester.stop()
         await closeHub()
     })
@@ -128,7 +139,7 @@ describe('ingester', () => {
         await ingester.commitAllOffsets(ingester.partitionMetrics, Object.values(ingester.sessions))
     }
 
-    const createMessage = (session_id: string, partition = 1) => {
+    const createMessage = (session_id: string, partition = 1, messageOverrides: Partial<Message> = {}) => {
         mockOffsets[partition] = mockOffsets[partition] ?? 0
         mockOffsets[partition]++
 
         return createKafkaMessage(
             teamToken,
             {
                 partition,
                 offset: mockOffsets[partition],
+                ...messageOverrides,
             },
             {
                 $session_id: session_id,
@@ -150,7 +162,7 @@ describe('ingester', () => {
             KAFKA_HOSTS: 'localhost:9092',
         } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
 
-        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage)
+        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
         expect(ingester['debugPartition']).toEqual(103)
     })
@@ -159,7 +171,7 @@ describe('ingester', () => {
             KAFKA_HOSTS: 'localhost:9092',
         } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
 
-        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage)
+        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
         expect(ingester['debugPartition']).toBeUndefined()
     })
@@ -424,7 +436,7 @@ describe('ingester', () => {
         jest.setTimeout(5000) // Increased to cover lock delay
 
         beforeEach(async () => {
-            otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage)
+            otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
             await otherIngester.start()
         })
 
@@ -561,6 +573,62 @@ describe('ingester', () => {
         })
     })
 
+    describe('overflow detection', () => {
+        const ingestBurst = async (count: number, size_bytes: number, timestamp_delta: number) => {
+            const first_timestamp = Date.now() - 2 * timestamp_delta * count
+
+            // Because messages from the same batch are reduced into a single one, we call handleEachBatch
+            // with individual messages to have better control over the message timestamp
+            for (let n = 0; n < count; n++) {
+                const message = createMessage('sid1', 1, {
+                    size: size_bytes,
+                    timestamp: first_timestamp + n * timestamp_delta,
+                })
+                await ingester.handleEachBatch([message], noop)
+            }
+        }
+
+        it('should not trigger overflow if under threshold', async () => {
+            await ingestBurst(10, 100, 10)
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+        })
+
+        it('should trigger overflow during bursts', async () => {
+            const expected_expiration = Math.floor(Date.now() / 1000) + 24 * 3600 // 24 hours from now, in seconds
+            await ingestBurst(10, 150_000, 10)
+
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
+            expect(
+                await redisConn.zrangebyscore(
+                    CAPTURE_OVERFLOW_REDIS_KEY,
+                    expected_expiration - 10,
+                    expected_expiration + 10
+                )
+            ).toEqual([`${team.id}:sid1`])
+        })
+
+        it('should not trigger overflow during backfills', async () => {
+            await ingestBurst(10, 150_000, 150_000)
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+        })
+
+        it('should clean up older entries when triggering', async () => {
+            await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 7000, 'expired:session')
+            await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 1000, 'not_expired:session')
+            expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([
+                'expired:session',
+                'not_expired:session',
+            ])
+
+            await ingestBurst(10, 150_000, 10)
+            expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
+            expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([
+                'not_expired:session',
+                `${team.id}:sid1`,
+            ])
+        })
+    })
+
     describe('lag reporting', () => {
         it('should return the latest offsets', async () => {
             mockConsumer.queryWatermarkOffsets.mockImplementation((_topic, partition, _timeout, cb) => {