From 4457dd53965c5d4b96c6d9f7d292a8037780ebb4 Mon Sep 17 00:00:00 2001 From: ted kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 2 Apr 2024 09:39:45 -0700 Subject: [PATCH] revert: "feat(plugin-server): Support preserving distinct ID locality on overflow rerouting (#20945)" (#21279) This reverts commit 85ef23745934cf91cdb3e5ea18d366eba2a5d764. --- plugin-server/src/config/config.ts | 2 - .../analytics-events-ingestion-consumer.ts | 9 ++-- .../batch-processing/each-batch-ingestion.ts | 22 ++++---- plugin-server/src/types.ts | 2 - plugin-server/src/utils/env-utils.ts | 5 ++ ...nalytics-events-ingestion-consumer.test.ts | 53 +++++++++---------- 6 files changed, 43 insertions(+), 50 deletions(-) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 1efc0ae9aa3e3..a6ee4e91a9b15 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -74,8 +74,6 @@ export function getDefaultConfig(): PluginsServerConfig { TASKS_PER_WORKER: 10, INGESTION_CONCURRENCY: 10, INGESTION_BATCH_SIZE: 500, - INGESTION_OVERFLOW_ENABLED: false, - INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: false, PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log, LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info, SENTRY_DSN: null, diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts index e452140931c7c..2b9c4ce77152d 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts @@ -4,6 +4,7 @@ import { Counter } from 'prom-client' import { buildStringMatcher } from '../../config/config' import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics' import { Hub } from '../../types' +import { isIngestionOverflowEnabled } from '../../utils/env-utils' import { status } from '../../utils/status' import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion' import { IngestionConsumer } from './kafka-queue' @@ -23,7 +24,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({ Consumes analytics events from the Kafka topic `events_plugin_ingestion` and processes them for ingestion into ClickHouse. - Before processing, if overflow rerouting is enabled and an event has + Before processing, if isIngestionOverflowEnabled and an event has overflowed the capacity for its (team_id, distinct_id) pair, it will not be processed here but instead re-produced into the `events_plugin_ingestion_overflow` topic for later processing. @@ -46,11 +47,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({ // deployment, we require an env variable to be set to confirm this before // enabling re-production of events to the OVERFLOW topic. - const overflowMode = hub.INGESTION_OVERFLOW_ENABLED - ? hub.INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY - ? IngestionOverflowMode.Reroute - : IngestionOverflowMode.RerouteRandomly - : IngestionOverflowMode.Disabled + const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false) const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise => { diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index ec020cfb6921f..588c2c92beb86 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -28,8 +28,7 @@ require('@sentry/tracing') export enum IngestionOverflowMode { Disabled, - Reroute, // preserves partition locality - RerouteRandomly, // discards partition locality + Reroute, ConsumeSplitByDistinctId, ConsumeSplitEvenly, } @@ -218,9 +217,7 @@ export async function eachBatchParallelIngestion( op: 'emitToOverflow', data: { eventCount: splitBatch.toOverflow.length }, }) - processingPromises.push( - emitToOverflow(queue, splitBatch.toOverflow, overflowMode === IngestionOverflowMode.RerouteRandomly) - ) + processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow)) overflowSpan.finish() } @@ -260,14 +257,14 @@ function computeKey(pluginEvent: PipelineEvent): string { return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}` } -async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[], useRandomPartitioner: boolean) { +async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]) { ingestionOverflowingMessagesTotal.inc(kafkaMessages.length) await Promise.all( kafkaMessages.map((message) => queue.pluginsServer.kafkaProducer.produce({ topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, value: message.value, - key: useRandomPartitioner ? undefined : message.key, + key: null, // No locality guarantees in overflow headers: message.headers, waitForAck: true, }) @@ -289,9 +286,6 @@ export function splitIngestionBatch( toProcess: [], toOverflow: [], } - const shouldRerouteToOverflow = [IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly].includes( - overflowMode - ) if (overflowMode === IngestionOverflowMode.ConsumeSplitEvenly) { /** @@ -320,7 +314,7 @@ export function splitIngestionBatch( const batches: Map = new Map() for (const message of kafkaMessages) { - if (shouldRerouteToOverflow && message.key == null) { + if (overflowMode === IngestionOverflowMode.Reroute && message.key == null) { // Overflow detected by capture, reroute to overflow topic // Not applying tokenBlockList to save CPU. TODO: do so once token is in the message headers output.toOverflow.push(message) @@ -340,8 +334,12 @@ export function splitIngestionBatch( } const eventKey = computeKey(pluginEvent) - if (shouldRerouteToOverflow && !ConfiguredLimiter.consume(eventKey, 1, message.timestamp)) { + if ( + overflowMode === IngestionOverflowMode.Reroute && + !ConfiguredLimiter.consume(eventKey, 1, message.timestamp) + ) { // Local overflow detection triggering, reroute to overflow topic too + message.key = null ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc() if (LoggingLimiter.consume(eventKey, 1)) { status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index df2efa8daa5f7..98b656e37a18d 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -94,8 +94,6 @@ export interface PluginsServerConfig { TASKS_PER_WORKER: number // number of parallel tasks per worker thread INGESTION_CONCURRENCY: number // number of parallel event ingestion queues per batch INGESTION_BATCH_SIZE: number // kafka consumer batch size - INGESTION_OVERFLOW_ENABLED: boolean // whether or not overflow rerouting is enabled (only used by analytics-ingestion) - INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: boolean // whether or not Kafka message keys should be preserved or discarded when messages are rerouted to overflow TASK_TIMEOUT: number // how many seconds until tasks are timed out DATABASE_URL: string // Postgres database URL DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database diff --git a/plugin-server/src/utils/env-utils.ts b/plugin-server/src/utils/env-utils.ts index 77064a53ac8e3..c2c47f8b8e46d 100644 --- a/plugin-server/src/utils/env-utils.ts +++ b/plugin-server/src/utils/env-utils.ts @@ -42,6 +42,11 @@ export const isProdEnv = (): boolean => determineNodeEnv() === NodeEnv.Productio export const isCloud = (): boolean => !!process.env.CLOUD_DEPLOYMENT +export function isIngestionOverflowEnabled(): boolean { + const ingestionOverflowEnabled = process.env.INGESTION_OVERFLOW_ENABLED + return stringToBoolean(ingestionOverflowEnabled) +} + export function isOverflowBatchByDistinctId(): boolean { const overflowBatchByDistinctId = process.env.INGESTION_OVERFLOW_BATCH_BY_DISTINCT_ID return stringToBoolean(overflowBatchByDistinctId) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index e95cb2a4e51c6..e042cf5c1ac34 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -107,35 +107,32 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { expect(runEventPipeline).not.toHaveBeenCalled() }) - it.each([IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly])( - 'reroutes excess events to OVERFLOW topic (mode=%p)', - async (overflowMode) => { - const now = Date.now() - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now) - const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) - - const tokenBlockList = buildStringMatcher('another_token,more_token', false) - await eachBatchParallelIngestion(tokenBlockList, batch, queue, overflowMode) - - expect(consume).toHaveBeenCalledWith( - captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'], - 1, - now - ) - expect(captureIngestionWarning).not.toHaveBeenCalled() - expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ - topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - value: JSON.stringify(captureEndpointEvent1), - timestamp: captureEndpointEvent1['timestamp'], - offset: captureEndpointEvent1['offset'], - key: overflowMode === IngestionOverflowMode.Reroute ? batch[0].key : undefined, - waitForAck: true, - }) + it('reroutes excess events to OVERFLOW topic', async () => { + const now = Date.now() + const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now) + const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) - // Event is not processed here - expect(runEventPipeline).not.toHaveBeenCalled() - } - ) + const tokenBlockList = buildStringMatcher('another_token,more_token', false) + await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute) + + expect(consume).toHaveBeenCalledWith( + captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'], + 1, + now + ) + expect(captureIngestionWarning).not.toHaveBeenCalled() + expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ + topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, + value: JSON.stringify(captureEndpointEvent1), + timestamp: captureEndpointEvent1['timestamp'], + offset: captureEndpointEvent1['offset'], + key: null, + waitForAck: true, + }) + + // Event is not processed here + expect(runEventPipeline).not.toHaveBeenCalled() + }) it('does not reroute if not over capacity limit', async () => { const now = Date.now()