diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts
index b7285d1b1ebee4..caa5b8f576a11e 100644
--- a/plugin-server/src/capabilities.ts
+++ b/plugin-server/src/capabilities.ts
@@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
                 processAsyncOnEventHandlers: true,
                 processAsyncWebhooksHandlers: true,
                 sessionRecordingBlobIngestion: true,
+                sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED,
                 personOverrides: true,
                 appManagementSingleton: true,
                 preflightSchedules: true,
@@ -55,7 +56,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
                 sessionRecordingBlobIngestion: true,
                 ...sharedCapabilities,
             }
-
+        case PluginServerMode.recordings_blob_ingestion_overflow:
+            return {
+                sessionRecordingBlobOverflowIngestion: true,
+                ...sharedCapabilities,
+            }
         case PluginServerMode.recordings_ingestion_v3:
             return {
                 sessionRecordingV3Ingestion: true,
diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts
index 4fd3e54b043b5b..71f9bd8ee79da5 100644
--- a/plugin-server/src/config/kafka-topics.ts
+++ b/plugin-server/src/config/kafka-topics.ts
@@ -29,6 +29,8 @@ export const KAFKA_PERSON_OVERRIDE = `${prefix}clickhouse_person_override${suffi
 
 // read session recording snapshot items
 export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = `${prefix}session_recording_snapshot_item_events${suffix}`
+export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW = `${prefix}session_recording_snapshot_item_overflow${suffix}`
+
 // write session recording and replay events to ClickHouse
 export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
 export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
index 2e84d7826c0023..ec606dd1c460d4 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -6,7 +6,10 @@ import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartit
 import { Counter, Gauge, Histogram } from 'prom-client'
 
 import { sessionRecordingConsumerConfig } from '../../../config/config'
-import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
+import {
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
+} from '../../../config/kafka-topics'
 import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
 import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
 import { PluginsServerConfig, RedisPool, TeamId } from '../../../types'
@@ -41,6 +44,7 @@ require('@sentry/tracing')
 
 // WARNING: Do not change this - it will essentially reset the consumer
 const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob'
+const KAFKA_CONSUMER_GROUP_ID_OVERFLOW = 'session-recordings-blob-overflow'
 const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000
 const SHUTDOWN_FLUSH_TIMEOUT_MS = 30000
 const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'
@@ -152,6 +156,7 @@ export class SessionRecordingIngester {
         private globalServerConfig: PluginsServerConfig,
         private postgres: PostgresRouter,
         private objectStorage: ObjectStorage,
+        private consumeOverflow: boolean,
         captureRedis: Redis | undefined
     ) {
         this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION
@@ -165,7 +170,7 @@ export class SessionRecordingIngester {
 
         this.realtimeManager = new RealtimeManager(this.redisPool, this.config)
 
-        if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis) {
+        if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis && !consumeOverflow) {
             this.overflowDetection = new OverflowManager(
                 globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY,
                 globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE,
@@ -467,8 +472,10 @@ export class SessionRecordingIngester {
 
         this.batchConsumer = await startBatchConsumer({
             connectionConfig,
-            groupId: KAFKA_CONSUMER_GROUP_ID,
-            topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+            groupId: this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID,
+            topic: this.consumeOverflow
+                ? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
+                : KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
             autoCommit: false,
             sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
             maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index 8c910e1857b063..27bfee273bd377 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -446,7 +446,7 @@ export async function startPluginsServer(
                 throw new Error("Can't start session recording blob ingestion without object storage")
             }
             // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
-            const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, captureRedis)
+            const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, false, captureRedis)
             await ingester.start()
 
             const batchConsumer = ingester.batchConsumer
@@ -458,6 +458,28 @@ export async function startPluginsServer(
             }
         }
 
+        if (capabilities.sessionRecordingBlobOverflowIngestion) {
+            const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig)
+            const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
+            const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig)
+
+            if (!s3) {
+                throw new Error("Can't start session recording blob ingestion without object storage")
+            }
+            // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
+            // NOTE: We don't pass captureRedis to disable overflow computation on the overflow topic
+            const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, true, undefined)
+            await ingester.start()
+
+            const batchConsumer = ingester.batchConsumer
+
+            if (batchConsumer) {
+                stopSessionRecordingBlobConsumer = () => ingester.stop()
+                shutdownOnConsumerExit(batchConsumer)
+                healthChecks['session-recordings-blob-overflow'] = () => ingester.isHealthy() ?? false
+            }
+        }
+
         if (capabilities.sessionRecordingV3Ingestion) {
             const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig)
             const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 36bbbc3752ece0..da51b50eb90007 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -77,6 +77,7 @@ export enum PluginServerMode {
     scheduler = 'scheduler',
     analytics_ingestion = 'analytics-ingestion',
     recordings_blob_ingestion = 'recordings-blob-ingestion',
+    recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow',
     recordings_ingestion_v3 = 'recordings-ingestion-v3',
     person_overrides = 'person-overrides',
 }
@@ -306,6 +307,7 @@ export interface PluginServerCapabilities {
     processAsyncOnEventHandlers?: boolean
     processAsyncWebhooksHandlers?: boolean
     sessionRecordingBlobIngestion?: boolean
+    sessionRecordingBlobOverflowIngestion?: boolean
     sessionRecordingV3Ingestion?: boolean
     personOverrides?: boolean
     appManagementSingleton?: boolean
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
index 730fe28f481ac7..b1be10771b1c92 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
@@ -114,7 +114,7 @@ describe('ingester', () => {
 
         await deleteKeysWithPrefix(hub)
 
-        ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, redisConn)
+        ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, redisConn)
         await ingester.start()
 
         mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
@@ -162,7 +162,7 @@ describe('ingester', () => {
             KAFKA_HOSTS: 'localhost:9092',
         } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
 
-        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
+        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
         expect(ingester['debugPartition']).toEqual(103)
     })
 
@@ -171,7 +171,7 @@ describe('ingester', () => {
             KAFKA_HOSTS: 'localhost:9092',
         } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
 
-        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
+        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
        expect(ingester['debugPartition']).toBeUndefined()
     })
 
@@ -436,7 +436,7 @@ describe('ingester', () => {
         jest.setTimeout(5000) // Increased to cover lock delay
 
         beforeEach(async () => {
-            otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
+            otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
             await otherIngester.start()
         })
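
Taken together, these changes let a dedicated deployment drain the overflow topic: the new `consumeOverflow` constructor flag switches the ingester onto its own Kafka consumer group and topic, while overflow detection stays off on that consumer (no `captureRedis` is passed, and `!consumeOverflow` guards the `OverflowManager`). Below is a minimal sketch of the group/topic selection, using the constant values from the diff as written (so an empty topic prefix/suffix is assumed); the `resolveConsumerTarget` helper is hypothetical and for illustration only, not a function in the codebase.

// Constant values copied from the diff; the helper itself is illustrative only.
const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = 'session_recording_snapshot_item_events'
const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW = 'session_recording_snapshot_item_overflow'
const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob'
const KAFKA_CONSUMER_GROUP_ID_OVERFLOW = 'session-recordings-blob-overflow'

function resolveConsumerTarget(consumeOverflow: boolean): { groupId: string; topic: string } {
    // The overflow ingester joins a separate consumer group, so its committed
    // offsets are tracked independently of the main blob ingester's.
    return consumeOverflow
        ? { groupId: KAFKA_CONSUMER_GROUP_ID_OVERFLOW, topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW }
        : { groupId: KAFKA_CONSUMER_GROUP_ID, topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS }
}

Keeping the group ids distinct matters here for the same reason as the "WARNING: Do not change this" comment on the existing constant: changing either id would effectively reset that consumer's position.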