chore(plugin-server): kafka: change analytics linger.ms and batch.size for all producers (#17820)

* chore(plugin-server): kafka: change analytics linger.ms and batch.size for all producers

* add envvars
bretthoerner authored Oct 9, 2023
1 parent febb718 commit 951cf4f
Showing 9 changed files with 57 additions and 36 deletions.
4 changes: 2 additions & 2 deletions plugin-server/src/config/config.ts
@@ -56,11 +56,11 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_CONSUMPTION_REBALANCE_TIMEOUT_MS: null,
KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS: 30_000,
KAFKA_TOPIC_CREATION_TIMEOUT_MS: isDevEnv() ? 30_000 : 5_000, // rdkafka default is 5s, increased in devenv to resist to slow kafka
KAFKA_PRODUCER_MAX_QUEUE_SIZE: isTestEnv() ? 0 : 1000,
KAFKA_MAX_MESSAGE_BATCH_SIZE: isDevEnv() ? 0 : 900_000,
KAFKA_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 500,
APP_METRICS_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 20_000,
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: isTestEnv() ? 5 : 1000,
KAFKA_PRODUCER_LINGER_MS: 20, // rdkafka default is 5ms
KAFKA_PRODUCER_BATCH_SIZE: 8 * 1024 * 1024, // rdkafka default is 1MiB
REDIS_URL: 'redis://127.0.0.1',
POSTHOG_REDIS_PASSWORD: '',
POSTHOG_REDIS_HOST: '',
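The two new keys replace KAFKA_PRODUCER_MAX_QUEUE_SIZE and KAFKA_MAX_MESSAGE_BATCH_SIZE and map directly onto rdkafka's linger.ms and batch.size settings. A minimal sketch of tuning them per deployment, assuming the plugin server overrides these defaults from same-named environment variables (the "add envvars" part of the commit message) — the values are illustrative, not recommendations:

```typescript
// Illustrative overrides, set before the plugin server builds its config.
// KAFKA_PRODUCER_LINGER_MS: how long a producer waits to fill a batch (default 20 ms here).
// KAFKA_PRODUCER_BATCH_SIZE: upper bound on a batch in bytes (default 8 MiB here).
process.env.KAFKA_PRODUCER_LINGER_MS = '50'
process.env.KAFKA_PRODUCER_BATCH_SIZE = String(4 * 1024 * 1024) // 4 MiB
```

For a rough feel of the defaults: at 10,000 messages/s of ~1 KB each, a 20 ms linger accumulates on the order of 200 KB of payload across partitions, well under the 8 MiB per-batch cap, so at that volume the linger timer rather than the batch size is what triggers sends.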
8 changes: 8 additions & 0 deletions plugin-server/src/kafka/config.ts
@@ -2,6 +2,7 @@ import { GlobalConfig } from 'node-rdkafka-acosom'
import { hostname } from 'os'

import { KafkaConfig } from '../utils/db/hub'
import { KafkaProducerConfig } from './producer'

export const RDKAFKA_LOG_LEVEL_MAPPING = {
NOTHING: 0,
@@ -43,3 +44,10 @@ export const createRdConnectionConfigFromEnvVars = (kafkaConfig: KafkaConfig): GlobalConfig

return config
}

export const createRdProducerConfigFromEnvVars = (producerConfig: KafkaProducerConfig): KafkaProducerConfig => {
return {
KAFKA_PRODUCER_LINGER_MS: producerConfig.KAFKA_PRODUCER_LINGER_MS,
KAFKA_PRODUCER_BATCH_SIZE: producerConfig.KAFKA_PRODUCER_BATCH_SIZE,
}
}
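createRdProducerConfigFromEnvVars simply narrows a larger config object down to the two producer-tuning fields. Because KafkaProducerConfig declares only those two keys, call sites can hand in the whole PluginsServerConfig and TypeScript's structural typing accepts it — which is what the ingesters and pluginsServer.ts below do. A minimal sketch (imports assume a sibling module in plugin-server/src/kafka; the values and the extra field are made up):

```typescript
import { createRdProducerConfigFromEnvVars } from './config'
import { KafkaProducerConfig } from './producer'

// Any object carrying at least the two fields satisfies KafkaProducerConfig; extra
// fields (here KAFKA_HOSTS) are simply ignored by the helper.
const serverLikeConfig = {
    KAFKA_PRODUCER_LINGER_MS: 20,
    KAFKA_PRODUCER_BATCH_SIZE: 8 * 1024 * 1024,
    KAFKA_HOSTS: 'kafka:9092',
}

const producerConfig: KafkaProducerConfig = createRdProducerConfigFromEnvVars(serverLikeConfig)
// => { KAFKA_PRODUCER_LINGER_MS: 20, KAFKA_PRODUCER_BATCH_SIZE: 8388608 }
```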
46 changes: 23 additions & 23 deletions plugin-server/src/kafka/producer.ts
@@ -11,34 +11,34 @@ import {
import { getSpan } from '../sentry'
import { status } from '../utils/status'

export type KafkaProducerConfig = {
KAFKA_PRODUCER_LINGER_MS: number
KAFKA_PRODUCER_BATCH_SIZE: number
}

// Kafka production related functions using node-rdkafka.
export const createKafkaProducer = async (config: ProducerGlobalConfig) => {
export const createKafkaProducer = async (globalConfig: ProducerGlobalConfig, producerConfig: KafkaProducerConfig) => {
const producer = new RdKafkaProducer({
// milliseconds to wait after the most recently added message before
// sending a batch. The default is 0, which means that messages are sent
// as soon as possible. This does not mean that there will only be one
// message per batch, as the producer will attempt to fill batches up to
// the batch size while the number of Kafka inflight requests is
// saturated, by default 5 inflight requests.
'linger.ms': 20,
// The default is 16kb. 1024kb also seems quite small for our use case
// but at least larger than the default.
'batch.size': 1024 * 1024,
// milliseconds to wait after the most recently added message before sending a batch. The
// default is 0, which means that messages are sent as soon as possible. This does not mean
// that there will only be one message per batch, as the producer will attempt to fill
// batches up to the batch size while the number of Kafka inflight requests is saturated, by
// default 5 inflight requests.
'linger.ms': producerConfig.KAFKA_PRODUCER_LINGER_MS,
'batch.size': producerConfig.KAFKA_PRODUCER_BATCH_SIZE,
'compression.codec': 'snappy',
// Ensure that librdkafka handled producer retries do not produce
// duplicates. Note this doesn't mean that if we manually retry a
// message that it will be idempotent. May reduce throughput. Note that
// at the time of writing the session recording events table in
// ClickHouse uses a `ReplicatedReplacingMergeTree` with a ver param of
// _timestamp i.e. when the event was added to the Kafka ingest topic.
// The sort key is `team_id, toHour(timestamp), session_id, timestamp,
// uuid` which means duplicate production of the same event _should_ be
// deduplicated when merges occur on the table. This isn't a guarantee
// on removing duplicates though and rather still requires deduplication
// either when querying the table or client side.
// Ensure that librdkafka handled producer retries do not produce duplicates. Note this
// doesn't mean that if we manually retry a message that it will be idempotent. May reduce
// throughput. Note that at the time of writing the session recording events table in
// ClickHouse uses a `ReplicatedReplacingMergeTree` with a ver param of _timestamp i.e. when
// the event was added to the Kafka ingest topic. The sort key is `team_id,
// toHour(timestamp), session_id, timestamp, uuid` which means duplicate production of the
// same event _should_ be deduplicated when merges occur on the table. This isn't a
// guarantee on removing duplicates though and rather still requires deduplication either
// when querying the table or client side.
'enable.idempotence': true,
dr_cb: true,
...config,
...globalConfig,
})

producer.on('event.log', function (log) {
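createKafkaProducer now takes the connection settings and the batching settings as separate arguments, so every caller has to be updated — the hunks below are those call sites. A minimal sketch of the new call pattern, assuming a module under plugin-server/src/main with access to the default config:

```typescript
import { defaultConfig } from '../config/config'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../kafka/config'
import { createKafkaProducer, disconnectProducer, flushProducer } from '../kafka/producer'

async function startExampleProducer() {
    // Brokers, auth, client id, etc. come from the connection config; linger.ms and
    // batch.size now come from the env-derived producer config instead of being hardcoded.
    const connectionConfig = createRdConnectionConfigFromEnvVars(defaultConfig)
    const producerConfig = createRdProducerConfigFromEnvVars(defaultConfig)
    const producer = await createKafkaProducer(connectionConfig, producerConfig)
    producer.connect()

    // ... produce() calls go here ...

    await flushProducer(producer) // drain whatever is still sitting in partially filled batches
    await disconnectProducer(producer)
}
```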
@@ -3,7 +3,7 @@ import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-
import { Counter } from 'prom-client'

import { KAFKA_LOG_ENTRIES } from '../../../../config/kafka-topics'
import { createRdConnectionConfigFromEnvVars } from '../../../../kafka/config'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../../kafka/config'
import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
@@ -147,7 +147,8 @@ export class ConsoleLogsIngester {
}
public async start(): Promise<void> {
const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
this.producer = await createKafkaProducer(connectionConfig)
const producerConfig = createRdProducerConfigFromEnvVars(this.serverConfig)
this.producer = await createKafkaProducer(connectionConfig, producerConfig)
this.producer.connect()
}

@@ -5,7 +5,7 @@ import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-
import { Counter } from 'prom-client'

import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS } from '../../../../config/kafka-topics'
import { createRdConnectionConfigFromEnvVars } from '../../../../kafka/config'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../../kafka/config'
import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
@@ -176,7 +176,8 @@ export class ReplayEventsIngester {
}
public async start(): Promise<void> {
const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
this.producer = await createKafkaProducer(connectionConfig)
const producerConfig = createRdProducerConfigFromEnvVars(this.serverConfig)
this.producer = await createKafkaProducer(connectionConfig, producerConfig)
this.producer.connect()
}

@@ -11,9 +11,15 @@ import {
KAFKA_SESSION_RECORDING_EVENTS_DLQ,
} from '../../../config/kafka-topics'
import { startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config'
import { retryOnDependencyUnavailableError } from '../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../kafka/producer'
import {
createKafkaProducer,
disconnectProducer,
flushProducer,
KafkaProducerConfig,
produce,
} from '../../../kafka/producer'
import { PipelineEvent, RawEventMessage, Team } from '../../../types'
import { KafkaConfig } from '../../../utils/db/hub'
import { status } from '../../../utils/status'
@@ -30,6 +36,7 @@ import { eventDroppedCounter } from '../metrics'
export const startSessionRecordingEventsConsumerV1 = async ({
teamManager,
kafkaConfig,
kafkaProducerConfig,
consumerMaxBytes,
consumerMaxBytesPerPartition,
consumerMaxWaitMs,
@@ -39,6 +46,7 @@
}: {
teamManager: TeamManager
kafkaConfig: KafkaConfig
kafkaProducerConfig: KafkaProducerConfig
consumerMaxBytes: number
consumerMaxBytesPerPartition: number
consumerMaxWaitMs: number
@@ -72,7 +80,8 @@ export const startSessionRecordingEventsConsumerV1 = async ({
status.info('🔁', 'Starting session recordings consumer')

const connectionConfig = createRdConnectionConfigFromEnvVars(kafkaConfig)
const producer = await createKafkaProducer(connectionConfig)
const producerConfig = createRdProducerConfigFromEnvVars(kafkaProducerConfig)
const producer = await createKafkaProducer(connectionConfig, producerConfig)

const eachBatchWithContext = eachBatch({
teamManager,
1 change: 1 addition & 0 deletions plugin-server/src/main/pluginsServer.ts
@@ -413,6 +413,7 @@ export async function startPluginsServer(
} = await startSessionRecordingEventsConsumerV1({
teamManager: teamManager,
kafkaConfig: serverConfig,
kafkaProducerConfig: serverConfig,
consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
consumerMaxWaitMs: serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
4 changes: 2 additions & 2 deletions plugin-server/src/types.ts
@@ -139,8 +139,8 @@ export interface PluginsServerConfig {
KAFKA_CONSUMPTION_REBALANCE_TIMEOUT_MS: number | null
KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS: number
KAFKA_TOPIC_CREATION_TIMEOUT_MS: number
KAFKA_PRODUCER_MAX_QUEUE_SIZE: number
KAFKA_MAX_MESSAGE_BATCH_SIZE: number
KAFKA_PRODUCER_LINGER_MS: number // linger.ms rdkafka parameter
KAFKA_PRODUCER_BATCH_SIZE: number // batch.size rdkafka parameter
KAFKA_FLUSH_FREQUENCY_MS: number
APP_METRICS_FLUSH_FREQUENCY_MS: number
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: number
5 changes: 3 additions & 2 deletions plugin-server/src/utils/db/hub.ts
@@ -13,7 +13,7 @@ import { getPluginServerCapabilities } from '../../capabilities'
import { buildIntegerMatcher, defaultConfig } from '../../config/config'
import { KAFKAJS_LOG_LEVEL_MAPPING } from '../../config/constants'
import { KAFKA_JOBS } from '../../config/kafka-topics'
import { createRdConnectionConfigFromEnvVars } from '../../kafka/config'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../kafka/config'
import { createKafkaProducer } from '../../kafka/producer'
import { getObjectStorage } from '../../main/services/object_storage'
import {
@@ -53,7 +53,8 @@ pgTypes.setTypeParser(1184 /* types.TypeId.TIMESTAMPTZ */, (timeStr) =>

export async function createKafkaProducerWrapper(serverConfig: PluginsServerConfig): Promise<KafkaProducerWrapper> {
const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })
const producerConfig = createRdProducerConfigFromEnvVars(serverConfig)
const producer = await createKafkaProducer(kafkaConnectionConfig, producerConfig)
return new KafkaProducerWrapper(producer)
}

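One behavioural note on this last hunk: createKafkaProducerWrapper previously pinned 'linger.ms': 0 for the wrapper producer, so its messages were sent out immediately; after this change it picks up KAFKA_PRODUCER_LINGER_MS (20 ms by default) like every other producer, trading a little latency for better batching. A hedged sketch of how a deployment could keep the old behaviour, assuming the same env-based overrides as above:

```typescript
// Illustrative only: restore the previous immediate-send behaviour for the wrapper
// producer (and every other producer in the process) by overriding the new default
// before the server config is built.
process.env.KAFKA_PRODUCER_LINGER_MS = '0'
```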
