chore: Debugging options for V3 ingester #20569

Merged 4 commits on Feb 27, 2024

Changes from all commits
10 changes: 10 additions & 0 deletions docker-compose.base.yml
@@ -47,6 +47,16 @@ services:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: 'true'

kafka_ui:
image: provectuslabs/kafka-ui:latest
restart: on-failure
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
DYNAMIC_CONFIG_ENABLED: 'true'

object_storage:
image: minio/minio:RELEASE.2022-06-25T15-50-16Z
restart: on-failure
7 changes: 7 additions & 0 deletions docker-compose.dev.yml
@@ -63,6 +63,13 @@ services:
ports:
- '9092:9092'

kafka_ui:
extends:
file: docker-compose.base.yml
service: kafka_ui
ports:
- '9093:8080'

object_storage:
extends:
file: docker-compose.base.yml
73 changes: 36 additions & 37 deletions plugin-server/README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion plugin-server/src/config/config.ts
@@ -102,7 +102,6 @@ export function getDefaultConfig(): PluginsServerConfig {
PISCINA_ATOMICS_TIMEOUT: 5000,
SITE_URL: null,
KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: 1,
RECORDING_PARTITIONS_CONSUMED_CONCURRENTLY: 5,
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '',
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON,
CONVERSION_BUFFER_ENABLED: false,
@@ -161,6 +160,7 @@ export function getDefaultConfig(): PluginsServerConfig {
POSTHOG_SESSION_RECORDING_REDIS_PORT: undefined,
SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED: true,
SESSION_RECORDING_DEBUG_PARTITION: undefined,
SESSION_RECORDING_KAFKA_DEBUG: undefined,
}
}

121 changes: 64 additions & 57 deletions plugin-server/src/kafka/batch-consumer.ts
@@ -1,4 +1,4 @@
import { GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka'
import { ConsumerGlobalConfig, GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka'
import { exponentialBuckets, Gauge, Histogram } from 'prom-client'

import { retryIfRetriable } from '../utils/retries'
@@ -69,6 +69,7 @@ export const startBatchConsumer = async ({
topicCreationTimeoutMs,
eachBatch,
queuedMinMessages = 100000,
debug,
}: {
connectionConfig: GlobalConfig
groupId: string
@@ -85,6 +86,7 @@
topicCreationTimeoutMs: number
eachBatch: (messages: Message[]) => Promise<void>
queuedMinMessages?: number
debug?: string
}): Promise<BatchConsumer> => {
// Starts consuming from `topic` in batches of `fetchBatchSize` messages,
// with consumer group id `groupId`. We use `connectionConfig` to connect
@@ -106,62 +108,67 @@
//
// We also instrument the consumer with Prometheus metrics, which are
// exposed on the /_metrics endpoint by the global prom-client registry.
const consumer = await createKafkaConsumer(
{
...connectionConfig,
'group.id': groupId,
'session.timeout.ms': sessionTimeout,
'max.poll.interval.ms': maxPollIntervalMs,
'enable.auto.commit': autoCommit,
'enable.auto.offset.store': false,
/**
* max.partition.fetch.bytes
* The maximum amount of data per-partition the server will return.
* Records are fetched in batches by the consumer.
* If the first record batch in the first non-empty partition of the fetch is larger than this limit,
* the batch will still be returned to ensure that the consumer can make progress.
* The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config)
* or max.message.bytes (topic config).
* https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#:~:text=max.partition.fetch.bytes,the%20consumer%20can%20make%20progress.
*/
'max.partition.fetch.bytes': consumerMaxBytesPerPartition,
// https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L122
// Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
'fetch.message.max.bytes': consumerMaxBytes,
'fetch.wait.max.ms': consumerMaxWaitMs,
'fetch.error.backoff.ms': consumerErrorBackoffMs,
'enable.partition.eof': true,
// https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L118
// Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue
'queued.min.messages': queuedMinMessages, // 100000 is the default
'queued.max.messages.kbytes': 102400, // 1048576 is the default, we go smaller to reduce mem usage.
// Use cooperative-sticky rebalancing strategy, which is the
// [default](https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy)
// in the Java Kafka Client. There its actually
// RangeAssignor,CooperativeStickyAssignor i.e. it mixes eager and
// cooperative strategies. This is however explicitly mentioned to not
// be supported in the [librdkafka library config
// docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#partitionassignmentstrategy)
// so we just use cooperative-sticky. If there are other consumer
// members with other strategies already running, you'll need to delete
// e.g. the replicaset for them if on k8s.
//
// See
// https://www.confluent.io/en-gb/blog/incremental-cooperative-rebalancing-in-kafka/
// for details on the advantages of this rebalancing strategy as well as
// how it works.
'partition.assignment.strategy': 'cooperative-sticky',
rebalance_cb: true,
offset_commit_cb: true,
},
{
// It is typically safest to roll back to the earliest offset if we
// find ourselves in a situation where there is no stored offset or
// the stored offset is invalid, compared to the default behavior of
// potentially jumping ahead to the latest offset.
'auto.offset.reset': 'earliest',
}
)

const consumerConfig: ConsumerGlobalConfig = {
...connectionConfig,
'group.id': groupId,
'session.timeout.ms': sessionTimeout,
'max.poll.interval.ms': maxPollIntervalMs,
'enable.auto.commit': autoCommit,
'enable.auto.offset.store': false,
/**
* max.partition.fetch.bytes
* The maximum amount of data per-partition the server will return.
* Records are fetched in batches by the consumer.
* If the first record batch in the first non-empty partition of the fetch is larger than this limit,
* the batch will still be returned to ensure that the consumer can make progress.
* The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config)
* or max.message.bytes (topic config).
* https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#:~:text=max.partition.fetch.bytes,the%20consumer%20can%20make%20progress.
*/
'max.partition.fetch.bytes': consumerMaxBytesPerPartition,
// https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L122
// Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
'fetch.message.max.bytes': consumerMaxBytes,
'fetch.wait.max.ms': consumerMaxWaitMs,
'fetch.error.backoff.ms': consumerErrorBackoffMs,
'enable.partition.eof': true,
// https://github.com/confluentinc/librdkafka/blob/e75de5be191b6b8e9602efc969f4af64071550de/CONFIGURATION.md?plain=1#L118
// Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue
'queued.min.messages': queuedMinMessages, // 100000 is the default
'queued.max.messages.kbytes': 102400, // 1048576 is the default, we go smaller to reduce mem usage.
// Use cooperative-sticky rebalancing strategy, which is the
// [default](https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy)
// in the Java Kafka Client. There it's actually
// RangeAssignor,CooperativeStickyAssignor i.e. it mixes eager and
// cooperative strategies. This is however explicitly mentioned to not
// be supported in the [librdkafka library config
// docs](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#partitionassignmentstrategy)
// so we just use cooperative-sticky. If there are other consumer
// members with other strategies already running, you'll need to delete
// e.g. the replicaset for them if on k8s.
//
// See
// https://www.confluent.io/en-gb/blog/incremental-cooperative-rebalancing-in-kafka/
// for details on the advantages of this rebalancing strategy as well as
// how it works.
'partition.assignment.strategy': 'cooperative-sticky',
rebalance_cb: true,
offset_commit_cb: true,
}

if (debug) {
// NOTE: If the key exists with an undefined value the consumer throws, so we only set it here when a value is provided
consumerConfig.debug = debug
}

const consumer = await createKafkaConsumer(consumerConfig, {
// It is typically safest to roll back to the earliest offset if we
// find ourselves in a situation where there is no stored offset or
// the stored offset is invalid, compared to the default behavior of
// potentially jumping ahead to the latest offset.
'auto.offset.reset': 'earliest',
})

instrumentConsumerMetrics(consumer, groupId)

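For context on the conditional `debug` assignment above: node-rdkafka throws if a config key is present but its value is undefined, which is why the option is only copied into `consumerConfig` when it is actually set. A minimal sketch of the pattern, with a helper name of our own (`withKafkaDebug` is illustrative, not from this PR):

```ts
import { ConsumerGlobalConfig } from 'node-rdkafka'

// Sketch: only add the `debug` key when a value is provided, since the
// consumer throws if the key exists but is undefined. The value is a
// comma-separated list of librdkafka debug contexts,
// e.g. 'consumer,cgrp,topic,fetch' or 'all'.
function withKafkaDebug(config: ConsumerGlobalConfig, debug?: string): ConsumerGlobalConfig {
    return debug ? { ...config, debug } : config
}
```

The same effect could be had with a spread such as `...(debug ? { debug } : {})`, but the explicit `if` in the diff keeps the config object easier to read.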
@@ -296,6 +296,7 @@ export class SessionRecordingIngesterV3 {
eachBatch: async (messages) => {
return await this.scheduleWork(this.handleEachBatch(messages))
},
debug: this.config.SESSION_RECORDING_KAFKA_DEBUG,
})

addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer)
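Taken together with the batch-consumer change, the V3 ingester simply forwards the new setting, so verbose rdkafka logging can be switched on purely through the environment. A hedged sketch of the expected flow (the variable name is the one added in this PR; the parsing shown is an assumption about how the plugin server's config loader treats it):

```ts
// Assumed flow: SESSION_RECORDING_KAFKA_DEBUG is read from the environment
// into PluginsServerConfig (default undefined, see types.ts below), and the
// V3 ingester passes it through as the batch consumer's `debug` option.
// Leaving it unset keeps rdkafka logging at its default level.
const sessionRecordingKafkaDebug: string | undefined =
    process.env.SESSION_RECORDING_KAFKA_DEBUG || undefined // e.g. 'cgrp,topic,fetch'
```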
2 changes: 1 addition & 1 deletion plugin-server/src/types.ts
@@ -173,7 +173,6 @@ export interface PluginsServerConfig {
PISCINA_ATOMICS_TIMEOUT: number // (advanced) corresponds to the length of time a piscina worker should block for when looking for tasks
SITE_URL: string | null
KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: number // (advanced) how many kafka partitions the plugin server should consume from concurrently
RECORDING_PARTITIONS_CONSUMED_CONCURRENTLY: number
CONVERSION_BUFFER_ENABLED: boolean
CONVERSION_BUFFER_ENABLED_TEAMS: string
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: string
@@ -236,6 +235,7 @@ export interface PluginsServerConfig {
SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: KafkaSecurityProtocol | undefined
SESSION_RECORDING_KAFKA_BATCH_SIZE: number
SESSION_RECORDING_KAFKA_QUEUE_SIZE: number
SESSION_RECORDING_KAFKA_DEBUG: string | undefined

POSTHOG_SESSION_RECORDING_REDIS_HOST: string | undefined
POSTHOG_SESSION_RECORDING_REDIS_PORT: number | undefined
@@ -79,7 +79,7 @@
WHERE and(equals(events.team_id, 2), greaterOrEquals(toTimeZone(events.timestamp, 'UTC'), minus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-12 00:00:00', 6, 'UTC'))), toIntervalDay(1))), less(toTimeZone(events.timestamp, 'UTC'), plus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-19 23:59:59', 6, 'UTC'))), toIntervalDay(1))), ifNull(in(person_id,
(SELECT cohortpeople.person_id AS person_id
FROM cohortpeople
WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 7))
WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 6))
GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version
HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0), equals(events.event, '$pageview'))
GROUP BY person_id)