diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 02ee642c6648f..afca924b3cb1f 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -152,6 +152,7 @@ export function getDefaultConfig(): PluginsServerConfig { SESSION_RECORDING_REMOTE_FOLDER: 'session_recordings', SESSION_RECORDING_REDIS_PREFIX: '@posthog/replay/', SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: false, + SESSION_RECORDING_PARALLEL_CONSUMPTION: false, POSTHOG_SESSION_RECORDING_REDIS_HOST: undefined, POSTHOG_SESSION_RECORDING_REDIS_PORT: undefined, } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts index 51eba13ff0f8f..fd8115a30988f 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts @@ -332,7 +332,6 @@ export class SessionRecordingIngesterV2 { statsKey: `recordingingester.handleEachBatch`, logExecutionTime: true, func: async () => { - const transaction = Sentry.startTransaction({ name: `blobIngestion_handleEachBatch` }, {}) histogramKafkaBatchSize.observe(messages.length) const recordingMessages: IncomingRecordingMessage[] = [] @@ -385,16 +384,14 @@ export class SessionRecordingIngesterV2 { }) await runInstrumentedFunction({ - statsKey: `recordingingester.handleEachBatch.consumeSerial`, + statsKey: `recordingingester.handleEachBatch.consumeBatch`, func: async () => { - for (const message of recordingMessages) { - const consumeSpan = transaction?.startChild({ - op: 'blobConsume', - }) - - await this.consume(message, consumeSpan) - // TODO: We could do this as batch of offsets for the whole lot... - consumeSpan?.finish() + if (this.serverConfig.SESSION_RECORDING_PARALLEL_CONSUMPTION) { + await Promise.all(recordingMessages.map((x) => this.consume(x))) + } else { + for (const message of recordingMessages) { + await this.consume(message) + } } }, }) @@ -417,8 +414,6 @@ export class SessionRecordingIngesterV2 { await this.flushAllReadySessions() }, }) - - transaction.finish() }, }) } @@ -544,7 +539,9 @@ export class SessionRecordingIngesterV2 { this.partitionAssignments[topicPartition.partition] = {} }) - await this.partitionLocker.claim(topicPartitions) + if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) { + await this.partitionLocker.claim(topicPartitions) + } await this.offsetsRefresher.refresh() } diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 12f76ce214378..f6c2c9077c5ac 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -221,6 +221,7 @@ export interface PluginsServerConfig { SESSION_RECORDING_REMOTE_FOLDER: string SESSION_RECORDING_REDIS_PREFIX: string SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: boolean + SESSION_RECORDING_PARALLEL_CONSUMPTION: boolean // Dedicated infra values SESSION_RECORDING_KAFKA_HOSTS: string | undefined