feat(blobby): add new recordings-blob-ingestion-overflow role

xvello committed Mar 27, 2024
1 parent 4edcf1b commit c39f28d

Showing 6 changed files with 48 additions and 10 deletions.
7 changes: 6 additions & 1 deletion plugin-server/src/capabilities.ts
@@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): PluginServerCapabilities {
         processAsyncOnEventHandlers: true,
         processAsyncWebhooksHandlers: true,
         sessionRecordingBlobIngestion: true,
+        sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED,
         personOverrides: true,
         appManagementSingleton: true,
         preflightSchedules: true,
@@ -55,7 +56,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): PluginServerCapabilities {
                 sessionRecordingBlobIngestion: true,
                 ...sharedCapabilities,
             }
-
+        case PluginServerMode.recordings_blob_ingestion_overflow:
+            return {
+                sessionRecordingBlobOverflowIngestion: true,
+                ...sharedCapabilities,
+            }
         case PluginServerMode.recordings_ingestion_v3:
             return {
                 sessionRecordingV3Ingestion: true,
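
The capability is switched on along two paths: in the all-in-one server it follows the `SESSION_RECORDING_OVERFLOW_ENABLED` config flag, while the dedicated `recordings_blob_ingestion_overflow` mode enables it unconditionally. A minimal sketch of that resolution logic, assuming only this capability matters (the real function returns many more flags):

```ts
// Sketch only: how sessionRecordingBlobOverflowIngestion resolves per mode.
type Capabilities = { sessionRecordingBlobOverflowIngestion?: boolean }

function overflowCapability(mode: string | null, overflowEnabled: boolean): Capabilities {
    if (mode === 'recordings-blob-ingestion-overflow') {
        // Dedicated role: always consumes the overflow topic.
        return { sessionRecordingBlobOverflowIngestion: true }
    }
    if (mode === null) {
        // All-in-one server: gated on the config flag.
        return { sessionRecordingBlobOverflowIngestion: overflowEnabled }
    }
    return {}
}
```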
2 changes: 2 additions & 0 deletions plugin-server/src/config/kafka-topics.ts
@@ -29,6 +29,8 @@ export const KAFKA_PERSON_OVERRIDE = `${prefix}clickhouse_person_override${suffix}`
 
 // read session recording snapshot items
 export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = `${prefix}session_recording_snapshot_item_events${suffix}`
+export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW = `${prefix}session_recording_snapshot_item_overflow${suffix}`
+
 // write session recording and replay events to ClickHouse
 export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
 export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
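
Both snapshot topics are assembled from the same cluster-wide `prefix` and environment `suffix` as the surrounding constants. A sketch of the composition (the env-var name and test-suffix rule are assumptions, not taken from this diff):

```ts
// Sketch: topic names compose from a shared prefix/suffix (assumed defaults).
const prefix = process.env.KAFKA_PREFIX ?? ''
const suffix = process.env.NODE_ENV === 'test' ? '_test' : ''

const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW =
    `${prefix}session_recording_snapshot_item_overflow${suffix}`
// With empty prefix/suffix this is just "session_recording_snapshot_item_overflow".
```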
15 changes: 11 additions & 4 deletions plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -6,7 +6,10 @@ import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
 import { Counter, Gauge, Histogram } from 'prom-client'
 
 import { sessionRecordingConsumerConfig } from '../../../config/config'
-import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
+import {
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
+} from '../../../config/kafka-topics'
 import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
 import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
 import { PluginsServerConfig, RedisPool, TeamId } from '../../../types'
@@ -41,6 +44,7 @@ require('@sentry/tracing')
 
 // WARNING: Do not change this - it will essentially reset the consumer
 const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob'
+const KAFKA_CONSUMER_GROUP_ID_OVERFLOW = 'session-recordings-blob-overflow'
 const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000
 const SHUTDOWN_FLUSH_TIMEOUT_MS = 30000
 const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'
@@ -152,6 +156,7 @@ export class SessionRecordingIngester {
         private globalServerConfig: PluginsServerConfig,
         private postgres: PostgresRouter,
         private objectStorage: ObjectStorage,
+        private consumeOverflow: boolean,
         captureRedis: Redis | undefined
     ) {
         this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION
@@ -165,7 +170,7 @@
 
         this.realtimeManager = new RealtimeManager(this.redisPool, this.config)
 
-        if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis) {
+        if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis && !consumeOverflow) {
             this.overflowDetection = new OverflowManager(
                 globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY,
                 globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE,
@@ -467,8 +472,10 @@
 
         this.batchConsumer = await startBatchConsumer({
             connectionConfig,
-            groupId: KAFKA_CONSUMER_GROUP_ID,
-            topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+            groupId: this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID,
+            topic: this.consumeOverflow
+                ? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
+                : KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
             autoCommit: false,
             sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
             maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
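
Two details above do the heavy lifting. First, overflow detection now also requires `!consumeOverflow`, so only the main consumer measures sessions and re-routes heavy ones; the overflow consumer can never re-route its own input. Second, the `OverflowManager` is parameterised by a bucket capacity and a replenish rate, i.e. token-bucket semantics. A minimal illustrative sketch of that idea (not the real class, which also publishes overflowing session keys to Redis under `CAPTURE_OVERFLOW_REDIS_KEY`):

```ts
// Illustrative token bucket: a session "overflows" once its bucket runs dry.
class SessionTokenBucket {
    private tokens: number
    private lastReplenish: number

    constructor(private capacity: number, private replenishRate: number /* tokens per second */) {
        this.tokens = capacity
        this.lastReplenish = Date.now()
    }

    // Consume `cost` tokens (e.g. message bytes). Returns true when the bucket
    // is exhausted, signalling the session should move to the overflow topic.
    consume(cost: number, now: number = Date.now()): boolean {
        const elapsedSeconds = (now - this.lastReplenish) / 1000
        this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.replenishRate)
        this.lastReplenish = now
        this.tokens -= cost
        return this.tokens <= 0
    }
}

const bucket = new SessionTokenBucket(1_000_000, 50_000)
bucket.consume(2_000_000) // true: a burst this large overflows immediately
```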
24 changes: 23 additions & 1 deletion plugin-server/src/main/pluginsServer.ts
@@ -446,7 +446,7 @@ export async function startPluginsServer(
             throw new Error("Can't start session recording blob ingestion without object storage")
         }
         // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
-        const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, captureRedis)
+        const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, false, captureRedis)
         await ingester.start()
 
         const batchConsumer = ingester.batchConsumer
@@ -458,6 +458,28 @@
             }
         }
 
+        if (capabilities.sessionRecordingBlobOverflowIngestion) {
+            const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig)
+            const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
+            const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig)
+
+            if (!s3) {
+                throw new Error("Can't start session recording blob ingestion without object storage")
+            }
+            // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
+            // NOTE: We don't pass captureRedis to disable overflow computation on the overflow topic
+            const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, true, undefined)
+            await ingester.start()
+
+            const batchConsumer = ingester.batchConsumer
+
+            if (batchConsumer) {
+                stopSessionRecordingBlobConsumer = () => ingester.stop()
+                shutdownOnConsumerExit(batchConsumer)
+                healthChecks['session-recordings-blob-overflow'] = () => ingester.isHealthy() ?? false
+            }
+        }
+
         if (capabilities.sessionRecordingV3Ingestion) {
             const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig)
             const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
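
Because each role runs under its own consumer group on its own topic, the two ingesters commit offsets independently and can be scaled and restarted independently; the earlier WARNING about renaming group IDs now applies to each group separately. The resulting pairing, spelled out with the literals from this diff:

```ts
// The group/topic pairing established by this commit (values from the diff).
const roleConfig = {
    main: {
        groupId: 'session-recordings-blob',
        topic: 'session_recording_snapshot_item_events',
    },
    overflow: {
        groupId: 'session-recordings-blob-overflow',
        topic: 'session_recording_snapshot_item_overflow',
    },
} as const
```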
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
@@ -77,6 +77,7 @@ export enum PluginServerMode {
     scheduler = 'scheduler',
     analytics_ingestion = 'analytics-ingestion',
     recordings_blob_ingestion = 'recordings-blob-ingestion',
+    recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow',
     recordings_ingestion_v3 = 'recordings-ingestion-v3',
     person_overrides = 'person-overrides',
 }
@@ -306,6 +307,7 @@ export interface PluginServerCapabilities {
     processAsyncOnEventHandlers?: boolean
     processAsyncWebhooksHandlers?: boolean
     sessionRecordingBlobIngestion?: boolean
+    sessionRecordingBlobOverflowIngestion?: boolean
     sessionRecordingV3Ingestion?: boolean
     personOverrides?: boolean
     appManagementSingleton?: boolean
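
The enum string doubles as the externally visible role name, so a deployment opts into the new consumer by selecting this mode at startup. A sketch of the usual string-to-enum resolution (the exact env-var plumbing is an assumption; only the enum values come from this diff):

```ts
// Sketch: resolving a configured mode string to the enum (abbreviated enum).
enum PluginServerMode {
    recordings_blob_ingestion = 'recordings-blob-ingestion',
    recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow',
}

function modeFromEnv(value: string | undefined): PluginServerMode | null {
    return (Object.values(PluginServerMode) as string[]).includes(value ?? '')
        ? (value as PluginServerMode)
        : null
}

modeFromEnv('recordings-blob-ingestion-overflow') // -> the new role
```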
8 changes: 4 additions & 4 deletions plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
@@ -114,7 +114,7 @@ describe('ingester', () => {
 
         await deleteKeysWithPrefix(hub)
 
-        ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, redisConn)
+        ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, redisConn)
         await ingester.start()
 
         mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
@@ -162,7 +162,7 @@
             KAFKA_HOSTS: 'localhost:9092',
         } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
 
-        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
+        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
         expect(ingester['debugPartition']).toEqual(103)
     })
 
@@ -171,7 +171,7 @@
             KAFKA_HOSTS: 'localhost:9092',
         } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
 
-        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
+        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
         expect(ingester['debugPartition']).toBeUndefined()
     })
 
@@ -436,7 +436,7 @@
         jest.setTimeout(5000) // Increased to cover lock delay
 
         beforeEach(async () => {
-            otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
+            otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
             await otherIngester.start()
         })
