fix: Fixed up server config naming (#17598)
benjackwhite authored Sep 26, 2023
1 parent 6016bdb commit fd93ca4
Showing 1 changed file with 33 additions and 28 deletions.
@@ -106,31 +106,31 @@ export class SessionRecordingIngesterV2 {
     partitionLockInterval: NodeJS.Timer | null = null
     teamsRefresher: BackgroundRefresher<Record<string, TeamId>>
     offsetsRefresher: BackgroundRefresher<Record<number, number>>
-    recordingConsumerConfig: PluginsServerConfig
+    config: PluginsServerConfig
     topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS

     private promises: Set<Promise<any>> = new Set()

     constructor(
-        private serverConfig: PluginsServerConfig,
+        globalServerConfig: PluginsServerConfig,
         private postgres: PostgresRouter,
         private objectStorage: ObjectStorage
     ) {
-        this.recordingConsumerConfig = sessionRecordingConsumerConfig(this.serverConfig)
-        this.redisPool = createRedisPool(this.recordingConsumerConfig)
+        // NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis
+        // We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
+        this.config = sessionRecordingConsumerConfig(globalServerConfig)
+        this.redisPool = createRedisPool(this.config)

-        this.realtimeManager = new RealtimeManager(this.redisPool, this.recordingConsumerConfig)
-        this.partitionLocker = new PartitionLocker(
-            this.redisPool,
-            this.recordingConsumerConfig.SESSION_RECORDING_REDIS_PREFIX
-        )
+        this.realtimeManager = new RealtimeManager(this.redisPool, this.config)
+        this.partitionLocker = new PartitionLocker(this.redisPool, this.config.SESSION_RECORDING_REDIS_PREFIX)

         this.offsetHighWaterMarker = new OffsetHighWaterMarker(
             this.redisPool,
-            serverConfig.SESSION_RECORDING_REDIS_PREFIX
+            this.config.SESSION_RECORDING_REDIS_PREFIX
         )

-        this.replayEventsIngester = new ReplayEventsIngester(this.serverConfig, this.offsetHighWaterMarker)
+        // NOTE: This is the only place where we need to use the shared server config
+        this.replayEventsIngester = new ReplayEventsIngester(globalServerConfig, this.offsetHighWaterMarker)

         this.teamsRefresher = new BackgroundRefresher(async () => {
             try {
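
For context on the pattern this rename clarifies: sessionRecordingConsumerConfig derives a consumer-specific config from the global one, swapping in the session-recording deployment's dedicated resources where they are configured. A minimal sketch of that idea in TypeScript; the override fields and fallback logic here are illustrative assumptions, not the plugin-server's actual implementation:

    // Hypothetical sketch: derive a dedicated consumer config from the global one.
    // The field names below are assumptions for illustration only.
    interface PluginsServerConfig {
        KAFKA_HOSTS: string
        POSTHOG_REDIS_HOST: string
        SESSION_RECORDING_KAFKA_HOSTS?: string
        SESSION_RECORDING_REDIS_HOST?: string
    }

    function sessionRecordingConsumerConfig(globalConfig: PluginsServerConfig): PluginsServerConfig {
        return {
            ...globalConfig,
            // Prefer dedicated resources when configured, falling back to the shared ones
            KAFKA_HOSTS: globalConfig.SESSION_RECORDING_KAFKA_HOSTS ?? globalConfig.KAFKA_HOSTS,
            POSTHOG_REDIS_HOST: globalConfig.SESSION_RECORDING_REDIS_HOST ?? globalConfig.POSTHOG_REDIS_HOST,
        }
    }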
@@ -234,7 +234,7 @@ export class SessionRecordingIngesterV2 {
         const { partition, topic } = event.metadata

         const sessionManager = new SessionManager(
-            this.serverConfig,
+            this.config,
             this.objectStorage.s3,
             this.realtimeManager,
             this.offsetHighWaterMarker,
@@ -339,7 +339,7 @@ export class SessionRecordingIngesterV2 {

         const recordingMessages: IncomingRecordingMessage[] = []

-        if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
+        if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
             await this.partitionLocker.claim(messages)
         }

@@ -389,7 +389,7 @@ export class SessionRecordingIngesterV2 {
         await runInstrumentedFunction({
             statsKey: `recordingingester.handleEachBatch.consumeBatch`,
             func: async () => {
-                if (this.serverConfig.SESSION_RECORDING_PARALLEL_CONSUMPTION) {
+                if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) {
                     await Promise.all(recordingMessages.map((x) => this.consume(x)))
                 } else {
                     for (const message of recordingMessages) {
@@ -429,8 +429,13 @@ export class SessionRecordingIngesterV2 {

         // Currently we can't reuse any files stored on disk, so we opt to delete them all
         try {
-            rmSync(bufferFileDir(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY), { recursive: true, force: true })
-            mkdirSync(bufferFileDir(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY), { recursive: true })
+            rmSync(bufferFileDir(this.config.SESSION_RECORDING_LOCAL_DIRECTORY), {
+                recursive: true,
+                force: true,
+            })
+            mkdirSync(bufferFileDir(this.config.SESSION_RECORDING_LOCAL_DIRECTORY), {
+                recursive: true,
+            })
         } catch (e) {
             status.error('🔥', 'Failed to recreate local buffer directory', e)
             captureException(e)
@@ -442,13 +447,13 @@ export class SessionRecordingIngesterV2 {

         await this.replayEventsIngester.start()

-        if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
+        if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
             this.partitionLockInterval = setInterval(async () => {
                 await this.partitionLocker.claim(this.assignedTopicPartitions)
             }, PARTITION_LOCK_INTERVAL_MS)
         }

-        const connectionConfig = createRdConnectionConfigFromEnvVars(this.recordingConsumerConfig)
+        const connectionConfig = createRdConnectionConfigFromEnvVars(this.config)

         // Create a node-rdkafka consumer that fetches batches of messages, runs
         // eachBatchWithContext, then commits offsets for the batch.
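
The SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION branch above renews the consumer's claim on its partitions every PARTITION_LOCK_INTERVAL_MS. A minimal sketch of that claim-renewal pattern, assuming a Redis-backed lock with a TTL longer than the renewal interval (the key layout and TTL are illustrative, not PartitionLocker's actual internals):

    import { Redis } from 'ioredis'

    // Sketch: keep a claim alive on each assigned partition by rewriting a
    // keyed value with a TTL on every renewal tick; if this consumer dies,
    // the keys expire and another consumer can claim the partitions.
    const CLAIM_TTL_SECONDS = 30 // assumed; must exceed the renewal interval

    async function claimPartitions(redis: Redis, consumerId: string, partitions: number[]): Promise<void> {
        await Promise.all(
            partitions.map((partition) =>
                redis.set(`@posthog/replay/partition-lock/${partition}`, consumerId, 'EX', CLAIM_TTL_SECONDS)
            )
        )
    }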
@@ -461,15 +466,15 @@ export class SessionRecordingIngesterV2 {
             // the largest size of a message that can be fetched by the consumer.
             // the largest size our MSK cluster allows is 20MB
             // we only use 9 or 10MB but there's no reason to limit this 🤷️
-            consumerMaxBytes: this.recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_BYTES,
-            consumerMaxBytesPerPartition: this.recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
+            consumerMaxBytes: this.config.KAFKA_CONSUMPTION_MAX_BYTES,
+            consumerMaxBytesPerPartition: this.config.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
             // our messages are very big, so we don't want to buffer too many
-            queuedMinMessages: this.recordingConsumerConfig.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
-            consumerMaxWaitMs: this.recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
-            consumerErrorBackoffMs: this.recordingConsumerConfig.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
-            fetchBatchSize: this.recordingConsumerConfig.SESSION_RECORDING_KAFKA_BATCH_SIZE,
-            batchingTimeoutMs: this.recordingConsumerConfig.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
-            topicCreationTimeoutMs: this.recordingConsumerConfig.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
+            queuedMinMessages: this.config.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
+            consumerMaxWaitMs: this.config.KAFKA_CONSUMPTION_MAX_WAIT_MS,
+            consumerErrorBackoffMs: this.config.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
+            fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE,
+            batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
+            topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
             autoCommit: false,
             eachBatch: async (messages) => {
                 return await this.handleEachBatch(messages)
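
These options are plain env-driven numbers that the batch consumer forwards to node-rdkafka. For orientation, the librdkafka equivalents look roughly like the following; the option-to-property mapping is an assumption for illustration, since startBatchConsumer's internals are not part of this diff:

    import { KafkaConsumer } from 'node-rdkafka'

    // Sketch: the same knobs expressed as librdkafka consumer properties.
    const consumer = new KafkaConsumer(
        {
            'group.id': 'session-recordings-blob', // assumed group id
            'metadata.broker.list': 'kafka:9092', // assumed broker
            'fetch.message.max.bytes': 20 * 1024 * 1024, // ~consumerMaxBytesPerPartition
            'queued.min.messages': 1000, // ~queuedMinMessages
            'fetch.wait.max.ms': 50, // ~consumerMaxWaitMs
            'enable.auto.commit': false, // autoCommit: false, offsets are committed manually per batch
        },
        { 'auto.offset.reset': 'earliest' }
    )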
@@ -548,7 +553,7 @@ export class SessionRecordingIngesterV2 {
             this.partitionAssignments[topicPartition.partition] = {}
         })

-        if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
+        if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
             await this.partitionLocker.claim(topicPartitions)
         }
         await this.offsetsRefresher.refresh()
@@ -595,7 +600,7 @@ export class SessionRecordingIngesterV2 {
             logExecutionTime: true,
             timeout: 30000, // same as the partition lock
             func: async () => {
-                if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
+                if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
                     // Extend our claim on these partitions to give us time to flush
                     await this.partitionLocker.claim(topicPartitions)
                     status.info(
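
After this pass, the naming makes the contract at the call site explicit: callers hand over the global server config, and the ingester derives its own. A usage sketch, assuming globalServerConfig, postgresRouter, and objectStorage are already wired up in the caller:

    // Callers pass the global plugin-server config; the ingester derives its
    // dedicated consumer config internally via sessionRecordingConsumerConfig.
    const ingester = new SessionRecordingIngesterV2(globalServerConfig, postgresRouter, objectStorage)
    // start() is assumed from the surrounding code (it spins up the batch consumer)
    await ingester.start()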