add and wire OverflowDetection, report a metric only for now
xvello committed Mar 20, 2024
1 parent 235f7c6 commit 2e8441b
Showing 6 changed files with 65 additions and 0 deletions.
3 changes: 3 additions & 0 deletions plugin-server/src/config/config.ts
@@ -163,6 +163,9 @@ export function getDefaultConfig(): PluginsServerConfig {
        SESSION_RECORDING_DEBUG_PARTITION: undefined,
        SESSION_RECORDING_KAFKA_DEBUG: undefined,
        SESSION_RECORDING_MAX_PARALLEL_FLUSHES: 10,
        SESSION_RECORDING_OVERFLOW_ENABLED: false,
        SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE: 1_000_000, // 1MB/second uncompressed, sustained
        SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: 100_000_000, // 100MB burst
    }
}

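For intuition about these defaults: the bucket holds at most 100 MB and refills at 1 MB/s, so a single session sustaining, say, 3 MB/s of uncompressed data drains it in roughly 100 / (3 − 1) = 50 seconds before overflow detection fires, while sessions that stay under 1 MB/s sustained never trigger it.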
45 changes: 45 additions & 0 deletions plugin-server/src/main/ingestion-queues/session-recording/services/overflow-detection.ts
@@ -0,0 +1,45 @@
import LRUCache from 'lru-cache'
import { Gauge } from 'prom-client'

import { Limiter } from '../../../../utils/token-bucket'

export enum OverflowState {
    Okay,
    Triggered, // Just triggered overflow detection on this call
    Cooldown, // Triggered overflow detection earlier, within the cooldownSeconds window
}

export const overflowTriggeredGauge = new Gauge({
    name: 'overflow_detection_triggered_total',
    help: 'Number of entities that triggered overflow detection.',
})

/**
 * OverflowDetection handles consumer-side detection of hot partitions by
 * accounting for data volume per entity (a session_id, a distinct_id...).
 *
 * The first time the observed volume for a key crosses the thresholds set via
 * burstCapacity and replenishRate, observe() returns Triggered. Subsequent
 * calls for that key return Cooldown until cooldownSeconds have elapsed.
 */
export class OverflowDetection {
    private limiter: Limiter
    private triggered: LRUCache<string, boolean>

    constructor(burstCapacity: number, replenishRate: number, cooldownSeconds: number) {
        this.limiter = new Limiter(burstCapacity, replenishRate)
        this.triggered = new LRUCache({ max: 1_000_000, maxAge: cooldownSeconds * 1000 })
    }

    public observe(key: string, quantity: number, now?: number): OverflowState {
        if (this.triggered.has(key)) {
            return OverflowState.Cooldown
        }
        if (this.limiter.consume(key, quantity, now)) {
            return OverflowState.Okay
        }
        this.triggered.set(key, true)
        overflowTriggeredGauge.inc(1)
        return OverflowState.Triggered
    }
}
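The Limiter imported from utils/token-bucket is not part of this diff. For context, a minimal per-key token bucket consistent with the constructor and consume() calls above could look like the sketch below; this is an illustration under those assumptions, not PostHog's actual implementation:

// Hypothetical token-bucket sketch matching how Limiter is used above.
class LimiterSketch {
    private buckets = new Map<string, [tokens: number, last: number]>()

    constructor(private capacity: number, private replenishRate: number) {}

    public consume(key: string, tokens: number, now?: number): boolean {
        const timestamp = now ?? Date.now()
        const [current, last] = this.buckets.get(key) ?? [this.capacity, timestamp]
        // Refill proportionally to elapsed seconds, capped at capacity.
        const refilled = Math.min(this.capacity, current + (this.replenishRate * (timestamp - last)) / 1000)
        if (refilled < tokens) {
            this.buckets.set(key, [refilled, timestamp])
            return false // Not enough budget: the caller treats this as overflow.
        }
        this.buckets.set(key, [refilled - tokens, timestamp])
        return true
    }
}

Given that behavior, observe() acts as a three-state machine per key. A usage illustration with made-up volumes, not taken from this commit:

// Illustration only: values are hypothetical.
const detection = new OverflowDetection(100_000_000, 1_000_000, 24 * 3600)
const now = Date.now()
detection.observe('session-1', 50_000_000, now) // Okay: burst bucket half drained
detection.observe('session-1', 60_000_000, now) // Triggered: bucket exhausted, gauge incremented
detection.observe('session-1', 1_000, now) // Cooldown: key stays in the LRU for a day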
@@ -20,6 +20,7 @@ import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { eventDroppedCounter } from '../metrics'
import { ConsoleLogsIngester } from './services/console-logs-ingester'
import { OffsetHighWaterMarker } from './services/offset-high-water-marker'
import { OverflowDetection } from './services/overflow-detection'
import { RealtimeManager } from './services/realtime-manager'
import { ReplayEventsIngester } from './services/replay-events-ingester'
import { BUCKETS_KB_WRITTEN, SessionManager } from './services/session-manager'
@@ -128,6 +129,7 @@ export class SessionRecordingIngester
    sessionHighWaterMarker: OffsetHighWaterMarker
    persistentHighWaterMarker: OffsetHighWaterMarker
    realtimeManager: RealtimeManager
    overflowDetection?: OverflowDetection
    replayEventsIngester?: ReplayEventsIngester
    consoleLogsIngester?: ConsoleLogsIngester
    batchConsumer?: BatchConsumer
@@ -160,6 +162,14 @@

        this.realtimeManager = new RealtimeManager(this.redisPool, this.config)

        if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED) {
            this.overflowDetection = new OverflowDetection(
                globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY, // burstCapacity
                globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE, // replenishRate
                24 * 3600 // One day
            )
        }

        // We create a hash of the cluster to use as a unique identifier for the high-water marks
        // This enables us to swap clusters without having to worry about resetting the high-water marks
        const kafkaClusterIdentifier = crypto.createHash('md5').update(this.config.KAFKA_HOSTS).digest('hex')
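The actual observe() call site is outside the visible hunks of this consumer file. Given the commit title, a plausible wiring inside the consumer's message handling, keyed per session, might look like the sketch below; the key format, field names, and placement are assumptions, not confirmed by this diff:

// Sketch only: where the consumer could feed message volume into detection.
const overflowState = this.overflowDetection?.observe(
    `${message.team_id}:${message.session_id}`, // assumed key format
    message.metadata.rawSize,
    message.metadata.timestamp
)
// For now the result is only reported via the gauge, per the commit title.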
@@ -6,6 +6,7 @@ export type IncomingRecordingMessage = {
    metadata: {
        topic: string
        partition: number
        rawSize: number
        lowOffset: number
        highOffset: number
        timestamp: number
@@ -225,6 +225,7 @@ export const parseKafkaMessage = async (
        metadata: {
            partition: message.partition,
            topic: message.topic,
            rawSize: message.size,
            lowOffset: message.offset,
            highOffset: message.offset,
            timestamp: message.timestamp,
@@ -267,6 +268,7 @@ export const reduceRecordingMessages = (messages: IncomingRecordingMessage[]): IncomingRecordingMessage[]
                existingMessage.eventsByWindowId[windowId] = events
            }
        }
        existingMessage.metadata.rawSize += clonedMessage.metadata.rawSize

        // Update the events ranges
        existingMessage.metadata.lowOffset = Math.min(
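The rawSize accumulation matters because reduceRecordingMessages merges consecutive messages for the same session into one IncomingRecordingMessage, so the merged metadata has to carry the total uncompressed volume for overflow accounting to see it all. A simplified, self-contained sketch of the merge, with the shape reduced to the relevant fields:

// Simplified sketch of the metadata merge performed above (illustration only).
type Meta = { rawSize: number; lowOffset: number; highOffset: number }

const mergeMeta = (existing: Meta, incoming: Meta): Meta => ({
    rawSize: existing.rawSize + incoming.rawSize, // volumes add up
    lowOffset: Math.min(existing.lowOffset, incoming.lowOffset),
    highOffset: Math.max(existing.highOffset, incoming.highOffset),
})

// mergeMeta({ rawSize: 1200, lowOffset: 10, highOffset: 10 }, { rawSize: 800, lowOffset: 11, highOffset: 11 })
// => { rawSize: 2000, lowOffset: 10, highOffset: 11 }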
4 changes: 4 additions & 0 deletions plugin-server/src/types.ts
@@ -230,6 +230,10 @@ export interface PluginsServerConfig {
    // a single partition which will output many more log messages to the console
    // useful when that partition is lagging unexpectedly
    SESSION_RECORDING_DEBUG_PARTITION: string | undefined
    // overflow detection, updating Redis for capture to move the traffic away
    SESSION_RECORDING_OVERFLOW_ENABLED: boolean
    SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: number
    SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE: number

    // Dedicated infra values
    SESSION_RECORDING_KAFKA_HOSTS: string | undefined
