diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 9ef2888d431ea..9d7665bc60262 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -1,4 +1,5 @@ import { captureException } from '@sentry/node' +import crypto from 'crypto' import { mkdirSync, rmSync } from 'node:fs' import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka' import { Counter, Gauge, Histogram } from 'prom-client' @@ -149,14 +150,18 @@ export class SessionRecordingIngester { this.realtimeManager = new RealtimeManager(this.redisPool, this.config) + // We create a hash of the cluster to use as a unique identifier for the high water marks + // This enables us to swap clusters without having to worry about resetting the high water marks + const kafkaClusterIdentifier = crypto.createHash('md5').update(this.config.KAFKA_HOSTS).digest('hex') + this.sessionHighWaterMarker = new OffsetHighWaterMarker( this.redisPool, - this.config.SESSION_RECORDING_REDIS_PREFIX + this.config.SESSION_RECORDING_REDIS_PREFIX + `kafka-${kafkaClusterIdentifier}/` ) this.persistentHighWaterMarker = new OffsetHighWaterMarker( this.redisPool, - this.config.SESSION_RECORDING_REDIS_PREFIX + 'persistent/' + this.config.SESSION_RECORDING_REDIS_PREFIX + `kafka-${kafkaClusterIdentifier}/persistent/` ) // NOTE: This is the only place where we need to use the shared server config diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index 467a2cbe6da89..e780b94196750 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -145,6 +145,7 @@ describe('ingester', () => { it('can parse debug partition config', () => { const config = { SESSION_RECORDING_DEBUG_PARTITION: '103', + KAFKA_HOSTS: 'localhost:9092', } satisfies Partial as PluginsServerConfig const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage) @@ -152,7 +153,9 @@ describe('ingester', () => { }) it('can parse absence of debug partition config', () => { - const config = {} satisfies Partial as PluginsServerConfig + const config = { + KAFKA_HOSTS: 'localhost:9092', + } satisfies Partial as PluginsServerConfig const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage) expect(ingester['debugPartition']).toBeUndefined()