-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(blobby): add new recordings-blob-ingestion-overflow role #21161
Changes from all commits
c39f28d
e28e766
fca1bef
9b8ed37
0bbcc66
b1c10a3
7f16eed
f1e13d0
90a818f
a11537e
0713148
ba3cfd4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,10 @@ import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartit | |
import { Counter, Gauge, Histogram } from 'prom-client' | ||
|
||
import { sessionRecordingConsumerConfig } from '../../../config/config' | ||
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' | ||
import { | ||
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, | ||
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW, | ||
} from '../../../config/kafka-topics' | ||
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' | ||
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config' | ||
import { createKafkaProducer } from '../../../kafka/producer' | ||
|
@@ -43,6 +46,7 @@ require('@sentry/tracing') | |
|
||
// WARNING: Do not change this - it will essentially reset the consumer | ||
const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob' | ||
const KAFKA_CONSUMER_GROUP_ID_OVERFLOW = 'session-recordings-blob-overflow' | ||
const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000 | ||
const SHUTDOWN_FLUSH_TIMEOUT_MS = 30000 | ||
const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay' | ||
|
@@ -141,7 +145,8 @@ export class SessionRecordingIngester { | |
teamsRefresher: BackgroundRefresher<Record<string, TeamIDWithConfig>> | ||
latestOffsetsRefresher: BackgroundRefresher<Record<number, number | undefined>> | ||
config: PluginsServerConfig | ||
topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS | ||
topic: string | ||
consumerGroupId: string | ||
totalNumPartitions = 0 | ||
isStopping = false | ||
|
||
|
@@ -156,11 +161,16 @@ export class SessionRecordingIngester { | |
private globalServerConfig: PluginsServerConfig, | ||
private postgres: PostgresRouter, | ||
private objectStorage: ObjectStorage, | ||
private consumeOverflow: boolean, | ||
captureRedis: Redis | undefined | ||
) { | ||
this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION | ||
? parseInt(globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION) | ||
: undefined | ||
this.topic = consumeOverflow | ||
? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW | ||
: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS | ||
this.consumerGroupId = this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID | ||
Comment on lines +170 to +173

[Reviewer] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

👨‍🍳 👌

[Author] thanks! I grepped for raw uses of the topic name (variable name and value) and could not find any. Hopefully I tracked them all down.

[Reviewer] I didn't find any others either. 👍 |
||
|
||
// NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis | ||
// We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka. | ||
|
@@ -169,7 +179,7 @@ export class SessionRecordingIngester { | |
|
||
this.realtimeManager = new RealtimeManager(this.redisPool, this.config) | ||
|
||
if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis) { | ||
if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis && !consumeOverflow) { | ||
this.overflowDetection = new OverflowManager( | ||
globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY, | ||
globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE, | ||
|
@@ -207,7 +217,7 @@ export class SessionRecordingIngester { | |
this.latestOffsetsRefresher = new BackgroundRefresher(async () => { | ||
const results = await Promise.all( | ||
this.assignedTopicPartitions.map(({ partition }) => | ||
queryWatermarkOffsets(this.connectedBatchConsumer, partition).catch((err) => { | ||
queryWatermarkOffsets(this.connectedBatchConsumer, this.topic, partition).catch((err) => { | ||
// NOTE: This can error due to a timeout or the consumer being disconnected, not stop the process | ||
// as it is currently only used for reporting lag. | ||
captureException(err) | ||
|
@@ -259,8 +269,6 @@ export class SessionRecordingIngester { | |
|
||
const { team_id, session_id } = event | ||
const key = `${team_id}-${session_id}` | ||
// TODO: use this for session key too if it's safe to do so | ||
const overflowKey = `${team_id}:${session_id}` | ||
|
||
const { partition, highOffset } = event.metadata | ||
const isDebug = this.debugPartition === partition | ||
|
@@ -285,11 +293,7 @@ export class SessionRecordingIngester { | |
|
||
// Check that we are not below the high-water mark for this partition (another consumer may have flushed further than us when revoking) | ||
if ( | ||
await this.persistentHighWaterMarker.isBelowHighWaterMark( | ||
event.metadata, | ||
KAFKA_CONSUMER_GROUP_ID, | ||
highOffset | ||
) | ||
await this.persistentHighWaterMarker.isBelowHighWaterMark(event.metadata, this.consumerGroupId, highOffset) | ||
) { | ||
dropEvent('high_water_mark_partition') | ||
return | ||
|
@@ -318,7 +322,7 @@ export class SessionRecordingIngester { | |
|
||
await Promise.allSettled([ | ||
this.sessions[key]?.add(event), | ||
this.overflowDetection?.observe(overflowKey, event.metadata.rawSize, event.metadata.timestamp), | ||
this.overflowDetection?.observe(session_id, event.metadata.rawSize, event.metadata.timestamp), | ||
]) | ||
} | ||
|
||
|
@@ -486,8 +490,8 @@ export class SessionRecordingIngester { | |
const replayClusterConnectionConfig = createRdConnectionConfigFromEnvVars(this.config) | ||
this.batchConsumer = await startBatchConsumer({ | ||
connectionConfig: replayClusterConnectionConfig, | ||
groupId: KAFKA_CONSUMER_GROUP_ID, | ||
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, | ||
groupId: this.consumerGroupId, | ||
topic: this.topic, | ||
autoCommit: false, | ||
sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS, | ||
maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, | ||
|
@@ -510,7 +514,7 @@ export class SessionRecordingIngester { | |
debug: this.config.SESSION_RECORDING_KAFKA_DEBUG, | ||
}) | ||
|
||
this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer)).length | ||
this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer, this.topic)).length | ||
|
||
addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) | ||
|
||
|
@@ -820,7 +824,7 @@ export class SessionRecordingIngester { | |
}) | ||
|
||
// Store the committed offset to the persistent store to avoid rebalance issues | ||
await this.persistentHighWaterMarker.add(tp, KAFKA_CONSUMER_GROUP_ID, highestOffsetToCommit) | ||
await this.persistentHighWaterMarker.add(tp, this.consumerGroupId, highestOffsetToCommit) | ||
// Clear all session offsets below the committed offset (as we know they have been flushed) | ||
await this.sessionHighWaterMarker.clear(tp, highestOffsetToCommit) | ||
gaugeOffsetCommitted.set({ partition }, highestOffsetToCommit) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wish github would let you comment on unchanged lines
reading through this again got me thinking of the various prometheus metrics we have on our dashboard.
I guess we need to change the dashboard so I can view metrics from the non-overflow and overflow deployments separately, rather than labelling every metric with whether it came from overflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All prom metrics inherit the
role
tag from the deploy's label, I'm planning on adding that tag as a dashboard var, keeping the default to the main consumer. We'll have to duplicate some kafka widgets, but the rest will be reusable without duping the whole dash.