From fd93ca40254e63a8cef97c5db38c09c86ce5b3f8 Mon Sep 17 00:00:00 2001
From: Ben White
Date: Tue, 26 Sep 2023 15:54:56 +0200
Subject: [PATCH] fix: Fixed up server config naming (#17598)

---
 .../session-recordings-consumer-v2.ts         | 61 ++++++++++---------
 1 file changed, 33 insertions(+), 28 deletions(-)

diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
index 950eb20f8afcf..f783b7390bc7e 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
@@ -106,31 +106,31 @@ export class SessionRecordingIngesterV2 {
     partitionLockInterval: NodeJS.Timer | null = null
     teamsRefresher: BackgroundRefresher>
     offsetsRefresher: BackgroundRefresher>
-    recordingConsumerConfig: PluginsServerConfig
+    config: PluginsServerConfig
     topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS

     private promises: Set> = new Set()

     constructor(
-        private serverConfig: PluginsServerConfig,
+        globalServerConfig: PluginsServerConfig,
         private postgres: PostgresRouter,
         private objectStorage: ObjectStorage
     ) {
-        this.recordingConsumerConfig = sessionRecordingConsumerConfig(this.serverConfig)
-        this.redisPool = createRedisPool(this.recordingConsumerConfig)
+        // NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis
+        // We stil connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
+        this.config = sessionRecordingConsumerConfig(globalServerConfig)
+        this.redisPool = createRedisPool(this.config)

-        this.realtimeManager = new RealtimeManager(this.redisPool, this.recordingConsumerConfig)
-        this.partitionLocker = new PartitionLocker(
-            this.redisPool,
-            this.recordingConsumerConfig.SESSION_RECORDING_REDIS_PREFIX
-        )
+        this.realtimeManager = new RealtimeManager(this.redisPool, this.config)
+        this.partitionLocker = new PartitionLocker(this.redisPool, this.config.SESSION_RECORDING_REDIS_PREFIX)

         this.offsetHighWaterMarker = new OffsetHighWaterMarker(
             this.redisPool,
-            serverConfig.SESSION_RECORDING_REDIS_PREFIX
+            this.config.SESSION_RECORDING_REDIS_PREFIX
         )

-        this.replayEventsIngester = new ReplayEventsIngester(this.serverConfig, this.offsetHighWaterMarker)
+        // NOTE: This is the only place where we need to use the shared server config
+        this.replayEventsIngester = new ReplayEventsIngester(globalServerConfig, this.offsetHighWaterMarker)

         this.teamsRefresher = new BackgroundRefresher(async () => {
             try {
@@ -234,7 +234,7 @@ export class SessionRecordingIngesterV2 {
             const { partition, topic } = event.metadata

             const sessionManager = new SessionManager(
-                this.serverConfig,
+                this.config,
                 this.objectStorage.s3,
                 this.realtimeManager,
                 this.offsetHighWaterMarker,
@@ -339,7 +339,7 @@ export class SessionRecordingIngesterV2 {

         const recordingMessages: IncomingRecordingMessage[] = []

-        if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
+        if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
            await this.partitionLocker.claim(messages)
         }

@@ -389,7 +389,7 @@ export class SessionRecordingIngesterV2 {
         await runInstrumentedFunction({
             statsKey: `recordingingester.handleEachBatch.consumeBatch`,
             func: async () => {
-                if (this.serverConfig.SESSION_RECORDING_PARALLEL_CONSUMPTION) {
+                if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) {
                     await Promise.all(recordingMessages.map((x) => this.consume(x)))
                 } else {
                     for (const message of recordingMessages) {
@@ -429,8 +429,13 @@ export class SessionRecordingIngesterV2 {

         // Currently we can't reuse any files stored on disk, so we opt to delete them all
         try {
-            rmSync(bufferFileDir(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY), { recursive: true, force: true })
-            mkdirSync(bufferFileDir(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY), { recursive: true })
+            rmSync(bufferFileDir(this.config.SESSION_RECORDING_LOCAL_DIRECTORY), {
+                recursive: true,
+                force: true,
+            })
+            mkdirSync(bufferFileDir(this.config.SESSION_RECORDING_LOCAL_DIRECTORY), {
+                recursive: true,
+            })
         } catch (e) {
             status.error('🔥', 'Failed to recreate local buffer directory', e)
             captureException(e)
@@ -442,13 +447,13 @@ export class SessionRecordingIngesterV2 {

         await this.replayEventsIngester.start()

-        if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
+        if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
             this.partitionLockInterval = setInterval(async () => {
                 await this.partitionLocker.claim(this.assignedTopicPartitions)
             }, PARTITION_LOCK_INTERVAL_MS)
         }

-        const connectionConfig = createRdConnectionConfigFromEnvVars(this.recordingConsumerConfig)
+        const connectionConfig = createRdConnectionConfigFromEnvVars(this.config)

         // Create a node-rdkafka consumer that fetches batches of messages, runs
         // eachBatchWithContext, then commits offsets for the batch.
@@ -461,15 +466,15 @@ export class SessionRecordingIngesterV2 {
             // the largest size of a message that can be fetched by the consumer.
             // the largest size our MSK cluster allows is 20MB
             // we only use 9 or 10MB but there's no reason to limit this 🤷️
-            consumerMaxBytes: this.recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_BYTES,
-            consumerMaxBytesPerPartition: this.recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
-            // our messages are very big, so we don't want to buffer too many
-            queuedMinMessages: this.recordingConsumerConfig.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
-            consumerMaxWaitMs: this.recordingConsumerConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
-            consumerErrorBackoffMs: this.recordingConsumerConfig.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
-            fetchBatchSize: this.recordingConsumerConfig.SESSION_RECORDING_KAFKA_BATCH_SIZE,
-            batchingTimeoutMs: this.recordingConsumerConfig.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
-            topicCreationTimeoutMs: this.recordingConsumerConfig.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
+            consumerMaxBytes: this.config.KAFKA_CONSUMPTION_MAX_BYTES,
+            consumerMaxBytesPerPartition: this.config.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
+            // our messages are very big, so we don't want to buffer too many
+            queuedMinMessages: this.config.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
+            consumerMaxWaitMs: this.config.KAFKA_CONSUMPTION_MAX_WAIT_MS,
+            consumerErrorBackoffMs: this.config.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
+            fetchBatchSize: this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE,
+            batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
+            topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
             autoCommit: false,
             eachBatch: async (messages) => {
                 return await this.handleEachBatch(messages)
@@ -548,7 +553,7 @@ export class SessionRecordingIngesterV2 {
             this.partitionAssignments[topicPartition.partition] = {}
         })

-        if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
+        if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
             await this.partitionLocker.claim(topicPartitions)
         }
         await this.offsetsRefresher.refresh()
@@ -595,7 +600,7 @@ export class SessionRecordingIngesterV2 {
             logExecutionTime: true,
             timeout: 30000, // same as the partition lock
             func: async () => {
-                if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
+                if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
                     // Extend our claim on these partitions to give us time to flush
                     await this.partitionLocker.claim(topicPartitions)
                     status.info(
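
For context on the rename: the constructor now takes the global PluginsServerConfig as globalServerConfig, immediately derives a recording-specific copy via sessionRecordingConsumerConfig(), and stores it as this.config; only the shared ReplayEventsIngester keeps using the global object. The helper itself is not part of this diff, so the sketch below is only a minimal illustration of that derive-a-dedicated-config pattern, written in TypeScript. The trimmed interface and the SESSION_RECORDING_KAFKA_* override fields are assumptions made for the example, not the real PluginsServerConfig keys.

// Sketch only: a stand-in for the real sessionRecordingConsumerConfig helper.
// The interface is trimmed and the SESSION_RECORDING_KAFKA_* fields are hypothetical.
interface PluginsServerConfigSketch {
    KAFKA_HOSTS: string
    KAFKA_SECURITY_PROTOCOL: string
    SESSION_RECORDING_KAFKA_HOSTS?: string
    SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL?: string
}

// Copy the global config and swap in recording-specific Kafka settings where they
// are defined, so the consumer can point at a dedicated cluster while still-shared
// resources (e.g. postgres, the Replay events kafka) keep their global settings.
function sessionRecordingConsumerConfig(globalConfig: PluginsServerConfigSketch): PluginsServerConfigSketch {
    return {
        ...globalConfig,
        KAFKA_HOSTS: globalConfig.SESSION_RECORDING_KAFKA_HOSTS ?? globalConfig.KAFKA_HOSTS,
        KAFKA_SECURITY_PROTOCOL:
            globalConfig.SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL ?? globalConfig.KAFKA_SECURITY_PROTOCOL,
    }
}

With that split in place, renaming the derived copy to this.config makes it the default everywhere inside the class, and the single remaining globalServerConfig reference in the constructor stands out as the one deliberate exception.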