From bedda6b68d6abb76b1bb314fd5307f9c1ed17bc3 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Mar 2024 22:10:59 -0700 Subject: [PATCH 1/5] feat: Preserve distinct ID locality on overflow rerouting --- .../ingestion-queues/batch-processing/each-batch-ingestion.ts | 3 +-- .../analytics-events-ingestion-consumer.test.ts | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 9466838cdf9bc..24a23a33f2882 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -255,7 +255,7 @@ async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[] queue.pluginsServer.kafkaProducer.produce({ topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, value: message.value, - key: null, // No locality guarantees in overflow + key: message.key, headers: message.headers, waitForAck: true, }) @@ -330,7 +330,6 @@ export function splitIngestionBatch( !ConfiguredLimiter.consume(eventKey, 1, message.timestamp) ) { // Local overflow detection triggering, reroute to overflow topic too - message.key = null ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? 
pluginEvent.token}`).inc() if (LoggingLimiter.consume(eventKey, 1)) { status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index e042cf5c1ac34..aa855ac456c54 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -126,7 +126,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { value: JSON.stringify(captureEndpointEvent1), timestamp: captureEndpointEvent1['timestamp'], offset: captureEndpointEvent1['offset'], - key: null, + key: batch[0].key, waitForAck: true, }) From 4c85f5b1744cfab0b5cf16fac5942ee3ee22f3d9 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 20 Mar 2024 17:29:50 -0700 Subject: [PATCH 2/5] Replace `isIngestionOverflowEnabled` helper with standard(ish) `Hub`-based configuration. --- plugin-server/src/config/config.ts | 1 + .../ingestion-queues/analytics-events-ingestion-consumer.ts | 3 +-- plugin-server/src/types.ts | 1 + plugin-server/src/utils/env-utils.ts | 5 ----- 4 files changed, 3 insertions(+), 7 deletions(-) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 8e9b50afb9528..cff0cb9bb6885 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -74,6 +74,7 @@ export function getDefaultConfig(): PluginsServerConfig { TASKS_PER_WORKER: 10, INGESTION_CONCURRENCY: 10, INGESTION_BATCH_SIZE: 500, + INGESTION_OVERFLOW_ENABLED: false, PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log, LOG_LEVEL: isTestEnv() ? 
LogLevel.Warn : LogLevel.Info, SENTRY_DSN: null, diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts index 2b9c4ce77152d..18b37f97ab920 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts @@ -4,7 +4,6 @@ import { Counter } from 'prom-client' import { buildStringMatcher } from '../../config/config' import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics' import { Hub } from '../../types' -import { isIngestionOverflowEnabled } from '../../utils/env-utils' import { status } from '../../utils/status' import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion' import { IngestionConsumer } from './kafka-queue' @@ -47,7 +46,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({ // deployment, we require an env variable to be set to confirm this before // enabling re-production of events to the OVERFLOW topic. - const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled + const overflowMode = hub.INGESTION_OVERFLOW_ENABLED ? 
IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false) const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise => { diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index b8eeb5b296a9e..636f3c6ac4052 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -93,6 +93,7 @@ export interface PluginsServerConfig { TASKS_PER_WORKER: number // number of parallel tasks per worker thread INGESTION_CONCURRENCY: number // number of parallel event ingestion queues per batch INGESTION_BATCH_SIZE: number // kafka consumer batch size + INGESTION_OVERFLOW_ENABLED: boolean // whether or not overflow rerouting is enabled (only used by analytics-ingestion) TASK_TIMEOUT: number // how many seconds until tasks are timed out DATABASE_URL: string // Postgres database URL DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database diff --git a/plugin-server/src/utils/env-utils.ts b/plugin-server/src/utils/env-utils.ts index c2c47f8b8e46d..77064a53ac8e3 100644 --- a/plugin-server/src/utils/env-utils.ts +++ b/plugin-server/src/utils/env-utils.ts @@ -42,11 +42,6 @@ export const isProdEnv = (): boolean => determineNodeEnv() === NodeEnv.Productio export const isCloud = (): boolean => !!process.env.CLOUD_DEPLOYMENT -export function isIngestionOverflowEnabled(): boolean { - const ingestionOverflowEnabled = process.env.INGESTION_OVERFLOW_ENABLED - return stringToBoolean(ingestionOverflowEnabled) -} - export function isOverflowBatchByDistinctId(): boolean { const overflowBatchByDistinctId = process.env.INGESTION_OVERFLOW_BATCH_BY_DISTINCT_ID return stringToBoolean(overflowBatchByDistinctId) From c1ad17ec6efa48347aedf769f736911109185d53 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 20 Mar 2024 17:40:06 -0700 Subject: [PATCH 3/5] Add 
`INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY` and add `RerouteRandomly` mode. --- plugin-server/src/config/config.ts | 1 + .../analytics-events-ingestion-consumer.ts | 6 ++- .../batch-processing/each-batch-ingestion.ts | 21 ++++---- plugin-server/src/types.ts | 1 + ...nalytics-events-ingestion-consumer.test.ts | 53 ++++++++++--------- 5 files changed, 47 insertions(+), 35 deletions(-) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index cff0cb9bb6885..8be8565eb2abe 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -75,6 +75,7 @@ export function getDefaultConfig(): PluginsServerConfig { INGESTION_CONCURRENCY: 10, INGESTION_BATCH_SIZE: 500, INGESTION_OVERFLOW_ENABLED: false, + INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: true, PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log, LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info, SENTRY_DSN: null, diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts index 18b37f97ab920..df99b6cdc4640 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts @@ -46,7 +46,11 @@ export const startAnalyticsEventsIngestionConsumer = async ({ // deployment, we require an env variable to be set to confirm this before // enabling re-production of events to the OVERFLOW topic. - const overflowMode = hub.INGESTION_OVERFLOW_ENABLED ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled + const overflowMode = hub.INGESTION_OVERFLOW_ENABLED + ? hub.INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY + ? 
IngestionOverflowMode.Reroute + : IngestionOverflowMode.RerouteRandomly + : IngestionOverflowMode.Disabled const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false) const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise => { diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 24a23a33f2882..904a258d1b814 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -27,7 +27,8 @@ require('@sentry/tracing') export enum IngestionOverflowMode { Disabled, - Reroute, + Reroute, // preserves partition locality + RerouteRandomly, // discards partition locality ConsumeSplitByDistinctId, ConsumeSplitEvenly, } @@ -210,7 +211,9 @@ export async function eachBatchParallelIngestion( op: 'emitToOverflow', data: { eventCount: splitBatch.toOverflow.length }, }) - processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow)) + processingPromises.push( + emitToOverflow(queue, splitBatch.toOverflow, overflowMode === IngestionOverflowMode.RerouteRandomly) + ) overflowSpan.finish() } @@ -248,14 +251,14 @@ function computeKey(pluginEvent: PipelineEvent): string { return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}` } -async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]) { +async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[], useRandomPartitioner: boolean) { ingestionOverflowingMessagesTotal.inc(kafkaMessages.length) await Promise.all( kafkaMessages.map((message) => queue.pluginsServer.kafkaProducer.produce({ topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, value: message.value, - key: message.key, + key: useRandomPartitioner ? 
undefined : message.key, headers: message.headers, waitForAck: true, }) @@ -277,6 +280,9 @@ export function splitIngestionBatch( toProcess: [], toOverflow: [], } + const shouldRerouteToOverflow = [IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly].includes( + overflowMode + ) if (overflowMode === IngestionOverflowMode.ConsumeSplitEvenly) { /** @@ -305,7 +311,7 @@ export function splitIngestionBatch( const batches: Map = new Map() for (const message of kafkaMessages) { - if (overflowMode === IngestionOverflowMode.Reroute && message.key == null) { + if (shouldRerouteToOverflow && message.key == null) { // Overflow detected by capture, reroute to overflow topic // Not applying tokenBlockList to save CPU. TODO: do so once token is in the message headers output.toOverflow.push(message) @@ -325,10 +331,7 @@ export function splitIngestionBatch( } const eventKey = computeKey(pluginEvent) - if ( - overflowMode === IngestionOverflowMode.Reroute && - !ConfiguredLimiter.consume(eventKey, 1, message.timestamp) - ) { + if (shouldRerouteToOverflow && !ConfiguredLimiter.consume(eventKey, 1, message.timestamp)) { // Local overflow detection triggering, reroute to overflow topic too ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? 
pluginEvent.token}`).inc() if (LoggingLimiter.consume(eventKey, 1)) { diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 636f3c6ac4052..72595fb789b29 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -94,6 +94,7 @@ export interface PluginsServerConfig { INGESTION_CONCURRENCY: number // number of parallel event ingestion queues per batch INGESTION_BATCH_SIZE: number // kafka consumer batch size INGESTION_OVERFLOW_ENABLED: boolean // whether or not overflow rerouting is enabled (only used by analytics-ingestion) + INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: boolean // whether or not Kafka message keys should be preserved or discarded when messages are rerouted to overflow TASK_TIMEOUT: number // how many seconds until tasks are timed out DATABASE_URL: string // Postgres database URL DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index aa855ac456c54..e95cb2a4e51c6 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -107,32 +107,35 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { expect(runEventPipeline).not.toHaveBeenCalled() }) - it('reroutes excess events to OVERFLOW topic', async () => { - const now = Date.now() - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now) - const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) - - const tokenBlockList = buildStringMatcher('another_token,more_token', false) - await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute) - - expect(consume).toHaveBeenCalledWith( - 
captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'], - 1, - now - ) - expect(captureIngestionWarning).not.toHaveBeenCalled() - expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ - topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - value: JSON.stringify(captureEndpointEvent1), - timestamp: captureEndpointEvent1['timestamp'], - offset: captureEndpointEvent1['offset'], - key: batch[0].key, - waitForAck: true, - }) + it.each([IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly])( + 'reroutes excess events to OVERFLOW topic (mode=%p)', + async (overflowMode) => { + const now = Date.now() + const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now) + const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) + + const tokenBlockList = buildStringMatcher('another_token,more_token', false) + await eachBatchParallelIngestion(tokenBlockList, batch, queue, overflowMode) + + expect(consume).toHaveBeenCalledWith( + captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'], + 1, + now + ) + expect(captureIngestionWarning).not.toHaveBeenCalled() + expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ + topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, + value: JSON.stringify(captureEndpointEvent1), + timestamp: captureEndpointEvent1['timestamp'], + offset: captureEndpointEvent1['offset'], + key: overflowMode === IngestionOverflowMode.Reroute ? 
batch[0].key : undefined, + waitForAck: true, + }) - // Event is not processed here - expect(runEventPipeline).not.toHaveBeenCalled() - }) + // Event is not processed here + expect(runEventPipeline).not.toHaveBeenCalled() + } + ) it('does not reroute if not over capacity limit', async () => { const now = Date.now() From b196d21636801065b63d770e21433db6fbf34ffa Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 20 Mar 2024 17:55:20 -0700 Subject: [PATCH 4/5] Tidy up comment that referenced old configuration function name. --- .../ingestion-queues/analytics-events-ingestion-consumer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts index df99b6cdc4640..e452140931c7c 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts @@ -23,7 +23,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({ Consumes analytics events from the Kafka topic `events_plugin_ingestion` and processes them for ingestion into ClickHouse. - Before processing, if isIngestionOverflowEnabled and an event has + Before processing, if overflow rerouting is enabled and an event has overflowed the capacity for its (team_id, distinct_id) pair, it will not be processed here but instead re-produced into the `events_plugin_ingestion_overflow` topic for later processing. 
From dc6dd4970fff8762507208bd95804bc007195cfd Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 27 Mar 2024 14:36:37 -0700 Subject: [PATCH 5/5] Default `INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY` to false for backward compatibility and no surprises during deploy --- plugin-server/src/config/config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 8be8565eb2abe..24880db690086 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -75,7 +75,7 @@ export function getDefaultConfig(): PluginsServerConfig { INGESTION_CONCURRENCY: 10, INGESTION_BATCH_SIZE: 500, INGESTION_OVERFLOW_ENABLED: false, - INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: true, + INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: false, PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log, LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info, SENTRY_DSN: null,