diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts
index ef013bc22a9fb..13ce5ccabe60f 100644
--- a/plugin-server/src/config/config.ts
+++ b/plugin-server/src/config/config.ts
@@ -56,11 +56,11 @@ export function getDefaultConfig(): PluginsServerConfig {
         KAFKA_CONSUMPTION_REBALANCE_TIMEOUT_MS: null,
         KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS: 30_000,
         KAFKA_TOPIC_CREATION_TIMEOUT_MS: isDevEnv() ? 30_000 : 5_000, // rdkafka default is 5s, increased in devenv to resist to slow kafka
-        KAFKA_PRODUCER_MAX_QUEUE_SIZE: isTestEnv() ? 0 : 1000,
-        KAFKA_MAX_MESSAGE_BATCH_SIZE: isDevEnv() ? 0 : 900_000,
         KAFKA_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 500,
         APP_METRICS_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 20_000,
         APP_METRICS_FLUSH_MAX_QUEUE_SIZE: isTestEnv() ? 5 : 1000,
+        KAFKA_PRODUCER_LINGER_MS: 20, // rdkafka default is 5ms
+        KAFKA_PRODUCER_BATCH_SIZE: 8 * 1024 * 1024, // rdkafka default is 1MiB
         REDIS_URL: 'redis://127.0.0.1',
         POSTHOG_REDIS_PASSWORD: '',
         POSTHOG_REDIS_HOST: '',
diff --git a/plugin-server/src/kafka/config.ts b/plugin-server/src/kafka/config.ts
index 5d609b1768019..64b24402ee809 100644
--- a/plugin-server/src/kafka/config.ts
+++ b/plugin-server/src/kafka/config.ts
@@ -2,6 +2,7 @@ import { GlobalConfig } from 'node-rdkafka-acosom'
 import { hostname } from 'os'
 
 import { KafkaConfig } from '../utils/db/hub'
+import { KafkaProducerConfig } from './producer'
 
 export const RDKAFKA_LOG_LEVEL_MAPPING = {
     NOTHING: 0,
@@ -43,3 +44,10 @@ export const createRdConnectionConfigFromEnvVars = (kafkaConfig: KafkaConfig): G
 
     return config
 }
+
+export const createRdProducerConfigFromEnvVars = (producerConfig: KafkaProducerConfig): KafkaProducerConfig => {
+    return {
+        KAFKA_PRODUCER_LINGER_MS: producerConfig.KAFKA_PRODUCER_LINGER_MS,
+        KAFKA_PRODUCER_BATCH_SIZE: producerConfig.KAFKA_PRODUCER_BATCH_SIZE,
+    }
+}
diff --git a/plugin-server/src/kafka/producer.ts b/plugin-server/src/kafka/producer.ts
index f08d71a229e06..4b373fd2c734f 100644
--- a/plugin-server/src/kafka/producer.ts
+++ b/plugin-server/src/kafka/producer.ts
@@ -11,33 +11,34 @@ import {
 import { getSpan } from '../sentry'
 import { status } from '../utils/status'
 
+export type KafkaProducerConfig = {
+    KAFKA_PRODUCER_LINGER_MS: number
+    KAFKA_PRODUCER_BATCH_SIZE: number
+}
+
 // Kafka production related functions using node-rdkafka.
-export const createKafkaProducer = async (config: ProducerGlobalConfig) => {
+export const createKafkaProducer = async (globalConfig: ProducerGlobalConfig, producerConfig: KafkaProducerConfig) => {
     const producer = new RdKafkaProducer({
-        // milliseconds to wait after the most recently added message before
-        // sending a batch. The default is 0, which means that messages are sent
-        // as soon as possible. This does not mean that there will only be one
-        // message per batch, as the producer will attempt to fill batches up to
-        // the batch size while the number of Kafka inflight requests is
-        // saturated, by default 5 inflight requests.
-        'linger.ms': 20,
-        // The default is 1MiB.
-        'batch.size': 8 * 1024 * 1024,
+        // milliseconds to wait after the most recently added message before sending a batch. The
+        // default is 0, which means that messages are sent as soon as possible. This does not mean
+        // that there will only be one message per batch, as the producer will attempt to fill
+        // batches up to the batch size while the number of Kafka inflight requests is saturated, by
+        // default 5 inflight requests.
+        'linger.ms': producerConfig.KAFKA_PRODUCER_LINGER_MS,
+        'batch.size': producerConfig.KAFKA_PRODUCER_BATCH_SIZE,
         'compression.codec': 'snappy',
-        // Ensure that librdkafka handled producer retries do not produce
-        // duplicates. Note this doesn't mean that if we manually retry a
-        // message that it will be idempotent. May reduce throughput. Note that
-        // at the time of writing the session recording events table in
-        // ClickHouse uses a `ReplicatedReplacingMergeTree` with a ver param of
-        // _timestamp i.e. when the event was added to the Kafka ingest topic.
-        // The sort key is `team_id, toHour(timestamp), session_id, timestamp,
-        // uuid` which means duplicate production of the same event _should_ be
-        // deduplicated when merges occur on the table. This isn't a guarantee
-        // on removing duplicates though and rather still requires deduplication
-        // either when querying the table or client side.
+        // Ensure that librdkafka handled producer retries do not produce duplicates. Note this
+        // doesn't mean that if we manually retry a message that it will be idempotent. May reduce
+        // throughput. Note that at the time of writing the session recording events table in
+        // ClickHouse uses a `ReplicatedReplacingMergeTree` with a ver param of _timestamp i.e. when
+        // the event was added to the Kafka ingest topic. The sort key is `team_id,
+        // toHour(timestamp), session_id, timestamp, uuid` which means duplicate production of the
+        // same event _should_ be deduplicated when merges occur on the table. This isn't a
+        // guarantee on removing duplicates though and rather still requires deduplication either
+        // when querying the table or client side.
         'enable.idempotence': true,
         dr_cb: true,
-        ...config,
+        ...globalConfig,
     })
 
     producer.on('event.log', function (log) {
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts
index 9356582f4a277..68abeb224c1eb 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts
@@ -3,7 +3,7 @@ import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-
 import { Counter } from 'prom-client'
 
 import { KAFKA_LOG_ENTRIES } from '../../../../config/kafka-topics'
-import { createRdConnectionConfigFromEnvVars } from '../../../../kafka/config'
+import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../../kafka/config'
 import { findOffsetsToCommit } from '../../../../kafka/consumer'
 import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
 import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
@@ -147,7 +147,8 @@ export class ConsoleLogsIngester {
     }
 
     public async start(): Promise<void> {
         const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
-        this.producer = await createKafkaProducer(connectionConfig)
+        const producerConfig = createRdProducerConfigFromEnvVars(this.serverConfig)
+        this.producer = await createKafkaProducer(connectionConfig, producerConfig)
         this.producer.connect()
     }
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts
index 78d15f10406fa..5be4d3411d260 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts
@@ -5,7 +5,7 @@ import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-
 import { Counter } from 'prom-client'
 
 import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS } from '../../../../config/kafka-topics'
-import { createRdConnectionConfigFromEnvVars } from '../../../../kafka/config'
+import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../../kafka/config'
 import { findOffsetsToCommit } from '../../../../kafka/consumer'
 import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
 import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
@@ -176,7 +176,8 @@ export class ReplayEventsIngester {
     }
 
     public async start(): Promise<void> {
         const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
-        this.producer = await createKafkaProducer(connectionConfig)
+        const producerConfig = createRdProducerConfigFromEnvVars(this.serverConfig)
+        this.producer = await createKafkaProducer(connectionConfig, producerConfig)
         this.producer.connect()
     }
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts
index 81b4fc9ec2be8..ee4d77a156bbe 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v1.ts
@@ -11,9 +11,15 @@ import {
     KAFKA_SESSION_RECORDING_EVENTS_DLQ,
 } from '../../../config/kafka-topics'
 import { startBatchConsumer } from '../../../kafka/batch-consumer'
-import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
+import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config'
 import { retryOnDependencyUnavailableError } from '../../../kafka/error-handling'
-import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../kafka/producer'
+import {
+    createKafkaProducer,
+    disconnectProducer,
+    flushProducer,
+    KafkaProducerConfig,
+    produce,
+} from '../../../kafka/producer'
 import { PipelineEvent, RawEventMessage, Team } from '../../../types'
 import { KafkaConfig } from '../../../utils/db/hub'
 import { status } from '../../../utils/status'
@@ -30,6 +36,7 @@ import { eventDroppedCounter } from '../metrics'
 export const startSessionRecordingEventsConsumerV1 = async ({
     teamManager,
     kafkaConfig,
+    kafkaProducerConfig,
     consumerMaxBytes,
     consumerMaxBytesPerPartition,
     consumerMaxWaitMs,
@@ -39,6 +46,7 @@ export const startSessionRecordingEventsConsumerV1 = async ({
 }: {
     teamManager: TeamManager
     kafkaConfig: KafkaConfig
+    kafkaProducerConfig: KafkaProducerConfig
     consumerMaxBytes: number
     consumerMaxBytesPerPartition: number
     consumerMaxWaitMs: number
@@ -72,7 +80,8 @@ export const startSessionRecordingEventsConsumerV1 = async ({
     status.info('🔁', 'Starting session recordings consumer')
 
     const connectionConfig = createRdConnectionConfigFromEnvVars(kafkaConfig)
-    const producer = await createKafkaProducer(connectionConfig)
+    const producerConfig = createRdProducerConfigFromEnvVars(kafkaProducerConfig)
+    const producer = await createKafkaProducer(connectionConfig, producerConfig)
 
     const eachBatchWithContext = eachBatch({
         teamManager,
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index 9f0d34a7d8e86..20119c7f08542 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -413,6 +413,7 @@ export async function startPluginsServer(
             } = await startSessionRecordingEventsConsumerV1({
                 teamManager: teamManager,
                 kafkaConfig: serverConfig,
+                kafkaProducerConfig: serverConfig,
                 consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
                 consumerMaxBytesPerPartition: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
                 consumerMaxWaitMs: serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 22242dc00f4ba..1f8c868027ab6 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -139,8 +139,8 @@ export interface PluginsServerConfig {
     KAFKA_CONSUMPTION_REBALANCE_TIMEOUT_MS: number | null
     KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS: number
     KAFKA_TOPIC_CREATION_TIMEOUT_MS: number
-    KAFKA_PRODUCER_MAX_QUEUE_SIZE: number
-    KAFKA_MAX_MESSAGE_BATCH_SIZE: number
+    KAFKA_PRODUCER_LINGER_MS: number // linger.ms rdkafka parameter
+    KAFKA_PRODUCER_BATCH_SIZE: number // batch.size rdkafka parameter
     KAFKA_FLUSH_FREQUENCY_MS: number
     APP_METRICS_FLUSH_FREQUENCY_MS: number
     APP_METRICS_FLUSH_MAX_QUEUE_SIZE: number
diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts
index d226c0f91f8f8..71509e1041560 100644
--- a/plugin-server/src/utils/db/hub.ts
+++ b/plugin-server/src/utils/db/hub.ts
@@ -13,7 +13,7 @@ import { getPluginServerCapabilities } from '../../capabilities'
 import { buildIntegerMatcher, defaultConfig } from '../../config/config'
 import { KAFKAJS_LOG_LEVEL_MAPPING } from '../../config/constants'
 import { KAFKA_JOBS } from '../../config/kafka-topics'
-import { createRdConnectionConfigFromEnvVars } from '../../kafka/config'
+import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../kafka/config'
 import { createKafkaProducer } from '../../kafka/producer'
 import { getObjectStorage } from '../../main/services/object_storage'
 import {
@@ -53,7 +53,8 @@ pgTypes.setTypeParser(1184 /* types.TypeId.TIMESTAMPTZ */, (timeStr) =>
 
 export async function createKafkaProducerWrapper(serverConfig: PluginsServerConfig): Promise<KafkaProducerWrapper> {
     const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
-    const producer = await createKafkaProducer({ ...kafkaConnectionConfig })
+    const producerConfig = createRdProducerConfigFromEnvVars(serverConfig)
+    const producer = await createKafkaProducer(kafkaConnectionConfig, producerConfig)
 
     return new KafkaProducerWrapper(producer)
 }
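
Not part of the diff: a minimal TypeScript sketch of how the pieces above are intended to fit together, assuming the existing defaultConfig export from config/config.ts and the signatures introduced in this change. The function name and import paths below are illustrative only. KAFKA_PRODUCER_LINGER_MS and KAFKA_PRODUCER_BATCH_SIZE are carried through createRdProducerConfigFromEnvVars and translated by createKafkaProducer into rdkafka's 'linger.ms' and 'batch.size' settings.

// Illustrative wiring only; paths assume a file directly under plugin-server/src/.
import { defaultConfig } from './config/config'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from './kafka/config'
import { createKafkaProducer } from './kafka/producer'

async function startExampleProducer() {
    // defaultConfig satisfies both KafkaConfig and KafkaProducerConfig, so it is passed to
    // both helpers, mirroring how pluginsServer.ts passes serverConfig for each parameter.
    const connectionConfig = createRdConnectionConfigFromEnvVars(defaultConfig)
    const producerConfig = createRdProducerConfigFromEnvVars(defaultConfig)

    // createKafkaProducer maps KAFKA_PRODUCER_LINGER_MS -> 'linger.ms' and
    // KAFKA_PRODUCER_BATCH_SIZE -> 'batch.size' on the underlying rdkafka producer.
    const producer = await createKafkaProducer(connectionConfig, producerConfig)
    producer.connect()
    return producer
}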