Skip to content

Commit

Permalink
chore: Debugging options for V3 ingester (#20569)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Feb 27, 2024
1 parent ed7e5db commit 47b2882
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 96 deletions.
10 changes: 10 additions & 0 deletions docker-compose.base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ services:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: 'true'

kafka_ui:
image: provectuslabs/kafka-ui:latest
restart: on-failure
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
DYNAMIC_CONFIG_ENABLED: 'true'

object_storage:
image: minio/minio:RELEASE.2022-06-25T15-50-16Z
restart: on-failure
Expand Down
7 changes: 7 additions & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ services:
ports:
- '9092:9092'

kafka_ui:
extends:
file: docker-compose.base.yml
service: kafka_ui
ports:
- '9093:8080'

object_storage:
extends:
file: docker-compose.base.yml
Expand Down
73 changes: 36 additions & 37 deletions plugin-server/README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ export function getDefaultConfig(): PluginsServerConfig {
PISCINA_ATOMICS_TIMEOUT: 5000,
SITE_URL: null,
KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: 1,
RECORDING_PARTITIONS_CONSUMED_CONCURRENTLY: 5,
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '',
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON,
CONVERSION_BUFFER_ENABLED: false,
Expand Down Expand Up @@ -161,6 +160,7 @@ export function getDefaultConfig(): PluginsServerConfig {
POSTHOG_SESSION_RECORDING_REDIS_PORT: undefined,
SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED: true,
SESSION_RECORDING_DEBUG_PARTITION: undefined,
SESSION_RECORDING_KAFKA_DEBUG: undefined,
}
}

Expand Down
121 changes: 64 additions & 57 deletions plugin-server/src/kafka/batch-consumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka'
import { ConsumerGlobalConfig, GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka'
import { exponentialBuckets, Gauge, Histogram } from 'prom-client'

import { retryIfRetriable } from '../utils/retries'
Expand Down Expand Up @@ -69,6 +69,7 @@ export const startBatchConsumer = async ({
topicCreationTimeoutMs,
eachBatch,
queuedMinMessages = 100000,
debug,
}: {
connectionConfig: GlobalConfig
groupId: string
Expand All @@ -85,6 +86,7 @@ export const startBatchConsumer = async ({
topicCreationTimeoutMs: number
eachBatch: (messages: Message[]) => Promise<void>
queuedMinMessages?: number
debug?: string
}): Promise<BatchConsumer> => {
// Starts consuming from `topic` in batches of `fetchBatchSize` messages,
// with consumer group id `groupId`. We use `connectionConfig` to connect
Expand All @@ -106,62 +108,67 @@ export const startBatchConsumer = async ({
//
// We also instrument the consumer with Prometheus metrics, which are
// exposed on the /_metrics endpoint by the global prom-client registry.
const consumer = await createKafkaConsumer(
{
...connectionConfig,
'group.id': groupId,
'session.timeout.ms': sessionTimeout,
'max.poll.interval.ms': maxPollIntervalMs,
'enable.auto.commit': autoCommit,
'enable.auto.offset.store': false,
/**
* max.partition.fetch.bytes
* The maximum amount of data per-partition the server will return.
* Records are fetched in batches by the consumer.
* If the first record batch in the first non-empty partition of the fetch is larger than this limit,
* the batch will still be returned to ensure that the consumer can make progress.
* The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config)
* or max.message.bytes (topic config).
* https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#:~:text=max.partition.fetch.bytes,the%20consumer%20can%20make%20progress.
*/
'max.partition.fetch.bytes': consumerMaxBytesPerPartition,
// https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L122
// Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
'fetch.message.max.bytes': consumerMaxBytes,
'fetch.wait.max.ms': consumerMaxWaitMs,
'fetch.error.backoff.ms': consumerErrorBackoffMs,
'enable.partition.eof': true,
// https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L118
// Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue
'queued.min.messages': queuedMinMessages, // 100000 is the default
'queued.max.messages.kbytes': 102400, // 1048576 is the default, we go smaller to reduce mem usage.
// Use cooperative-sticky rebalancing strategy, which is the
// [default](https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy)
// in the Java Kafka Client. There its actually
// RangeAssignor,CooperativeStickyAssignor i.e. it mixes eager and
// cooperative strategies. This is however explicitly mentioned to not
// be supported in the [librdkafka library config
// docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#partitionassignmentstrategy)
// so we just use cooperative-sticky. If there are other consumer
// members with other strategies already running, you'll need to delete
// e.g. the replicaset for them if on k8s.
//
// See
// https://www.confluent.io/en-gb/blog/incremental-cooperative-rebalancing-in-kafka/
// for details on the advantages of this rebalancing strategy as well as
// how it works.
'partition.assignment.strategy': 'cooperative-sticky',
rebalance_cb: true,
offset_commit_cb: true,
},
{
// It is typically safest to roll back to the earliest offset if we
// find ourselves in a situation where there is no stored offset or
// the stored offset is invalid, compared to the default behavior of
// potentially jumping ahead to the latest offset.
'auto.offset.reset': 'earliest',
}
)

const consumerConfig: ConsumerGlobalConfig = {
...connectionConfig,
'group.id': groupId,
'session.timeout.ms': sessionTimeout,
'max.poll.interval.ms': maxPollIntervalMs,
'enable.auto.commit': autoCommit,
'enable.auto.offset.store': false,
/**
* max.partition.fetch.bytes
* The maximum amount of data per-partition the server will return.
* Records are fetched in batches by the consumer.
* If the first record batch in the first non-empty partition of the fetch is larger than this limit,
* the batch will still be returned to ensure that the consumer can make progress.
* The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config)
* or max.message.bytes (topic config).
* https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#:~:text=max.partition.fetch.bytes,the%20consumer%20can%20make%20progress.
*/
'max.partition.fetch.bytes': consumerMaxBytesPerPartition,
// https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L122
// Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
'fetch.message.max.bytes': consumerMaxBytes,
'fetch.wait.max.ms': consumerMaxWaitMs,
'fetch.error.backoff.ms': consumerErrorBackoffMs,
'enable.partition.eof': true,
// https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L118
// Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue
'queued.min.messages': queuedMinMessages, // 100000 is the default
'queued.max.messages.kbytes': 102400, // 1048576 is the default, we go smaller to reduce mem usage.
// Use cooperative-sticky rebalancing strategy, which is the
// [default](https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy)
// in the Java Kafka Client. There its actually
// RangeAssignor,CooperativeStickyAssignor i.e. it mixes eager and
// cooperative strategies. This is however explicitly mentioned to not
// be supported in the [librdkafka library config
// docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#partitionassignmentstrategy)
// so we just use cooperative-sticky. If there are other consumer
// members with other strategies already running, you'll need to delete
// e.g. the replicaset for them if on k8s.
//
// See
// https://www.confluent.io/en-gb/blog/incremental-cooperative-rebalancing-in-kafka/
// for details on the advantages of this rebalancing strategy as well as
// how it works.
'partition.assignment.strategy': 'cooperative-sticky',
rebalance_cb: true,
offset_commit_cb: true,
}

if (debug) {
// NOTE: If the key exists with value undefined the consumer will throw which is annoying so we define it here instead
consumerConfig.debug = debug
}

const consumer = await createKafkaConsumer(consumerConfig, {
// It is typically safest to roll back to the earliest offset if we
// find ourselves in a situation where there is no stored offset or
// the stored offset is invalid, compared to the default behavior of
// potentially jumping ahead to the latest offset.
'auto.offset.reset': 'earliest',
})

instrumentConsumerMetrics(consumer, groupId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ export class SessionRecordingIngesterV3 {
eachBatch: async (messages) => {
return await this.scheduleWork(this.handleEachBatch(messages))
},
debug: this.config.SESSION_RECORDING_KAFKA_DEBUG,
})

addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer)
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ export interface PluginsServerConfig {
PISCINA_ATOMICS_TIMEOUT: number // (advanced) corresponds to the length of time a piscina worker should block for when looking for tasks
SITE_URL: string | null
KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: number // (advanced) how many kafka partitions the plugin server should consume from concurrently
RECORDING_PARTITIONS_CONSUMED_CONCURRENTLY: number
CONVERSION_BUFFER_ENABLED: boolean
CONVERSION_BUFFER_ENABLED_TEAMS: string
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: string
Expand Down Expand Up @@ -236,6 +235,7 @@ export interface PluginsServerConfig {
SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: KafkaSecurityProtocol | undefined
SESSION_RECORDING_KAFKA_BATCH_SIZE: number
SESSION_RECORDING_KAFKA_QUEUE_SIZE: number
SESSION_RECORDING_KAFKA_DEBUG: string | undefined

POSTHOG_SESSION_RECORDING_REDIS_HOST: string | undefined
POSTHOG_SESSION_RECORDING_REDIS_PORT: number | undefined
Expand Down

0 comments on commit 47b2882

Please sign in to comment.