From 2e8441b300850afc91bc1c6035e2ef23b2137984 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 13 Mar 2024 14:52:35 +0100 Subject: [PATCH] add and wire OverflowDetection, report a metric only for now --- plugin-server/src/config/config.ts | 3 ++ .../services/overflow-detection.ts | 45 +++++++++++++++++++ .../session-recordings-consumer.ts | 10 +++++ .../session-recording/types.ts | 1 + .../session-recording/utils.ts | 2 + plugin-server/src/types.ts | 4 ++ 6 files changed, 65 insertions(+) create mode 100644 plugin-server/src/main/ingestion-queues/session-recording/services/overflow-detection.ts diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 8e9b50afb9528d..b5fa1ce899f635 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -163,6 +163,9 @@ export function getDefaultConfig(): PluginsServerConfig { SESSION_RECORDING_DEBUG_PARTITION: undefined, SESSION_RECORDING_KAFKA_DEBUG: undefined, SESSION_RECORDING_MAX_PARALLEL_FLUSHES: 10, + SESSION_RECORDING_OVERFLOW_ENABLED: false, + SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE: 1_000_000, // 1MB/second uncompressed, sustained + SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: 100_000_000, // 100MB burst } } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-detection.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-detection.ts new file mode 100644 index 00000000000000..8b478b781bc959 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/overflow-detection.ts @@ -0,0 +1,45 @@ +import LRUCache from 'lru-cache' +import { Gauge } from 'prom-client' + +import { Limiter } from '../../../../utils/token-bucket' + +export enum OverflowState { + Okay, + Triggered, // Recently triggered the overflow detection + Cooldown, // Already triggered the overflow detection earlier than cooldownSeconds +} + +export const overflowTriggeredGauge = new Gauge({ 
+ name: 'overflow_detection_triggered_total', + help: 'Number of entities that triggered overflow detection.', +}) + +/** + * OverflowDetection handles consumer-side detection of hot partitions by + * accounting for data volumes per entity (a session_id, a distinct_id...). + * + * The first time that the observed spike crosses the thresholds set via burstCapacity + * and replenishRate, observe returns Triggered. Subsequent calls will return Cooldown + * until cooldownSeconds is reached. + */ +export class OverflowDetection { + private limiter: Limiter + private triggered: LRUCache + + constructor(burstCapacity: number, replenishRate: number, cooldownSeconds: number) { + this.limiter = new Limiter(burstCapacity, replenishRate) + this.triggered = new LRUCache({ max: 1_000_000, maxAge: cooldownSeconds * 1000 }) + } + + public observe(key: string, quantity: number, now?: number): OverflowState { + if (this.triggered.has(key)) { + return OverflowState.Cooldown + } + if (this.limiter.consume(key, quantity, now)) { + return OverflowState.Okay + } + this.triggered.set(key, true) + overflowTriggeredGauge.inc(1) + return OverflowState.Triggered + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 30aaab4a023d57..89e883c82c26e2 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -20,6 +20,7 @@ import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics' import { eventDroppedCounter } from '../metrics' import { ConsoleLogsIngester } from './services/console-logs-ingester' import { OffsetHighWaterMarker } from './services/offset-high-water-marker' +import { OverflowDetection } from './services/overflow-detection' import { RealtimeManager } from './services/realtime-manager' 
import { ReplayEventsIngester } from './services/replay-events-ingester' import { BUCKETS_KB_WRITTEN, SessionManager } from './services/session-manager' @@ -128,6 +129,7 @@ export class SessionRecordingIngester { sessionHighWaterMarker: OffsetHighWaterMarker persistentHighWaterMarker: OffsetHighWaterMarker realtimeManager: RealtimeManager + overflowDetection?: OverflowDetection replayEventsIngester?: ReplayEventsIngester consoleLogsIngester?: ConsoleLogsIngester batchConsumer?: BatchConsumer @@ -160,6 +162,14 @@ export class SessionRecordingIngester { this.realtimeManager = new RealtimeManager(this.redisPool, this.config) + if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED) { + this.overflowDetection = new OverflowDetection( + globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY, + globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE, + 24 * 3600 // One day + ) + } + // We create a hash of the cluster to use as a unique identifier for the high-water marks // This enables us to swap clusters without having to worry about resetting the high-water marks const kafkaClusterIdentifier = crypto.createHash('md5').update(this.config.KAFKA_HOSTS).digest('hex') diff --git a/plugin-server/src/main/ingestion-queues/session-recording/types.ts b/plugin-server/src/main/ingestion-queues/session-recording/types.ts index 254e3f0897ee77..d61dadda9279eb 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/types.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/types.ts @@ -6,6 +6,7 @@ export type IncomingRecordingMessage = { metadata: { topic: string partition: number + rawSize: number lowOffset: number highOffset: number timestamp: number diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 4b4345d43b48d8..53ce953e5bd926 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ 
b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -225,6 +225,7 @@ export const parseKafkaMessage = async ( metadata: { partition: message.partition, topic: message.topic, + rawSize: message.size, lowOffset: message.offset, highOffset: message.offset, timestamp: message.timestamp, @@ -267,6 +268,7 @@ export const reduceRecordingMessages = (messages: IncomingRecordingMessage[]): I existingMessage.eventsByWindowId[windowId] = events } } + existingMessage.metadata.rawSize += clonedMessage.metadata.rawSize // Update the events ranges existingMessage.metadata.lowOffset = Math.min( diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index b8eeb5b296a9ee..114547cfe605fc 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -230,6 +230,10 @@ export interface PluginsServerConfig { // a single partition which will output many more log messages to the console // useful when that partition is lagging unexpectedly SESSION_RECORDING_DEBUG_PARTITION: string | undefined + // overflow detection, updating Redis for capture to move the traffic away + SESSION_RECORDING_OVERFLOW_ENABLED: boolean + SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: number + SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE: number // Dedicated infra values SESSION_RECORDING_KAFKA_HOSTS: string | undefined