feat(blobby): add new recordings-blob-ingestion-overflow role (#21161)
xvello authored Apr 2, 2024
1 parent a9035da commit db30cd2
Showing 9 changed files with 218 additions and 117 deletions.
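In short, this change lets session replay blob ingestion run as two separate consumers: the existing consumer, which can already flag overflowing sessions, and a new dedicated consumer that reads the overflow topic. A rough summary of the routing, assembled from the constants in this diff (a sketch for orientation only; the real topic names also carry the environment-driven prefix/suffix from kafka-topics.ts):

```ts
// Informal summary of the two roles touched by this commit; names are taken from the diff,
// and topic names are shown without the configurable prefix/suffix.
const replayBlobRoles = {
    'recordings-blob-ingestion': {
        topic: 'session_recording_snapshot_item_events',
        consumerGroup: 'session-recordings-blob',
        // Overflow detection runs here, but only when SESSION_RECORDING_OVERFLOW_ENABLED is set
        // and a capture Redis connection is available.
        detectsOverflow: true,
    },
    'recordings-blob-ingestion-overflow': {
        topic: 'session_recording_snapshot_item_overflow',
        consumerGroup: 'session-recordings-blob-overflow',
        // The overflow consumer never re-runs overflow detection on its own traffic.
        detectsOverflow: false,
    },
} as const
```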
7 changes: 6 additions & 1 deletion plugin-server/src/capabilities.ts
@@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED,
personOverrides: true,
appManagementSingleton: true,
preflightSchedules: true,
@@ -55,7 +56,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
sessionRecordingBlobIngestion: true,
...sharedCapabilities,
}

case PluginServerMode.recordings_blob_ingestion_overflow:
return {
sessionRecordingBlobOverflowIngestion: true,
...sharedCapabilities,
}
case PluginServerMode.recordings_ingestion_v3:
return {
sessionRecordingV3Ingestion: true,
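A minimal sketch of how the new capability resolves, condensing the two branches above (the helper and the `null` mode for the all-in-one process are hypothetical shorthand, not code from the repository):

```ts
// Hypothetical condensation of the capability logic in capabilities.ts.
type Mode = 'recordings-blob-ingestion' | 'recordings-blob-ingestion-overflow' | null // null = all-in-one process

function overflowIngestionEnabled(mode: Mode, overflowEnabled: boolean): boolean {
    if (mode === 'recordings-blob-ingestion-overflow') {
        return true // the dedicated role always enables it
    }
    // In the all-in-one process it is opt-in via SESSION_RECORDING_OVERFLOW_ENABLED.
    return mode === null && overflowEnabled
}
```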
2 changes: 2 additions & 0 deletions plugin-server/src/config/kafka-topics.ts
@@ -29,6 +29,8 @@ export const KAFKA_PERSON_OVERRIDE = `${prefix}clickhouse_person_override${suffi

// read session recording snapshot items
export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = `${prefix}session_recording_snapshot_item_events${suffix}`
export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW = `${prefix}session_recording_snapshot_item_overflow${suffix}`

// write session recording and replay events to ClickHouse
export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
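The new constant follows the same prefix/suffix template as every other topic in this file; with an empty prefix and suffix it resolves to the bare topic name:

```ts
// Illustration only: in kafka-topics.ts, `prefix` and `suffix` come from existing config constants.
const prefix = ''
const suffix = ''
const overflowTopic = `${prefix}session_recording_snapshot_item_overflow${suffix}`
// => 'session_recording_snapshot_item_overflow'
```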
@@ -6,7 +6,10 @@ import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartit
import { Counter, Gauge, Histogram } from 'prom-client'

import { sessionRecordingConsumerConfig } from '../../../config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import {
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
} from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config'
import { createKafkaProducer } from '../../../kafka/producer'
@@ -43,6 +46,7 @@ require('@sentry/tracing')

// WARNING: Do not change this - it will essentially reset the consumer
const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob'
const KAFKA_CONSUMER_GROUP_ID_OVERFLOW = 'session-recordings-blob-overflow'
const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000
const SHUTDOWN_FLUSH_TIMEOUT_MS = 30000
const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay'
@@ -141,7 +145,8 @@ export class SessionRecordingIngester {
teamsRefresher: BackgroundRefresher<Record<string, TeamIDWithConfig>>
latestOffsetsRefresher: BackgroundRefresher<Record<number, number | undefined>>
config: PluginsServerConfig
topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
topic: string
consumerGroupId: string
totalNumPartitions = 0
isStopping = false

@@ -156,11 +161,16 @@
private globalServerConfig: PluginsServerConfig,
private postgres: PostgresRouter,
private objectStorage: ObjectStorage,
private consumeOverflow: boolean,
captureRedis: Redis | undefined
) {
this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION
? parseInt(globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION)
: undefined
this.topic = consumeOverflow
? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
this.consumerGroupId = this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID

// NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis
// We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
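With the constructor change above, the same `SessionRecordingIngester` class backs both roles. A usage sketch mirroring how pluginsServer.ts wires the two instances later in this diff (variable names are illustrative):

```ts
// The only differences are the consumeOverflow flag and whether a capture Redis is supplied.
const mainIngester = new SessionRecordingIngester(serverConfig, postgres, objectStorage, false, captureRedis)
const overflowIngester = new SessionRecordingIngester(serverConfig, postgres, objectStorage, true, undefined)
```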
@@ -169,7 +179,7 @@

this.realtimeManager = new RealtimeManager(this.redisPool, this.config)

if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis) {
if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis && !consumeOverflow) {
this.overflowDetection = new OverflowManager(
globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY,
globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE,
@@ -207,7 +217,7 @@
this.latestOffsetsRefresher = new BackgroundRefresher(async () => {
const results = await Promise.all(
this.assignedTopicPartitions.map(({ partition }) =>
queryWatermarkOffsets(this.connectedBatchConsumer, partition).catch((err) => {
queryWatermarkOffsets(this.connectedBatchConsumer, this.topic, partition).catch((err) => {
// NOTE: This can error due to a timeout or the consumer being disconnected, not stop the process
// as it is currently only used for reporting lag.
captureException(err)
@@ -259,8 +269,6 @@

const { team_id, session_id } = event
const key = `${team_id}-${session_id}`
// TODO: use this for session key too if it's safe to do so
const overflowKey = `${team_id}:${session_id}`

const { partition, highOffset } = event.metadata
const isDebug = this.debugPartition === partition
@@ -285,11 +293,7 @@

// Check that we are not below the high-water mark for this partition (another consumer may have flushed further than us when revoking)
if (
await this.persistentHighWaterMarker.isBelowHighWaterMark(
event.metadata,
KAFKA_CONSUMER_GROUP_ID,
highOffset
)
await this.persistentHighWaterMarker.isBelowHighWaterMark(event.metadata, this.consumerGroupId, highOffset)
) {
dropEvent('high_water_mark_partition')
return
@@ -318,7 +322,7 @@

await Promise.allSettled([
this.sessions[key]?.add(event),
this.overflowDetection?.observe(overflowKey, event.metadata.rawSize, event.metadata.timestamp),
this.overflowDetection?.observe(session_id, event.metadata.rawSize, event.metadata.timestamp),
])
}
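Note that overflow tracking is now keyed by the bare `session_id` rather than the previous `${team_id}:${session_id}` key. `OverflowManager` itself is not part of this diff; the sketch below is only a guess at the idea its configuration names suggest — a per-session byte budget that, once exhausted, flags the session in the capture-overflow Redis set so later traffic can be routed to the overflow topic. Names and details are assumptions, not the real implementation:

```ts
import Redis from 'ioredis'

// Assumed sketch of an overflow detector: a leaky-bucket byte budget per key; once exceeded,
// the key is written to a Redis sorted set (scored by time) that capture can consult for routing.
class OverflowDetectorSketch {
    private buckets = new Map<string, { level: number; lastSeenMs: number }>()

    constructor(
        private capacity: number, // cf. SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY
        private replenishRatePerSec: number, // cf. SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE
        private redis: Redis,
        private redisKey: string // cf. CAPTURE_OVERFLOW_REDIS_KEY
    ) {}

    async observe(key: string, bytes: number, timestampMs: number): Promise<void> {
        const bucket = this.buckets.get(key) ?? { level: 0, lastSeenMs: timestampMs }
        const elapsedSec = Math.max(0, (timestampMs - bucket.lastSeenMs) / 1000)
        // Drain the bucket for the time elapsed, then add the new bytes.
        bucket.level = Math.max(0, bucket.level - elapsedSec * this.replenishRatePerSec) + bytes
        bucket.lastSeenMs = timestampMs
        this.buckets.set(key, bucket)

        if (bucket.level > this.capacity) {
            await this.redis.zadd(this.redisKey, timestampMs, key)
        }
    }
}
```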

@@ -486,8 +490,8 @@
const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config)
this.batchConsumer = await startBatchConsumer({
connectionConfig: replayClusterConnectionConfig,
groupId: KAFKA_CONSUMER_GROUP_ID,
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
groupId: this.consumerGroupId,
topic: this.topic,
autoCommit: false,
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
Expand All @@ -510,7 +514,7 @@ export class SessionRecordingIngester {
debug: this.config.SESSION_RECORDING_KAFKA_DEBUG,
})

this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer)).length
this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer, this.topic)).length

addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer)

@@ -820,7 +824,7 @@
})

// Store the committed offset to the persistent store to avoid rebalance issues
await this.persistentHighWaterMarker.add(tp, KAFKA_CONSUMER_GROUP_ID, highestOffsetToCommit)
await this.persistentHighWaterMarker.add(tp, this.consumerGroupId, highestOffsetToCommit)
// Clear all session offsets below the committed offset (as we know they have been flushed)
await this.sessionHighWaterMarker.clear(tp, highestOffsetToCommit)
gaugeOffsetCommitted.set({ partition }, highestOffsetToCommit)
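One subtle point in the offset handling above: persistent high-water marks are now recorded under the consumer's own group id rather than a hard-coded one, keeping each role's bookkeeping self-contained — the two roles read different topics, and their committed offsets and water marks should never be mixed up. A hypothetical illustration of why the group id belongs in the key:

```ts
// Hypothetical key scheme, for illustration only; the real persistentHighWaterMarker has its own storage layout.
const highWaterMarkKey = (groupId: string, topic: string, partition: number): string =>
    `${groupId}/${topic}/${partition}`

highWaterMarkKey('session-recordings-blob', 'session_recording_snapshot_item_events', 3)
// => 'session-recordings-blob/session_recording_snapshot_item_events/3'
highWaterMarkKey('session-recordings-blob-overflow', 'session_recording_snapshot_item_overflow', 3)
// => 'session-recordings-blob-overflow/session_recording_snapshot_item_overflow/3'
```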
25 changes: 10 additions & 15 deletions plugin-server/src/main/ingestion-queues/session-recording/utils.ts
@@ -4,7 +4,6 @@ import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartitio
import path from 'path'
import { Counter } from 'prom-client'

import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types'
import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'
import { status } from '../../../utils/status'
@@ -36,6 +35,7 @@ export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-f

export const queryWatermarkOffsets = (
kafkaConsumer: KafkaConsumer | undefined,
topic: string,
partition: number,
timeout = 10000
): Promise<[number, number]> => {
@@ -44,20 +44,15 @@
return reject('Not connected')
}

kafkaConsumer.queryWatermarkOffsets(
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
partition,
timeout,
(err, offsets) => {
if (err) {
captureException(err)
status.error('🔥', 'Failed to query kafka watermark offsets', err)
return reject(err)
}

resolve([partition, offsets.highOffset])
kafkaConsumer.queryWatermarkOffsets(topic, partition, timeout, (err, offsets) => {
if (err) {
captureException(err)
status.error('🔥', 'Failed to query kafka watermark offsets', err)
return reject(err)
}
)

resolve([partition, offsets.highOffset])
})
})
}

@@ -89,7 +84,7 @@ export const queryCommittedOffsets = (

export const getPartitionsForTopic = (
kafkaConsumer: KafkaConsumer | undefined,
topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
topic: string
): Promise<PartitionMetadata[]> => {
return new Promise<PartitionMetadata[]>((resolve, reject) => {
if (!kafkaConsumer) {
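Both helpers above now take an explicit, required topic instead of defaulting to the main snapshot topic, so each consumer queries offsets and partition metadata for whatever topic it actually reads. A usage sketch:

```ts
// Usage sketch: the ingester passes its own topic (main or overflow).
const topic = 'session_recording_snapshot_item_overflow' // or the main snapshot topic
const [partition, highOffset] = await queryWatermarkOffsets(kafkaConsumer, topic, 0)
const partitions = await getPartitionsForTopic(kafkaConsumer, topic)
console.log(`partition ${partition}: high watermark ${highOffset}; topic has ${partitions.length} partitions`)
```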
26 changes: 25 additions & 1 deletion plugin-server/src/main/pluginsServer.ts
@@ -109,6 +109,7 @@ export async function startPluginsServer(
// meantime.
let bufferConsumer: Consumer | undefined
let stopSessionRecordingBlobConsumer: (() => void) | undefined
let stopSessionRecordingBlobOverflowConsumer: (() => void) | undefined
let jobsConsumer: Consumer | undefined
let schedulerTasksConsumer: Consumer | undefined

@@ -151,6 +152,7 @@
bufferConsumer?.disconnect(),
jobsConsumer?.disconnect(),
stopSessionRecordingBlobConsumer?.(),
stopSessionRecordingBlobOverflowConsumer?.(),
schedulerTasksConsumer?.disconnect(),
personOverridesPeriodicTask?.stop(),
])
@@ -446,7 +448,7 @@
throw new Error("Can't start session recording blob ingestion without object storage")
}
// NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, captureRedis)
const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, false, captureRedis)
await ingester.start()

const batchConsumer = ingester.batchConsumer
@@ -458,6 +460,28 @@
}
}

if (capabilities.sessionRecordingBlobOverflowIngestion) {
const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig)
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig)

if (!s3) {
throw new Error("Can't start session recording blob ingestion without object storage")
}
// NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
// NOTE: We don't pass captureRedis to disable overflow computation on the overflow topic
const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, true, undefined)
await ingester.start()

const batchConsumer = ingester.batchConsumer

if (batchConsumer) {
stopSessionRecordingBlobOverflowConsumer = () => ingester.stop()
shutdownOnConsumerExit(batchConsumer)
healthChecks['session-recordings-blob-overflow'] = () => ingester.isHealthy() ?? false
}
}

if (capabilities.sessionRecordingV3Ingestion) {
const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig)
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
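Each consumer registers its own health check, so the two roles can be probed independently. A sketch of the resulting shape (the ingester variables are illustrative; how healthChecks is served over HTTP is existing behaviour not shown in this diff):

```ts
// Shape of the registered checks after this change; check names come from the diff.
const healthChecks: Record<string, () => boolean> = {
    'session-recordings-blob': () => mainIngester.isHealthy() ?? false,
    'session-recordings-blob-overflow': () => overflowIngester.isHealthy() ?? false,
}
```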
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
@@ -77,6 +77,7 @@ export enum PluginServerMode {
scheduler = 'scheduler',
analytics_ingestion = 'analytics-ingestion',
recordings_blob_ingestion = 'recordings-blob-ingestion',
recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow',
recordings_ingestion_v3 = 'recordings-ingestion-v3',
person_overrides = 'person-overrides',
}
@@ -306,6 +307,7 @@ export interface PluginServerCapabilities {
processAsyncOnEventHandlers?: boolean
processAsyncWebhooksHandlers?: boolean
sessionRecordingBlobIngestion?: boolean
sessionRecordingBlobOverflowIngestion?: boolean
sessionRecordingV3Ingestion?: boolean
personOverrides?: boolean
appManagementSingleton?: boolean
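With the enum value and capability flag in place, a dedicated deployment selects the new role the same way the existing ones are selected — presumably via the usual PLUGIN_SERVER_MODE environment variable, which this commit does not change:

```ts
// Illustrative startup selection; the string must match the enum value exactly.
process.env.PLUGIN_SERVER_MODE = 'recordings-blob-ingestion-overflow'
// getPluginServerCapabilities() then returns { sessionRecordingBlobOverflowIngestion: true, ...sharedCapabilities }
```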
@@ -1,6 +1,5 @@
import { Message } from 'node-rdkafka'

import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../../src/config/kafka-topics'
import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/types'
import jsonFullSnapshot from './data/snapshot-full.json'

@@ -41,13 +40,14 @@
}

export function createKafkaMessage(
topic: string,
token: number | string,
messageOverrides: Partial<Message> = {},
eventProperties: Record<string, any> = {}
): Message {
return {
partition: 1,
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
topic,
offset: 0,
timestamp: messageOverrides.timestamp ?? Date.now(),
size: 1,
@@ -72,6 +72,6 @@
}
}

export function createTP(partition: number, topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS) {
export function createTP(partition: number, topic: string) {
return { topic, partition }
}
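Since the topic is now a required first argument, tests state explicitly which topic each fixture message and topic-partition belongs to. For example:

```ts
import {
    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
} from '../../../../src/config/kafka-topics'

// Usage sketch of the updated helpers:
const mainMessage = createKafkaMessage(KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, 'team_token', { offset: 12 })
const overflowTP = createTP(1, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW)
```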
@@ -6,6 +6,7 @@ import path from 'path'

import { waitForExpect } from '../../../../functional_tests/expectations'
import { defaultConfig } from '../../../../src/config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../../src/config/kafka-topics'
import {
SessionManagerBufferContext,
SessionManagerContext,
@@ -76,6 +77,7 @@ describe('ingester', () => {
let teamToken = ''
let mockOffsets: Record<number, number> = {}
let mockCommittedOffsets: Record<number, number> = {}
const consumedTopic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS

beforeAll(async () => {
mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true })
@@ -120,7 +122,7 @@
ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
await ingester.start()

mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
mockConsumer.assignments.mockImplementation(() => [createTP(0, consumedTopic), createTP(1, consumedTopic)])
})

afterEach(async () => {
@@ -139,6 +141,7 @@
mockOffsets[partition]++

return createKafkaMessage(
consumedTopic,
teamToken,
{
partition,
@@ -223,7 +226,7 @@

describe('batch event processing', () => {
it('should batch parse incoming events and batch them to reduce writes', async () => {
mockConsumer.assignments.mockImplementation(() => [createTP(1)])
mockConsumer.assignments.mockImplementation(() => [createTP(1, consumedTopic)])
await ingester.handleEachBatch(
[
createMessage('session_id_1', 1),
@@ -279,7 +282,11 @@
const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)]

mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)])
mockConsumer.assignments.mockImplementation(() => [
createTP(1, consumedTopic),
createTP(2, consumedTopic),
createTP(3, consumedTopic),
])
await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop)

expect(getSessions(ingester)).toMatchObject([
@@ -291,12 +298,15 @@

// Call handleEachBatch with both consumers - we simulate the assignments which
// is what is responsible for the actual syncing of the sessions
mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)])
mockConsumer.assignments.mockImplementation(() => [
createTP(2, consumedTopic),
createTP(3, consumedTopic),
])
await otherIngester.handleEachBatch(
[createMessage('session_id_4', 2), createMessage('session_id_5', 2)],
noop
)
mockConsumer.assignments.mockImplementation(() => [createTP(1)])
mockConsumer.assignments.mockImplementation(() => [createTP(1, consumedTopic)])
await ingester.handleEachBatch([createMessage('session_id_1', 1)], noop)

// Should still have the partition 1 sessions that didnt move with added events
@@ -317,8 +327,8 @@
// non-zero offset because the code can't commit offset 0
await ingester.handleEachBatch(
[
createKafkaMessage('invalid_token', { offset: 12 }),
createKafkaMessage('invalid_token', { offset: 13 }),
createKafkaMessage(consumedTopic, 'invalid_token', { offset: 12 }),
createKafkaMessage(consumedTopic, 'invalid_token', { offset: 13 }),
],
noop
)
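These tests pin the consumed topic to the main snapshot topic. An equivalent suite for the overflow role is not part of this diff, but would presumably just swap the topic and the consumeOverflow flag on the blob ingester — a sketch only:

```ts
// Hypothetical setup for an overflow-consumer test; not added by this commit.
const overflowTopic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
const overflowIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, true, undefined)
await overflowIngester.start()
mockConsumer.assignments.mockImplementation(() => [createTP(0, overflowTopic), createTP(1, overflowTopic)])
```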