Skip to content

Commit

Permalink
feat: Added kafka hash to redis watermarks (#20201)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Feb 12, 2024
1 parent f691f1e commit 9d7d093
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { captureException } from '@sentry/node'
import crypto from 'crypto'
import { mkdirSync, rmSync } from 'node:fs'
import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
import { Counter, Gauge, Histogram } from 'prom-client'
Expand Down Expand Up @@ -149,14 +150,18 @@ export class SessionRecordingIngester {

this.realtimeManager = new RealtimeManager(this.redisPool, this.config)

// We create a hash of the cluster to use as a unique identifier for the high water marks
// This enables us to swap clusters without having to worry about resetting the high water marks
const kafkaClusterIdentifier = crypto.createHash('md5').update(this.config.KAFKA_HOSTS).digest('hex')

this.sessionHighWaterMarker = new OffsetHighWaterMarker(
this.redisPool,
this.config.SESSION_RECORDING_REDIS_PREFIX
this.config.SESSION_RECORDING_REDIS_PREFIX + `kafka-${kafkaClusterIdentifier}/`
)

this.persistentHighWaterMarker = new OffsetHighWaterMarker(
this.redisPool,
this.config.SESSION_RECORDING_REDIS_PREFIX + 'persistent/'
this.config.SESSION_RECORDING_REDIS_PREFIX + `kafka-${kafkaClusterIdentifier}/persistent/`
)

// NOTE: This is the only place where we need to use the shared server config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,17 @@ describe('ingester', () => {
it('can parse debug partition config', () => {
const config = {
SESSION_RECORDING_DEBUG_PARTITION: '103',
KAFKA_HOSTS: 'localhost:9092',
} satisfies Partial<PluginsServerConfig> as PluginsServerConfig

const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage)
expect(ingester['debugPartition']).toEqual(103)
})

it('can parse absence of debug partition config', () => {
const config = {} satisfies Partial<PluginsServerConfig> as PluginsServerConfig
const config = {
KAFKA_HOSTS: 'localhost:9092',
} satisfies Partial<PluginsServerConfig> as PluginsServerConfig

const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage)
expect(ingester['debugPartition']).toBeUndefined()
Expand Down

0 comments on commit 9d7d093

Please sign in to comment.