diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 22cd812fe2790..f70790f0d0826 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -47,6 +47,16 @@ services: KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 ALLOW_PLAINTEXT_LISTENER: 'true' + kafka_ui: + image: provectuslabs/kafka-ui:latest + restart: on-failure + depends_on: + - kafka + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + DYNAMIC_CONFIG_ENABLED: 'true' + object_storage: image: minio/minio:RELEASE.2022-06-25T15-50-16Z restart: on-failure diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index b4601915c95c3..a3900fbbc9ba6 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -63,6 +63,13 @@ services: ports: - '9092:9092' + kafka_ui: + extends: + file: docker-compose.base.yml + service: kafka_ui + ports: + - '9093:8080' + object_storage: extends: file: docker-compose.base.yml diff --git a/plugin-server/README.md b/plugin-server/README.md index 05ddf1e85c477..0272937b9feb6 100644 --- a/plugin-server/README.md +++ b/plugin-server/README.md @@ -104,43 +104,42 @@ If `PLUGIN_SERVER_MODE` is not set the plugin server will execute all of its tas There's a multitude of settings you can use to control the plugin server. Use them as environment variables. -| Name | Description | Default value | -| ------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------- | -| DATABASE_URL | Postgres database URL | `'postgres://localhost:5432/posthog'` | -| REDIS_URL | Redis store URL | `'redis://localhost'` | -| BASE_DIR | base path for resolving local plugins | `'.'` | -| WORKER_CONCURRENCY | number of concurrent worker threads | `0` – all cores | -| TASKS_PER_WORKER | number of parallel tasks per worker thread | `10` | -| REDIS_POOL_MIN_SIZE | minimum number of Redis connections to use per thread | `1` | -| REDIS_POOL_MAX_SIZE | maximum number of Redis connections to use per thread | `3` | -| SCHEDULE_LOCK_TTL | how many seconds to hold the lock for the schedule | `60` | -| PLUGINS_RELOAD_PUBSUB_CHANNEL | Redis channel for reload events | `'reload-plugins'` | -| CLICKHOUSE_HOST | ClickHouse host | `'localhost'` | -| CLICKHOUSE_OFFLINE_CLUSTER_HOST | ClickHouse host to use for offline workloads. 
Falls back to CLICKHOUSE_HOST | `null` | -| CLICKHOUSE_DATABASE | ClickHouse database | `'default'` | -| CLICKHOUSE_USER | ClickHouse username | `'default'` | -| CLICKHOUSE_PASSWORD | ClickHouse password | `null` | -| CLICKHOUSE_CA | ClickHouse CA certs | `null` | -| CLICKHOUSE_SECURE | whether to secure ClickHouse connection | `false` | -| KAFKA_HOSTS | comma-delimited Kafka hosts | `null` | -| KAFKA_CONSUMPTION_TOPIC | Kafka incoming events topic | `'events_plugin_ingestion'` | -| KAFKA_CLIENT_CERT_B64 | Kafka certificate in Base64 | `null` | -| KAFKA_CLIENT_CERT_KEY_B64 | Kafka certificate key in Base64 | `null` | -| KAFKA_TRUSTED_CERT_B64 | Kafka trusted CA in Base64 | `null` | -| KAFKA_PRODUCER_MAX_QUEUE_SIZE | Kafka producer batch max size before flushing | `20` | -| KAFKA_FLUSH_FREQUENCY_MS | Kafka producer batch max duration before flushing | `500` | -| KAFKA_MAX_MESSAGE_BATCH_SIZE | Kafka producer batch max size in bytes before flushing | `900000` | -| LOG_LEVEL | minimum log level | `'info'` | -| SENTRY_DSN | Sentry ingestion URL | `null` | -| DISABLE_MMDB | whether to disable MMDB IP location capabilities | `false` | -| INTERNAL_MMDB_SERVER_PORT | port of the internal server used for IP location (0 means random) | `0` | -| DISTINCT_ID_LRU_SIZE | size of persons distinct ID LRU cache | `10000` | -| PISCINA_USE_ATOMICS | corresponds to the piscina useAtomics config option (https://github.com/piscinajs/piscina#constructor-new-piscinaoptions) | `true` | -| PISCINA_ATOMICS_TIMEOUT | (advanced) corresponds to the length of time (in ms) a piscina worker should block for when looking for tasks - instances with high volumes (100+ events/sec) might benefit from setting this to a lower value | `5000` | -| HEALTHCHECK_MAX_STALE_SECONDS | 'maximum number of seconds the plugin server can go without ingesting events before the healthcheck fails' | `7200` | -| KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY | (advanced) how many kafka partitions the plugin server should consume from concurrently | `1` | -| RECORDING_PARTITIONS_CONSUMED_CONCURRENTLY | (advanced) how many kafka partitions the recordings consumer should consume from concurrently | `1` | -| PLUGIN_SERVER_MODE | (advanced) see alternative modes section | `null` | +| Name | Description | Default value | +| -------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------- | +| DATABASE_URL | Postgres database URL | `'postgres://localhost:5432/posthog'` | +| REDIS_URL | Redis store URL | `'redis://localhost'` | +| BASE_DIR | base path for resolving local plugins | `'.'` | +| WORKER_CONCURRENCY | number of concurrent worker threads | `0` – all cores | +| TASKS_PER_WORKER | number of parallel tasks per worker thread | `10` | +| REDIS_POOL_MIN_SIZE | minimum number of Redis connections to use per thread | `1` | +| REDIS_POOL_MAX_SIZE | maximum number of Redis connections to use per thread | `3` | +| SCHEDULE_LOCK_TTL | how many seconds to hold the lock for the schedule | `60` | +| PLUGINS_RELOAD_PUBSUB_CHANNEL | Redis channel for reload events | `'reload-plugins'` | +| CLICKHOUSE_HOST | ClickHouse host | `'localhost'` | +| CLICKHOUSE_OFFLINE_CLUSTER_HOST | ClickHouse host to use for offline workloads. 
Falls back to CLICKHOUSE_HOST | `null` | +| CLICKHOUSE_DATABASE | ClickHouse database | `'default'` | +| CLICKHOUSE_USER | ClickHouse username | `'default'` | +| CLICKHOUSE_PASSWORD | ClickHouse password | `null` | +| CLICKHOUSE_CA | ClickHouse CA certs | `null` | +| CLICKHOUSE_SECURE | whether to secure ClickHouse connection | `false` | +| KAFKA_HOSTS | comma-delimited Kafka hosts | `null` | +| KAFKA_CONSUMPTION_TOPIC | Kafka incoming events topic | `'events_plugin_ingestion'` | +| KAFKA_CLIENT_CERT_B64 | Kafka certificate in Base64 | `null` | +| KAFKA_CLIENT_CERT_KEY_B64 | Kafka certificate key in Base64 | `null` | +| KAFKA_TRUSTED_CERT_B64 | Kafka trusted CA in Base64 | `null` | +| KAFKA_PRODUCER_MAX_QUEUE_SIZE | Kafka producer batch max size before flushing | `20` | +| KAFKA_FLUSH_FREQUENCY_MS | Kafka producer batch max duration before flushing | `500` | +| KAFKA_MAX_MESSAGE_BATCH_SIZE | Kafka producer batch max size in bytes before flushing | `900000` | +| LOG_LEVEL | minimum log level | `'info'` | +| SENTRY_DSN | Sentry ingestion URL | `null` | +| DISABLE_MMDB | whether to disable MMDB IP location capabilities | `false` | +| INTERNAL_MMDB_SERVER_PORT | port of the internal server used for IP location (0 means random) | `0` | +| DISTINCT_ID_LRU_SIZE | size of persons distinct ID LRU cache | `10000` | +| PISCINA_USE_ATOMICS | corresponds to the piscina useAtomics config option (https://github.com/piscinajs/piscina#constructor-new-piscinaoptions) | `true` | +| PISCINA_ATOMICS_TIMEOUT | (advanced) corresponds to the length of time (in ms) a piscina worker should block for when looking for tasks - instances with high volumes (100+ events/sec) might benefit from setting this to a lower value | `5000` | +| HEALTHCHECK_MAX_STALE_SECONDS | 'maximum number of seconds the plugin server can go without ingesting events before the healthcheck fails' | `7200` | +| KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY | (advanced) how many kafka partitions the plugin server should consume from concurrently | `1` | +| PLUGIN_SERVER_MODE | (advanced) see alternative modes section | `null` | ## Releasing a new version diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 3c8347f579013..05781c8b68f91 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -102,7 +102,6 @@ export function getDefaultConfig(): PluginsServerConfig { PISCINA_ATOMICS_TIMEOUT: 5000, SITE_URL: null, KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: 1, - RECORDING_PARTITIONS_CONSUMED_CONCURRENTLY: 5, CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '', CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON, CONVERSION_BUFFER_ENABLED: false, @@ -161,6 +160,7 @@ export function getDefaultConfig(): PluginsServerConfig { POSTHOG_SESSION_RECORDING_REDIS_PORT: undefined, SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED: true, SESSION_RECORDING_DEBUG_PARTITION: undefined, + SESSION_RECORDING_KAFKA_DEBUG: undefined, } } diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index 2b6720ba4ea08..a4bbc5b211d33 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -1,4 +1,4 @@ -import { GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka' +import { ConsumerGlobalConfig, GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka' import { exponentialBuckets, Gauge, Histogram } from 'prom-client' import { retryIfRetriable } from '../utils/retries' @@ -69,6 +69,7 @@ export const startBatchConsumer = 
async ({ topicCreationTimeoutMs, eachBatch, queuedMinMessages = 100000, + debug, }: { connectionConfig: GlobalConfig groupId: string @@ -85,6 +86,7 @@ export const startBatchConsumer = async ({ topicCreationTimeoutMs: number eachBatch: (messages: Message[]) => Promise queuedMinMessages?: number + debug?: string }): Promise => { // Starts consuming from `topic` in batches of `fetchBatchSize` messages, // with consumer group id `groupId`. We use `connectionConfig` to connect @@ -106,62 +108,67 @@ export const startBatchConsumer = async ({ // // We also instrument the consumer with Prometheus metrics, which are // exposed on the /_metrics endpoint by the global prom-client registry. - const consumer = await createKafkaConsumer( - { - ...connectionConfig, - 'group.id': groupId, - 'session.timeout.ms': sessionTimeout, - 'max.poll.interval.ms': maxPollIntervalMs, - 'enable.auto.commit': autoCommit, - 'enable.auto.offset.store': false, - /** - * max.partition.fetch.bytes - * The maximum amount of data per-partition the server will return. - * Records are fetched in batches by the consumer. - * If the first record batch in the first non-empty partition of the fetch is larger than this limit, - * the batch will still be returned to ensure that the consumer can make progress. - * The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) - * or max.message.bytes (topic config). - * https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#:~:text=max.partition.fetch.bytes,the%20consumer%20can%20make%20progress. - */ - 'max.partition.fetch.bytes': consumerMaxBytesPerPartition, - // https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L122 - // Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. - 'fetch.message.max.bytes': consumerMaxBytes, - 'fetch.wait.max.ms': consumerMaxWaitMs, - 'fetch.error.backoff.ms': consumerErrorBackoffMs, - 'enable.partition.eof': true, - // https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L118 - // Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue - 'queued.min.messages': queuedMinMessages, // 100000 is the default - 'queued.max.messages.kbytes': 102400, // 1048576 is the default, we go smaller to reduce mem usage. - // Use cooperative-sticky rebalancing strategy, which is the - // [default](https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy) - // in the Java Kafka Client. There its actually - // RangeAssignor,CooperativeStickyAssignor i.e. it mixes eager and - // cooperative strategies. This is however explicitly mentioned to not - // be supported in the [librdkafka library config - // docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#partitionassignmentstrategy) - // so we just use cooperative-sticky. If there are other consumer - // members with other strategies already running, you'll need to delete - // e.g. the replicaset for them if on k8s. - // - // See - // https://www.confluent.io/en-gb/blog/incremental-cooperative-rebalancing-in-kafka/ - // for details on the advantages of this rebalancing strategy as well as - // how it works. 
- 'partition.assignment.strategy': 'cooperative-sticky', - rebalance_cb: true, - offset_commit_cb: true, - }, - { - // It is typically safest to roll back to the earliest offset if we - // find ourselves in a situation where there is no stored offset or - // the stored offset is invalid, compared to the default behavior of - // potentially jumping ahead to the latest offset. - 'auto.offset.reset': 'earliest', - } - ) + + const consumerConfig: ConsumerGlobalConfig = { + ...connectionConfig, + 'group.id': groupId, + 'session.timeout.ms': sessionTimeout, + 'max.poll.interval.ms': maxPollIntervalMs, + 'enable.auto.commit': autoCommit, + 'enable.auto.offset.store': false, + /** + * max.partition.fetch.bytes + * The maximum amount of data per-partition the server will return. + * Records are fetched in batches by the consumer. + * If the first record batch in the first non-empty partition of the fetch is larger than this limit, + * the batch will still be returned to ensure that the consumer can make progress. + * The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) + * or max.message.bytes (topic config). + * https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#:~:text=max.partition.fetch.bytes,the%20consumer%20can%20make%20progress. + */ + 'max.partition.fetch.bytes': consumerMaxBytesPerPartition, + // https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L122 + // Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. + 'fetch.message.max.bytes': consumerMaxBytes, + 'fetch.wait.max.ms': consumerMaxWaitMs, + 'fetch.error.backoff.ms': consumerErrorBackoffMs, + 'enable.partition.eof': true, + // https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L118 + // Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue + 'queued.min.messages': queuedMinMessages, // 100000 is the default + 'queued.max.messages.kbytes': 102400, // 1048576 is the default, we go smaller to reduce mem usage. + // Use cooperative-sticky rebalancing strategy, which is the + // [default](https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy) + // in the Java Kafka Client. There its actually + // RangeAssignor,CooperativeStickyAssignor i.e. it mixes eager and + // cooperative strategies. This is however explicitly mentioned to not + // be supported in the [librdkafka library config + // docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#partitionassignmentstrategy) + // so we just use cooperative-sticky. If there are other consumer + // members with other strategies already running, you'll need to delete + // e.g. the replicaset for them if on k8s. + // + // See + // https://www.confluent.io/en-gb/blog/incremental-cooperative-rebalancing-in-kafka/ + // for details on the advantages of this rebalancing strategy as well as + // how it works. 
+ 'partition.assignment.strategy': 'cooperative-sticky', + rebalance_cb: true, + offset_commit_cb: true, + } + + if (debug) { + // NOTE: If the key exists with value undefined the consumer will throw which is annoying so we define it here instead + consumerConfig.debug = debug + } + + const consumer = await createKafkaConsumer(consumerConfig, { + // It is typically safest to roll back to the earliest offset if we + // find ourselves in a situation where there is no stored offset or + // the stored offset is invalid, compared to the default behavior of + // potentially jumping ahead to the latest offset. + 'auto.offset.reset': 'earliest', + }) instrumentConsumerMetrics(consumer, groupId) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts index 58a741487e561..07404945fb4f9 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v3.ts @@ -296,6 +296,7 @@ export class SessionRecordingIngesterV3 { eachBatch: async (messages) => { return await this.scheduleWork(this.handleEachBatch(messages)) }, + debug: this.config.SESSION_RECORDING_KAFKA_DEBUG, }) addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 2445567668d72..652fa08754a30 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -173,7 +173,6 @@ export interface PluginsServerConfig { PISCINA_ATOMICS_TIMEOUT: number // (advanced) corresponds to the length of time a piscina worker should block for when looking for tasks SITE_URL: string | null KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: number // (advanced) how many kafka partitions the plugin server should consume from concurrently - RECORDING_PARTITIONS_CONSUMED_CONCURRENTLY: number CONVERSION_BUFFER_ENABLED: boolean CONVERSION_BUFFER_ENABLED_TEAMS: string CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: string @@ -236,6 +235,7 @@ export interface PluginsServerConfig { SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: KafkaSecurityProtocol | undefined SESSION_RECORDING_KAFKA_BATCH_SIZE: number SESSION_RECORDING_KAFKA_QUEUE_SIZE: number + SESSION_RECORDING_KAFKA_DEBUG: string | undefined POSTHOG_SESSION_RECORDING_REDIS_HOST: string | undefined POSTHOG_SESSION_RECORDING_REDIS_PORT: number | undefined
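
A note on the new debug plumbing in this diff: `SESSION_RECORDING_KAFKA_DEBUG` is typed as `string | undefined` in `PluginsServerConfig` and defaults to `undefined` in `getDefaultConfig`, so existing deployments are unaffected unless the variable is set. The value reaches `startBatchConsumer` as the optional `debug` argument and is only copied onto the librdkafka config when it is truthy, because a `debug` key that is present with the value `undefined` makes the consumer throw (see the NOTE in `batch-consumer.ts` above). Below is a minimal TypeScript sketch of that conditional-assignment pattern; the helper name is illustrative, and the example value assumes librdkafka's comma-separated debug contexts (e.g. `consumer,cgrp,topic,fetch`):

```typescript
import { ConsumerGlobalConfig } from 'node-rdkafka'

// Illustrative helper mirroring the conditional assignment in startBatchConsumer:
// only add `debug` to the consumer config when a value was actually provided,
// since a key that exists with the value `undefined` makes the consumer throw.
function withOptionalDebug(config: ConsumerGlobalConfig, debug?: string): ConsumerGlobalConfig {
    const result: ConsumerGlobalConfig = { ...config }
    if (debug) {
        // Passed straight through to librdkafka, e.g. 'consumer,cgrp,topic,fetch'
        result.debug = debug
    }
    return result
}

// Hypothetical usage, reading the same environment variable the plugin server uses:
const consumerConfig = withOptionalDebug(
    { 'group.id': 'session-recordings' },
    process.env.SESSION_RECORDING_KAFKA_DEBUG
)
```

Per this diff, only the session recordings consumer (`session-recordings-consumer-v3.ts`) passes the option through, so setting the variable has no effect on the other Kafka consumers.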
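
On the tooling side, the new `kafka_ui` service (image `provectuslabs/kafka-ui`) gives the dev stack a web UI for inspecting topics and consumer groups: docker-compose.dev.yml maps it to host port 9093 (container port 8080), and inside the compose network it talks to the broker at `kafka:9092` via `KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS`. A small, hypothetical smoke check, assuming Node 18+ (for the global `fetch`) and a running dev stack:

```typescript
// Hypothetical check that kafka-ui is reachable on the '9093:8080' mapping from
// docker-compose.dev.yml, e.g. after `docker compose -f docker-compose.dev.yml up -d kafka_ui`.
const KAFKA_UI_URL = 'http://localhost:9093'

async function checkKafkaUi(): Promise<void> {
    const response = await fetch(KAFKA_UI_URL)
    if (!response.ok) {
        throw new Error(`kafka-ui responded with HTTP ${response.status}`)
    }
    console.log(`kafka-ui is up at ${KAFKA_UI_URL}`)
}

checkKafkaUi().catch((error) => {
    console.error('kafka-ui is not reachable; is the dev compose stack running?', error)
    process.exit(1)
})
```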