Commit 67e3bca

feat: allow configuring fetch min bytes on plugin server batch consumer (#23571)

pauldambra authored Jul 9, 2024
1 parent f4f20f1 commit 67e3bca
Showing 4 changed files with 14 additions and 1 deletion.
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
@@ -173,6 +173,7 @@ export function getDefaultConfig(): PluginsServerConfig {
     SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: 200_000_000, // 200MB burst
     SESSION_RECORDING_OVERFLOW_MIN_PER_BATCH: 1_000_000, // All sessions consume at least 1MB/batch, to penalise poor batching
     SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: 30_000, // emit stats event once every 30 seconds - DEBUG value, TODO: default should be 0
+    SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: 1_048_576, // 1MB
     // CDP
     CDP_WATCHER_OBSERVATION_PERIOD: 10000,
     CDP_WATCHER_DISABLED_PERIOD: 1000 * 60 * 10,
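A note for readers: 1_048_576 bytes is 1 MiB (1024 × 1024). As a sketch of how a deployment might override this default, assuming plugin-server reads config keys from identically named environment variables (an assumption for illustration, not shown in this diff):

```ts
// Hypothetical override sketch: read the env var if set, else fall back to
// the default from getDefaultConfig(). The key name mirrors the diff; the
// env-var mapping itself is an assumption.
const DEFAULT_FETCH_MIN_BYTES = 1_048_576 // 1MiB = 1024 * 1024 bytes

function resolveFetchMinBytes(env: NodeJS.ProcessEnv = process.env): number {
    const raw = env.SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES
    const parsed = raw ? parseInt(raw, 10) : NaN
    return Number.isFinite(parsed) && parsed > 0 ? parsed : DEFAULT_FETCH_MIN_BYTES
}
```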
11 changes: 10 additions & 1 deletion plugin-server/src/kafka/batch-consumer.ts
@@ -73,6 +73,8 @@ export const startBatchConsumer = async ({
     debug,
     queuedMaxMessagesKBytes = 102400,
     kafkaStatisticIntervalMs = 0,
+    fetchMinBytes,
+    maxHealthHeartbeatIntervalMs = 60_000,
 }: {
     connectionConfig: GlobalConfig
     groupId: string
@@ -92,6 +94,7 @@
     callEachBatchWhenEmpty?: boolean
     debug?: string
     queuedMaxMessagesKBytes?: number
+    fetchMinBytes?: number
     /**
      * default to 0 which disables logging
      * granularity of 1000ms
@@ -100,6 +103,7 @@
      * see https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
      */
     kafkaStatisticIntervalMs?: number
+    maxHealthHeartbeatIntervalMs?: number
 }): Promise<BatchConsumer> => {
     // Starts consuming from `topic` in batches of `fetchBatchSize` messages,
     // with consumer group id `groupId`. We use `connectionConfig` to connect
@@ -171,6 +175,11 @@
         'statistics.interval.ms': kafkaStatisticIntervalMs,
     }
 
+    // undefined is valid but things get unhappy if you provide that explicitly
+    if (fetchMinBytes) {
+        consumerConfig['fetch.min.bytes'] = fetchMinBytes
+    }
+
     if (debug) {
         // NOTE: If the key exists with value undefined the consumer will throw which is annoying so we define it here instead
         consumerConfig.debug = debug
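The truthiness guard above keeps the key out of the config object entirely when no value is supplied; per the diff's own comment on `debug`, a key that exists with value undefined makes the consumer throw. It also means an explicit 0 is skipped, which is harmless since a fetch.min.bytes of 0 is not a useful setting. A standalone sketch of the same "only set the key when present" pattern (the type and values below are simplified stand-ins, not the real node-rdkafka config types):

```ts
// Illustrative sketch of conditionally setting optional config keys.
type ConsumerConfig = Record<string, string | number | boolean>

function buildConsumerConfig(fetchMinBytes?: number): ConsumerConfig {
    const config: ConsumerConfig = {
        'group.id': 'example-group', // illustrative value
    }
    // Only set the key when a value was provided; a key present with value
    // undefined would make the consumer constructor throw.
    if (fetchMinBytes) {
        config['fetch.min.bytes'] = fetchMinBytes
    }
    return config
}
```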
@@ -322,7 +331,7 @@
     const isHealthy = () => {
         // We define health as the last consumer loop having run in the last
         // minute. This might not be bullet-proof, let's see.
-        return Date.now() - lastHeartbeatTime < 60000
+        return Date.now() - lastHeartbeatTime < maxHealthHeartbeatIntervalMs
     }
 
     const stop = async () => {
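The change above makes the health window configurable instead of a hard-coded 60 seconds (note the unchanged comment still says "minute"). A minimal self-contained sketch of the heartbeat-recency pattern, not the actual BatchConsumer internals:

```ts
// Simplified heartbeat-based liveness check, mirroring the pattern in the
// diff. The consume loop bumps lastHeartbeatTime on every iteration; the
// health probe just checks how stale that timestamp is.
function createHealthCheck(maxHealthHeartbeatIntervalMs = 60_000) {
    let lastHeartbeatTime = Date.now()

    return {
        // Call this from the consume loop each time a batch completes.
        heartbeat: () => {
            lastHeartbeatTime = Date.now()
        },
        // Healthy iff the loop has run within the configured window.
        isHealthy: () => Date.now() - lastHeartbeatTime < maxHealthHeartbeatIntervalMs,
    }
}
```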
2 changes: 2 additions & 0 deletions
@@ -496,6 +496,7 @@ export class SessionRecordingIngester {
             // we only use 9 or 10MB but there's no reason to limit this 🤷️
             consumerMaxBytes: this.config.KAFKA_CONSUMPTION_MAX_BYTES,
             consumerMaxBytesPerPartition: this.config.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
+            fetchMinBytes: this.config.SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES,
             // our messages are very big, so we don't want to queue too many
             queuedMinMessages: this.config.SESSION_RECORDING_KAFKA_QUEUE_SIZE,
             // we'll anyway never queue more than the value set here
@@ -514,6 +515,7 @@
             callEachBatchWhenEmpty: true, // Useful as we will still want to account for flushing sessions
             debug: this.config.SESSION_RECORDING_KAFKA_DEBUG,
             kafkaStatisticIntervalMs: this.config.SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS,
+            maxHealthHeartbeatIntervalMs: KAFKA_CONSUMER_SESSION_TIMEOUT_MS * 2, // we don't want to proactively declare healthy - we'll let the broker do it
         })
 
         this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer, this.topic)).length
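Setting the health window to twice the Kafka session timeout lets the broker's session-timeout mechanism act first. A worked example, assuming KAFKA_CONSUMER_SESSION_TIMEOUT_MS is 30_000 (an assumed value; the constant's definition is not part of this diff):

```ts
// Worked example with an assumed session timeout.
const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30_000 // assumption: 30s
const maxHealthHeartbeatIntervalMs = KAFKA_CONSUMER_SESSION_TIMEOUT_MS * 2 // 60_000

// The broker evicts a consumer that misses heartbeats for 30s, while this
// process only reports unhealthy after 60s without a consume-loop pass, so a
// broker-side rebalance happens before the pod declares itself unhealthy.
console.log(maxHealthHeartbeatIntervalMs) // 60000
```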
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
@@ -263,6 +263,7 @@ export interface PluginsServerConfig extends CdpConfig {
     SESSION_RECORDING_KAFKA_QUEUE_SIZE_KB: number | undefined
     SESSION_RECORDING_KAFKA_DEBUG: string | undefined
     SESSION_RECORDING_MAX_PARALLEL_FLUSHES: number
+    SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: number
 
     POSTHOG_SESSION_RECORDING_REDIS_HOST: string | undefined
     POSTHOG_SESSION_RECORDING_REDIS_PORT: number | undefined
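Note the new key is declared as a plain number rather than number | undefined like its neighbours, so every resolved config must carry a value; the default added in config.ts guarantees that. A small sketch of the distinction:

```ts
// Sketch: a non-optional `number` field forces every config source to
// resolve to a value, while `| undefined` fields may legitimately be unset.
interface PluginsServerConfigSubset {
    SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: number
    SESSION_RECORDING_KAFKA_DEBUG: string | undefined
}

const defaults: PluginsServerConfigSubset = {
    SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: 1_048_576, // must be supplied
    SESSION_RECORDING_KAFKA_DEBUG: undefined, // may stay unset
}
```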
