Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(blobby): overflow detection updates the redis set with keys that trigger #21154

Merged
merged 5 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ export function getDefaultConfig(): PluginsServerConfig {
RUSTY_HOOK_FOR_TEAMS: '',
RUSTY_HOOK_ROLLOUT_PERCENTAGE: 0,
RUSTY_HOOK_URL: '',
CAPTURE_CONFIG_REDIS_HOST: null,

STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
STARTUP_PROFILE_CPU: false,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { Redis } from 'ioredis'
import LRUCache from 'lru-cache'
import { Gauge } from 'prom-client'

import { Limiter } from '../../../../utils/token-bucket'

export const overflowTriggeredGauge = new Gauge({
    name: 'overflow_detection_triggered_total',
    help: 'Number of entities that triggered overflow detection.',
})

/**
 * OverflowManager handles consumer-side detection of hot partitions by
 * accounting for data volumes per entity (a session_id, a distinct_id...)
 * and maintains the Redis set that capture reads to route messages.
 *
 * The first time that the observed spike crosses the thresholds set via burstCapacity
 * and replenishRate, the key is added to Redis and the metrics incremented, subsequent
 * calls will return early until cooldownSeconds is reached.
 */
export class OverflowManager {
    // Token bucket used to detect when an entity exceeds the allowed throughput.
    private limiter: Limiter
    // Local cooldown cache: a key present here was already reported to Redis recently.
    private triggered: LRUCache<string, boolean>

    constructor(
        burstCapacity: number,
        replenishRate: number,
        private cooldownSeconds: number,
        private redisKey: string,
        private redisClient: Redis
    ) {
        this.limiter = new Limiter(burstCapacity, replenishRate)
        // Entries age out of the local cache at the same pace as the Redis cooldown.
        this.triggered = new LRUCache({ max: 1_000_000, maxAge: cooldownSeconds * 1000 })
    }

    /**
     * Account `quantity` against `key`; on the first threshold breach, register the key
     * in the Redis zset and bump the trigger metric. `now` is an optional epoch-ms
     * timestamp (defaults to wall clock).
     */
    public async observe(key: string, quantity: number, now?: number): Promise<void> {
        // Already triggered and still cooling down: nothing to do.
        if (this.triggered.has(key)) {
            return
        }
        // The bucket still has capacity, so this entity is not overflowing.
        if (this.limiter.consume(key, quantity, now)) {
            return
        }

        this.triggered.set(key, true)
        overflowTriggeredGauge.inc(1)

        // Use `NX` so existing entries are left untouched: if a session already triggered
        // overflow, its cooldown will not be extended after we restart the consumers.
        // The zset score is a unix timestamp in seconds.
        const expiresAt = (now ?? Date.now()) / 1000 + this.cooldownSeconds
        await this.redisClient.zadd(this.redisKey, 'NX', expiresAt, key)

        // Prune entries whose score expired more than one hour ago. This runs inline
        // because triggers are expected a dozen times per day per region; if this code
        // path becomes too hot, the cleanup should move to a singleton loop.
        const staleCutoff = (now ?? Date.now()) / 1000 - 3600
        await this.redisClient.zremrangebyscore(this.redisKey, 0, staleCutoff)
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { captureException } from '@sentry/node'
import crypto from 'crypto'
import { Redis } from 'ioredis'
import { mkdirSync, rmSync } from 'node:fs'
import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
import { Counter, Gauge, Histogram } from 'prom-client'
Expand All @@ -20,7 +21,7 @@ import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { eventDroppedCounter } from '../metrics'
import { ConsoleLogsIngester } from './services/console-logs-ingester'
import { OffsetHighWaterMarker } from './services/offset-high-water-marker'
import { OverflowDetection } from './services/overflow-detection'
import { OverflowManager } from './services/overflow-manager'
import { RealtimeManager } from './services/realtime-manager'
import { ReplayEventsIngester } from './services/replay-events-ingester'
import { BUCKETS_KB_WRITTEN, SessionManager } from './services/session-manager'
Expand All @@ -42,6 +43,7 @@ require('@sentry/tracing')
const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob'
const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000
const SHUTDOWN_FLUSH_TIMEOUT_MS = 30000
const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'

const gaugeSessionsHandled = new Gauge({
name: 'recording_blob_ingestion_session_manager_count',
Expand Down Expand Up @@ -129,7 +131,7 @@ export class SessionRecordingIngester {
sessionHighWaterMarker: OffsetHighWaterMarker
persistentHighWaterMarker: OffsetHighWaterMarker
realtimeManager: RealtimeManager
overflowDetection?: OverflowDetection
overflowDetection?: OverflowManager
replayEventsIngester?: ReplayEventsIngester
consoleLogsIngester?: ConsoleLogsIngester
batchConsumer?: BatchConsumer
Expand All @@ -149,7 +151,8 @@ export class SessionRecordingIngester {
constructor(
private globalServerConfig: PluginsServerConfig,
private postgres: PostgresRouter,
private objectStorage: ObjectStorage
private objectStorage: ObjectStorage,
captureRedis: Redis | undefined
) {
this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION
? parseInt(globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION)
Expand All @@ -162,11 +165,13 @@ export class SessionRecordingIngester {

this.realtimeManager = new RealtimeManager(this.redisPool, this.config)

if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED) {
this.overflowDetection = new OverflowDetection(
if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis) {
this.overflowDetection = new OverflowManager(
globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY,
globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE,
24 * 3600 // One day
24 * 3600, // One day,
CAPTURE_OVERFLOW_REDIS_KEY,
captureRedis
)
}

Expand Down Expand Up @@ -250,6 +255,8 @@ export class SessionRecordingIngester {

const { team_id, session_id } = event
const key = `${team_id}-${session_id}`
// TODO: use this for session key too if it's safe to do so
const overflowKey = `${team_id}:${session_id}`
Comment on lines 257 to +259
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the comment mean whether we should use the same for key and overflowKey?

i think safe to do that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'll do it in a separate no-op PR, as making this PR pass tests might be complicated already, and I don't want to add noise


const { partition, highOffset } = event.metadata
if (this.debugPartition === partition) {
Expand Down Expand Up @@ -285,9 +292,6 @@ export class SessionRecordingIngester {
return
}

// TODO: update Redis if this triggers
this.overflowDetection?.observe(key, event.metadata.rawSize, event.metadata.timestamp)

if (!this.sessions[key]) {
const { partition, topic } = event.metadata

Expand All @@ -304,7 +308,10 @@ export class SessionRecordingIngester {
)
}

await this.sessions[key]?.add(event)
await Promise.allSettled([
this.sessions[key]?.add(event),
this.overflowDetection?.observe(overflowKey, event.metadata.rawSize, event.metadata.timestamp),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could void this promise instead of waiting on it since it's ok for some to fail maybe, but it should be fast enough that that's bike-shedding and we'll see the impact if it isn't fast enough so ignore as nit-picking 🤣

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only async part is the redis ZADD when triggering on the first time, the detection itself does not yield, so I don't think voiding it would help throughput 🤔
I'd say let's rollout and re-evaluate if there's impact. When merging, overflow will be disabled, it'll be re-enabled when rolling out the new CAPTURE_CONFIG_REDIS_HOST

])
}

public async handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void> {
Expand Down
10 changes: 8 additions & 2 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PeriodicTask } from '../utils/periodic-task'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { delay } from '../utils/utils'
import { createRedisClient, delay } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { DeferredPersonOverrideWorker, FlatPersonOverrideWriter } from '../worker/ingestion/person-state'
Expand Down Expand Up @@ -243,6 +243,12 @@ export async function startPluginsServer(
// be super lightweight and ideally not do any IO.
const healthChecks: { [service: string]: () => Promise<boolean> | boolean } = {}

// Creating a dedicated single-connection redis client to this Redis, as it's not relevant for hobby
// and cloud deploys don't have concurrent uses. We should abstract multi-Redis into a router util.
const captureRedis = serverConfig.CAPTURE_CONFIG_REDIS_HOST
? await createRedisClient(serverConfig.CAPTURE_CONFIG_REDIS_HOST)
: undefined

try {
// Based on the mode the plugin server was started, we start a number of
// different services. Mostly this is reasonably obvious from the name.
Expand Down Expand Up @@ -440,7 +446,7 @@ export async function startPluginsServer(
throw new Error("Can't start session recording blob ingestion without object storage")
}
// NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
const ingester = new SessionRecordingIngester(serverConfig, postgres, s3)
const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, captureRedis)
await ingester.start()

const batchConsumer = ingester.batchConsumer
Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ export interface PluginsServerConfig {
RUSTY_HOOK_URL: string
SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP: boolean
PIPELINE_STEP_STALLED_LOG_TIMEOUT: number
CAPTURE_CONFIG_REDIS_HOST: string | null // Redis cluster to use to coordinate with capture (overflow, routing)

// dump profiles to disk, covering the first N seconds of runtime
STARTUP_PROFILE_DURATION_SECONDS: number
Expand Down
8 changes: 6 additions & 2 deletions plugin-server/src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,12 @@ export async function createRedis(serverConfig: PluginsServerConfig): Promise<Re
}
: undefined

const redis = new Redis(credentials ? serverConfig.POSTHOG_REDIS_HOST : serverConfig.REDIS_URL, {
...credentials,
return createRedisClient(credentials ? serverConfig.POSTHOG_REDIS_HOST : serverConfig.REDIS_URL, credentials)
}

export async function createRedisClient(url: string, options?: RedisOptions): Promise<Redis.Redis> {
const redis = new Redis(url, {
...options,
maxRetriesPerRequest: -1,
})
let errorCounter = 0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Redis } from 'ioredis'

import { OverflowManager } from '../../../../../src/main/ingestion-queues/session-recording/services/overflow-manager'
import { Hub } from '../../../../../src/types'
import { createHub } from '../../../../../src/utils/db/hub'

jest.mock('../../../../../src/utils/status')
jest.mock('../../../../../src/kafka/producer')

const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'
const TIMESTAMP_IN_2017 = 1511220335

describe('overflow manager', () => {
    let hub: Hub
    let closeHub: () => Promise<void>
    let redis: Redis
    let overflowManager: OverflowManager

    beforeAll(async () => {
        const created = await createHub()
        hub = created[0]
        closeHub = created[1]
        redis = await hub.redisPool.acquire()
    })
    beforeEach(async () => {
        // Start each test from an empty zset and a fresh manager
        // (burst capacity 10, replenish rate 1, one-hour cooldown).
        await redis.del(CAPTURE_OVERFLOW_REDIS_KEY)
        overflowManager = new OverflowManager(10, 1, 3600, CAPTURE_OVERFLOW_REDIS_KEY, redis)
    })

    afterAll(async () => {
        await redis.flushdb()
        await hub.redisPool.release(redis)
        await closeHub?.()
    })

    test('it does not trigger if several keys are under threshold', async () => {
        // Each key stays below the burst capacity of 10.
        for (const key of ['key1', 'key2', 'key3']) {
            await overflowManager.observe(key, 8)
        }

        expect(await redis.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
    })

    test('it triggers for hot keys', async () => {
        // 4 + 4 for key1 and 8 for key2 are all within capacity.
        await overflowManager.observe('key1', 4)
        await overflowManager.observe('key1', 4)
        await overflowManager.observe('key2', 8)
        expect(await redis.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)

        // A third batch pushes key1 over capacity and into the zset.
        await overflowManager.observe('key1', 4)
        const members = await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)
        expect(members).toEqual(['key1'])
    })

    test('it does not triggers twice when cooling down', async () => {
        await overflowManager.observe('key1', 11)
        expect(await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual(['key1'])

        // Delete the key to confirm that OverflowManager is in cooldown for key1 and does not re-create it
        await redis.del(CAPTURE_OVERFLOW_REDIS_KEY)
        await overflowManager.observe('key1', 11)
        expect(await redis.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)

        // But it triggers for key2
        await overflowManager.observe('key2', 11)
        expect(await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual(['key2'])
    })

    test('it does not update existing values', async () => {
        // Pre-existing score must survive thanks to the NX flag on zadd.
        await redis.zadd(CAPTURE_OVERFLOW_REDIS_KEY, TIMESTAMP_IN_2017, 'key1')

        await overflowManager.observe('key1', 11)
        const entries = await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1, 'WITHSCORES')
        expect(entries).toEqual(['key1', TIMESTAMP_IN_2017.toString()])
    })

    test('it set the expected expiration on new values', async () => {
        await redis.zadd(CAPTURE_OVERFLOW_REDIS_KEY, TIMESTAMP_IN_2017, 'key1')

        // Scores are stored as seconds; observe() takes milliseconds.
        const observedAtMs = 1711280335000
        const expectedExpiration = observedAtMs / 1000 + 3600
        await overflowManager.observe('key2', 11, observedAtMs)
        const entries = await redis.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1, 'WITHSCORES')
        expect(entries).toEqual([
            'key1',
            TIMESTAMP_IN_2017.toString(),
            'key2',
            expectedExpiration.toString(),
        ])
    })
})
Loading
Loading