From ce6db7575a5f0a330bc1b35e0c26a9614b032389 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 10 Oct 2023 17:23:20 +0200 Subject: [PATCH] chore(plugins-server): remove eachBatchLegacyIngestion --- plugin-server/src/config/config.ts | 1 - .../analytics-events-ingestion-consumer.ts | 70 +--- ...cs-events-ingestion-historical-consumer.ts | 55 +-- ...tics-events-ingestion-overflow-consumer.ts | 55 +-- .../each-batch-ingestion-kafkajs.ts | 379 ------------------ plugin-server/src/main/pluginsServer.ts | 6 +- plugin-server/src/types.ts | 1 - ...nalytics-events-ingestion-consumer.test.ts | 138 ------- ...events-ingestion-overflow-consumer.test.ts | 94 ----- .../main/ingestion-queues/each-batch.test.ts | 135 ------- 10 files changed, 6 insertions(+), 928 deletions(-) delete mode 100644 plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs.ts diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 13ce5ccabe60f..4caeabb38dde0 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -45,7 +45,6 @@ export function getDefaultConfig(): PluginsServerConfig { KAFKA_SASL_USER: undefined, KAFKA_SASL_PASSWORD: undefined, KAFKA_CLIENT_RACK: undefined, - KAFKA_CONSUMPTION_USE_RDKAFKA: true, // Transitional setting, ignored for consumers that only support one library KAFKA_CONSUMPTION_MAX_BYTES: 10_485_760, // Default value for kafkajs KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: 1_048_576, // Default value for kafkajs, must be bigger than message size KAFKA_CONSUMPTION_MAX_WAIT_MS: 50, // Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages. diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts index 3d6a1dce57221..fc951ad1dbaca 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts @@ -1,6 +1,4 @@ -import { EachBatchPayload } from 'kafkajs' import { Message } from 'node-rdkafka' -import * as schedule from 'node-schedule' import { Counter } from 'prom-client' import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics' @@ -9,9 +7,7 @@ import { isIngestionOverflowEnabled } from '../../utils/env-utils' import { status } from '../../utils/status' import Piscina from '../../worker/piscina' import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion' -import { eachBatchLegacyIngestion } from './batch-processing/each-batch-ingestion-kafkajs' -import { IngestionConsumer, KafkaJSIngestionConsumer } from './kafka-queue' -import { makeHealthCheck } from './on-event-handler-consumer' +import { IngestionConsumer } from './kafka-queue' export const ingestionPartitionKeyOverflowed = new Counter({ name: 'ingestion_partition_key_overflowed', @@ -26,9 +22,6 @@ export const startAnalyticsEventsIngestionConsumer = async ({ hub: Hub piscina: Piscina }) => { - if (!hub.KAFKA_CONSUMPTION_USE_RDKAFKA) { - return startLegacyAnalyticsEventsIngestionConsumer({ hub, piscina }) - } /* Consumes analytics events from the Kafka topic `events_plugin_ingestion` and processes them for ingestion into ClickHouse. 
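[Editor's note] For context, the rdkafka path that remains keeps the behaviour described in the comment above: rerouting to the overflow topic is only enabled when `isIngestionOverflowEnabled()` confirms the deployment has opted in. A minimal sketch of that wiring, assuming the batch handler keeps the `(messages, queue, overflowMode)` shape suggested by the imports (the full handler is not shown in this hunk):

```ts
// Sketch only: overflow-mode selection kept by the surviving rdkafka consumer.
import { Message } from 'node-rdkafka'

import { isIngestionOverflowEnabled } from '../../utils/env-utils'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { IngestionConsumer } from './kafka-queue'

const overflowMode = isIngestionOverflowEnabled()
    ? IngestionOverflowMode.Reroute // re-produce overflowed (team_id, distinct_id) pairs to the overflow topic
    : IngestionOverflowMode.Disabled // process every event in place

// Assumed handler shape; the exact signature lives in the unchanged parts of this file.
const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise<void> => {
    await eachBatchParallelIngestion(messages, queue, overflowMode)
}
```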
@@ -73,64 +66,3 @@ export const startAnalyticsEventsIngestionConsumer = async ({ return { queue, isHealthy } } - -const startLegacyAnalyticsEventsIngestionConsumer = async ({ - hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection. - piscina, -}: { - hub: Hub - piscina: Piscina -}) => { - /* - Consumes analytics events from the Kafka topic `events_plugin_ingestion` - and processes them for ingestion into ClickHouse. - - Before processing, if isIngestionOverflowEnabled and an event has - overflowed the capacity for its (team_id, distinct_id) pair, it will not - be processed here but instead re-produced into the - `events_plugin_ingestion_overflow` topic for later processing. - - At the moment this is just a wrapper around `IngestionConsumer`. We may - want to further remove that abstraction in the future. - */ - status.info('🔁', 'Starting analytics events consumer with kafkajs') - - // NOTE: we are explicitly not maintaining backwards compatibility with - // previous functionality regards to consumer group id usage prior to the - // introduction of this file. Previouslty, when ingestion and export - // workloads ran on the same process they would share the same consumer - // group id. In these cases, updating to this version will result in the - // re-exporting of events still in Kafka `clickhouse_events_json` topic. - - // We need a way to determine if ingestionOverflow is enabled when using - // separate deployments for ingestion consumers in order to scale them - // independently. Since ingestionOverflow may be enabled in a separate - // deployment, we require an env variable to be set to confirm this before - // enabling re-production of events to the OVERFLOW topic. - - const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled - const batchHandler = async (payload: EachBatchPayload, queue: KafkaJSIngestionConsumer): Promise => { - await eachBatchLegacyIngestion(payload, queue, overflowMode) - } - - const queue = new KafkaJSIngestionConsumer( - hub, - piscina, - KAFKA_EVENTS_PLUGIN_INGESTION, - `${KAFKA_PREFIX}clickhouse-ingestion`, - batchHandler - ) - - await queue.start() - - schedule.scheduleJob('0 * * * * *', async () => { - await queue.emitConsumerGroupMetrics() - }) - - // Subscribe to the heatbeat event to track when the consumer has last - // successfully consumed a message. This is used to determine if the - // consumer is healthy. 
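[Editor's note] `makeHealthCheck` itself is untouched by this patch; as a rough illustration of the heartbeat-based liveness idea the comment above describes (not PostHog's implementation, which lives in on-event-handler-consumer.ts and may check more), it boils down to something like:

```ts
import { Consumer } from 'kafkajs'

// Illustrative sketch only: report healthy while the most recent heartbeat is
// younger than the consumer session timeout.
export function exampleHealthCheck(consumer: Consumer, sessionTimeoutMs: number): () => boolean {
    let lastHeartbeat = Date.now()
    consumer.on(consumer.events.HEARTBEAT, () => {
        lastHeartbeat = Date.now()
    })
    return () => Date.now() - lastHeartbeat < sessionTimeoutMs
}
```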
- const isHealthy = makeHealthCheck(queue.consumer, queue.sessionTimeout) - - return { queue, isHealthy } -} diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-historical-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-historical-consumer.ts index 668421ff42d58..a0bda33e2458f 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-historical-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-historical-consumer.ts @@ -1,15 +1,11 @@ -import { EachBatchPayload } from 'kafkajs' import { Message } from 'node-rdkafka' -import * as schedule from 'node-schedule' import { KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL, prefix as KAFKA_PREFIX } from '../../config/kafka-topics' import { Hub } from '../../types' import { status } from '../../utils/status' import Piscina from '../../worker/piscina' import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion' -import { eachBatchLegacyIngestion } from './batch-processing/each-batch-ingestion-kafkajs' -import { IngestionConsumer, KafkaJSIngestionConsumer } from './kafka-queue' -import { makeHealthCheck } from './on-event-handler-consumer' +import { IngestionConsumer } from './kafka-queue' export const startAnalyticsEventsIngestionHistoricalConsumer = async ({ hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection. @@ -18,9 +14,6 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({ hub: Hub piscina: Piscina }) => { - if (!hub.KAFKA_CONSUMPTION_USE_RDKAFKA) { - return startLegacyAnalyticsEventsIngestionHistoricalConsumer({ hub, piscina }) - } /* Consumes analytics events from the Kafka topic `events_plugin_ingestion_historical` and processes them for ingestion into ClickHouse. @@ -50,49 +43,3 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({ return { queue, isHealthy } } - -export const startLegacyAnalyticsEventsIngestionHistoricalConsumer = async ({ - hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection. - piscina, -}: { - hub: Hub - piscina: Piscina -}) => { - /* - Consumes analytics events from the Kafka topic `events_plugin_ingestion_historical` - and processes them for ingestion into ClickHouse. - - This is the historical events or "slow-lane" processing queue as it contains only - events that have timestamps in the past. - */ - status.info('🔁', 'Starting analytics events historical consumer with kafkajs') - - /* - We don't want to move events to overflow from here, it's fine for the processing to - take longer, but we want the locality constraints to be respected like normal ingestion. - */ - const batchHandler = async (payload: EachBatchPayload, queue: KafkaJSIngestionConsumer): Promise => { - await eachBatchLegacyIngestion(payload, queue, IngestionOverflowMode.Disabled) - } - - const queue = new KafkaJSIngestionConsumer( - hub, - piscina, - KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL, - `${KAFKA_PREFIX}clickhouse-ingestion-historical`, - batchHandler - ) - - await queue.start() - - schedule.scheduleJob('0 * * * * *', async () => { - await queue.emitConsumerGroupMetrics() - }) - - // Subscribe to the heatbeat event to track when the consumer has last - // successfully consumed a message. This is used to determine if the - // consumer is healthy. 
- const isHealthy = makeHealthCheck(queue.consumer, queue.sessionTimeout) - - return { queue, isHealthy } -} diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.ts index aa2ff70beaf21..fbbdf011a52f3 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.ts @@ -1,14 +1,11 @@ -import { EachBatchPayload } from 'kafkajs' import { Message } from 'node-rdkafka' -import * as schedule from 'node-schedule' import { KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, prefix as KAFKA_PREFIX } from '../../config/kafka-topics' import { Hub } from '../../types' import { status } from '../../utils/status' import Piscina from '../../worker/piscina' import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion' -import { eachBatchLegacyIngestion } from './batch-processing/each-batch-ingestion-kafkajs' -import { IngestionConsumer, KafkaJSIngestionConsumer } from './kafka-queue' +import { IngestionConsumer } from './kafka-queue' export const startAnalyticsEventsIngestionOverflowConsumer = async ({ hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection. @@ -17,9 +14,6 @@ export const startAnalyticsEventsIngestionOverflowConsumer = async ({ hub: Hub piscina: Piscina }) => { - if (!hub.KAFKA_CONSUMPTION_USE_RDKAFKA) { - return startLegacyAnalyticsEventsIngestionOverflowConsumer({ hub, piscina }) - } /* Consumes analytics events from the Kafka topic `events_plugin_ingestion_overflow` and processes them for ingestion into ClickHouse. @@ -55,50 +49,3 @@ export const startAnalyticsEventsIngestionOverflowConsumer = async ({ return queue } - -export const startLegacyAnalyticsEventsIngestionOverflowConsumer = async ({ - hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection. - piscina, -}: { - hub: Hub - piscina: Piscina -}) => { - /* - Consumes analytics events from the Kafka topic `events_plugin_ingestion_overflow` - and processes them for ingestion into ClickHouse. - - This is the overflow or "slow-lane" processing queue as it contains only events that - have exceed capacity. - - At the moment this is just a wrapper around `IngestionConsumer`. We may - want to further remove that abstraction in the future. - */ - status.info('🔁', 'Starting analytics events overflow consumer with kafkajs') - - // NOTE: we are explicitly not maintaining backwards compatibility with - // previous functionality regards to consumer group id usage prior to the - // introduction of this file. Previouslty, when ingestion and export - // workloads ran on the same process they would share the same consumer - // group id. In these cases, updating to this version will result in the - // re-exporting of events still in Kafka `clickhouse_events_json` topic. 
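[Editor's note] Each of the removed kafkajs consumers also emitted consumer-group metrics once a minute through node-schedule (the `scheduleJob('0 * * * * *', ...)` calls in this patch); that pattern, pulled out for reference, is simply a six-field cron firing at second 0 of every minute:

```ts
import * as schedule from 'node-schedule'

// `queue` stands in for the KafkaJSIngestionConsumer from the surrounding removed code.
function scheduleConsumerGroupMetrics(queue: { emitConsumerGroupMetrics: () => Promise<void> }): schedule.Job {
    // Six-field cron expression: fire at second 0 of every minute.
    return schedule.scheduleJob('0 * * * * *', async () => {
        await queue.emitConsumerGroupMetrics()
    })
    // The returned Job would be cancelled on shutdown together with the consumer.
}
```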
- - const batchHandler = async (payload: EachBatchPayload, queue: KafkaJSIngestionConsumer): Promise => { - await eachBatchLegacyIngestion(payload, queue, IngestionOverflowMode.Consume) - } - - const queue = new KafkaJSIngestionConsumer( - hub, - piscina, - KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - `${KAFKA_PREFIX}clickhouse-ingestion-overflow`, - batchHandler - ) - - await queue.start() - - schedule.scheduleJob('0 * * * * *', async () => { - await queue.emitConsumerGroupMetrics() - }) - - return queue -} diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs.ts deleted file mode 100644 index 6be2d9e988346..0000000000000 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs.ts +++ /dev/null @@ -1,379 +0,0 @@ -import * as Sentry from '@sentry/node' -import { EachBatchPayload, KafkaMessage } from 'kafkajs' - -import { KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../config/kafka-topics' -import { Hub, PipelineEvent, WorkerMethods } from '../../../types' -import { normalizeEvent } from '../../../utils/event' -import { status } from '../../../utils/status' -import { ConfiguredLimiter, LoggingLimiter, WarningLimiter } from '../../../utils/token-bucket' -import { EventPipelineResult } from '../../../worker/ingestion/event-pipeline/runner' -import { captureIngestionWarning } from '../../../worker/ingestion/utils' -import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-consumer' -import { KafkaJSIngestionConsumer } from '../kafka-queue' -import { latestOffsetTimestampGauge } from '../metrics' -import { IngestionOverflowMode } from './each-batch-ingestion' -import { - ingestionOverflowingMessagesTotal, - ingestionParallelism, - ingestionParallelismPotential, - kafkaBatchOffsetCommitted, - kafkaBatchStart, -} from './metrics' - -// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals -require('@sentry/tracing') - -type IngestionSplitBatch = { - toProcess: { message: KafkaMessage; pluginEvent: PipelineEvent }[][] - toOverflow: KafkaMessage[] -} - -// Subset of EventPipelineResult to make sure we don't access what's exported for the tests -type IngestResult = { - // Promises that the batch handler should await on before committing offsets, - // contains the Kafka producer ACKs, to avoid blocking after every message. - promises?: Array> -} - -/** - * Legacy consumer loop that uses the kafkajs consumer, kept as a fallback while we iterate on - * eachBatchParallelIngestion and rdkafka. - * TODO: delete as soon as rdkafka is tuned and ready for prime time. - */ -export async function eachBatchLegacyIngestion( - { batch, resolveOffset, heartbeat, commitOffsetsIfNecessary, isRunning, isStale }: EachBatchPayload, - queue: KafkaJSIngestionConsumer, - overflowMode: IngestionOverflowMode -): Promise { - async function eachMessage(event: PipelineEvent, queue: KafkaJSIngestionConsumer): Promise { - return ingestEvent(queue.pluginsServer, queue.workerMethods, event) - } - - const batchStartTimer = new Date() - const metricKey = 'ingestion' - const loggingKey = `each_batch_legacy_ingestion` - - const transaction = Sentry.startTransaction({ name: `eachBatchLegacyIngestion` }, { topic: queue.topic }) - - try { - /** - * Micro-batches should be executed from biggest to smallest to enable the best concurrency. 
- * We're sorting with biggest last and pop()ing. Ideally, we'd use a priority queue by length - * and a separate array for single messages, but let's look at profiles before optimizing. - */ - const prepareSpan = transaction.startChild({ op: 'prepareBatch' }) - const splitBatch = splitKafkaJSIngestionBatch(batch.messages, overflowMode) - splitBatch.toProcess.sort((a, b) => a.length - b.length) - - queue.pluginsServer.statsd?.histogram('ingest_event_batching.input_length', batch.messages.length, { - key: metricKey, - }) - queue.pluginsServer.statsd?.histogram('ingest_event_batching.batch_count', splitBatch.toProcess.length, { - key: metricKey, - }) - prepareSpan.finish() - - const processingPromises: Array> = [] - - async function processMicroBatches( - batches: { message: KafkaMessage; pluginEvent: PipelineEvent }[][] - ): Promise { - let currentBatch - let processedBatches = 0 - while ((currentBatch = batches.pop()) !== undefined) { - const batchSpan = transaction.startChild({ - op: 'messageBatch', - data: { batchLength: currentBatch.length }, - }) - - // Process overflow ingestion warnings - if (overflowMode == IngestionOverflowMode.Consume && currentBatch.length > 0) { - const team = await queue.pluginsServer.teamManager.getTeamForEvent(currentBatch[0].pluginEvent) - const distinct_id = currentBatch[0].pluginEvent.distinct_id - if (team && WarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) { - processingPromises.push( - captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', { - overflowDistinctId: distinct_id, - }) - ) - } - } - - // Process every message sequentially, stash promises to await on later - for (const { message, pluginEvent } of currentBatch) { - try { - const result = await eachMessage(pluginEvent, queue) - if (result.promises) { - processingPromises.push(...result.promises) - } - } catch (error) { - status.error('🔥', `Error processing message`, { - stack: error.stack, - error: error, - }) - - // If there error is a non-retriable error, push - // to the dlq and commit the offset. Else raise the - // error. - // - // NOTE: there is behavior to push to a DLQ at the - // moment within EventPipelineRunner. This doesn't work - // so well with e.g. messages that when sent to the DLQ - // is it's self too large. Here we explicitly do _not_ - // add any additional metadata to the message. We might - // want to add some metadata to the message e.g. in the - // header or reference e.g. the sentry event id. - // - // TODO: property abstract out this `isRetriable` error - // logic. This is currently relying on the fact that - // node-rdkafka adheres to the `isRetriable` interface. - if (error?.isRetriable === false) { - const sentryEventId = Sentry.captureException(error) - try { - await queue.pluginsServer.kafkaProducer.queueMessage({ - topic: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, - messages: [ - { - ...message, - headers: { - ...message.headers, - 'sentry-event-id': sentryEventId, - 'event-id': pluginEvent.uuid, - }, - }, - ], - }) - } catch (error) { - // If we can't send to the DLQ and it's not - // retriable, just continue. We'll commit the - // offset and move on. - if (error?.isRetriable === false) { - status.error('🔥', `Error pushing to DLQ`, { - stack: error.stack, - error: error, - }) - continue - } - - // If we can't send to the DLQ and it is - // retriable, raise the error. 
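[Editor's note] Condensed, the error handling that disappears with this file: only errors flagged `isRetriable === false` are parked on the DLQ (with the Sentry event id and event uuid attached as headers) so their offsets can still be committed; everything else re-throws and fails the kafkajs batch. Identifiers (`message`, `pluginEvent`, `queue`, `processingPromises`) are the ones from the surrounding removed loop, and the nested DLQ-failure handling is omitted here:

```ts
try {
    const result = await eachMessage(pluginEvent, queue)
    if (result.promises) {
        processingPromises.push(...result.promises)
    }
} catch (error: any) {
    if (error?.isRetriable === false) {
        // Non-retriable: dead-letter the raw message, keep the Sentry reference, move on.
        const sentryEventId = Sentry.captureException(error)
        await queue.pluginsServer.kafkaProducer.queueMessage({
            topic: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,
            messages: [
                {
                    ...message,
                    headers: { ...message.headers, 'sentry-event-id': sentryEventId, 'event-id': pluginEvent.uuid },
                },
            ],
        })
    } else {
        // Retriable: surface the error so the whole batch is retried.
        throw error
    }
}
```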
- throw error - } - } else { - throw error - } - } - } - - // Emit the Kafka heartbeat if needed then close the micro-batch - await heartbeat() - processedBatches++ - batchSpan.finish() - } - status.debug('🧩', `Stopping worker after processing ${processedBatches} micro-batches`) - return Promise.resolve() - } - - /** - * Process micro-batches in parallel tasks on the main event loop. This will not allow to use more than - * one core, but will make better use of that core by waiting less on IO. Parallelism is currently - * limited by the distinct_id constraint we have on the input topic: one consumer batch does not hold - * a lot of different distinct_ids. - * - * Overflow rerouting (mostly waiting on kafka ACKs) is done in an additional task if needed. - */ - const parallelism = Math.min(splitBatch.toProcess.length, queue.pluginsServer.INGESTION_CONCURRENCY) - ingestionParallelism - .labels({ - overflow_mode: IngestionOverflowMode[overflowMode], - }) - .observe(parallelism) - ingestionParallelismPotential - .labels({ - overflow_mode: IngestionOverflowMode[overflowMode], - }) - .observe(splitBatch.toProcess.length) - kafkaBatchStart.inc() // just before processing any events - const tasks = [...Array(parallelism)].map(() => processMicroBatches(splitBatch.toProcess)) - await Promise.all(tasks) - - // Process overflow after the main batch is successful to reduce the risk of duplicates - // generated by batch retries. Delay ACKs into processingPromises too. - if (splitBatch.toOverflow.length > 0) { - const overflowSpan = transaction.startChild({ - op: 'emitToOverflow', - data: { eventCount: splitBatch.toOverflow.length }, - }) - processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow)) - overflowSpan.finish() - } - - // Await on successful Kafka writes before closing the batch. At this point, messages - // have been successfully queued in the producer, only broker / network failures could - // impact the success. Delaying ACKs allows the producer to write in big batches for - // better throughput and lower broker load. - const awaitSpan = transaction.startChild({ op: 'awaitACKs', data: { promiseCount: processingPromises.length } }) - await Promise.all(processingPromises) - awaitSpan.finish() - - // Commit offsets once at the end of the batch. We run the risk of duplicates - // if the pod is prematurely killed in the middle of a batch, but this allows - // us to process events out of order within a batch, for higher throughput. 
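[Editor's note] The parallel fan-out described earlier in this handler — a fixed pool of async workers draining a shared array of micro-batches on the main event loop — distilled into a standalone sketch (`ingestionConcurrency` and `handleMicroBatch` are placeholders for `hub.INGESTION_CONCURRENCY` and the real per-batch processing):

```ts
// Each worker repeatedly pops the next micro-batch until the shared array is empty,
// so a few large batches cannot starve the rest.
async function drain<T>(batches: T[][], handleMicroBatch: (batch: T[]) => Promise<void>): Promise<void> {
    let current: T[] | undefined
    while ((current = batches.pop()) !== undefined) {
        await handleMicroBatch(current)
    }
}

async function processInParallel<T>(
    toProcess: T[][],
    ingestionConcurrency: number,
    handleMicroBatch: (batch: T[]) => Promise<void>
): Promise<void> {
    const parallelism = Math.min(toProcess.length, ingestionConcurrency)
    await Promise.all([...Array(parallelism)].map(() => drain(toProcess, handleMicroBatch)))
}
```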
- const commitSpan = transaction.startChild({ op: 'offsetCommit' }) - const lastMessage = batch.messages.at(-1) - if (lastMessage) { - resolveOffset(lastMessage.offset) - await commitOffsetsIfNecessary() - latestOffsetTimestampGauge - .labels({ partition: batch.partition, topic: batch.topic, groupId: metricKey }) - .set(Number.parseInt(lastMessage.timestamp)) - } - commitSpan.finish() - kafkaBatchOffsetCommitted.inc() // and we successfully committed the offsets - - status.debug( - '🧩', - `Kafka batch of ${batch.messages.length} events completed in ${ - new Date().valueOf() - batchStartTimer.valueOf() - }ms (${loggingKey})` - ) - - if (!isRunning() || isStale()) { - status.info('🚪', `Ending the consumer loop`, { - isRunning: isRunning(), - isStale: isStale(), - msFromBatchStart: new Date().valueOf() - batchStartTimer.valueOf(), - }) - await heartbeat() - return - } - } finally { - queue.pluginsServer.statsd?.timing(`kafka_queue.${loggingKey}`, batchStartTimer) - transaction.finish() - } -} - -async function ingestEvent( - server: Hub, - workerMethods: WorkerMethods, - event: PipelineEvent, - checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again -): Promise { - const eachEventStartTimer = new Date() - - checkAndPause?.() - - server.statsd?.increment('kafka_queue_ingest_event_hit', { - pipeline: 'runEventPipeline', - }) - const result = await workerMethods.runEventPipeline(event) - - server.statsd?.timing('kafka_queue.each_event', eachEventStartTimer) - countAndLogEvents() - - return result -} - -let messageCounter = 0 -let messageLogDate = 0 - -function computeKey(pluginEvent: PipelineEvent): string { - return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}` -} - -async function emitToOverflow(queue: KafkaJSIngestionConsumer, kafkaMessages: KafkaMessage[]) { - ingestionOverflowingMessagesTotal.inc(kafkaMessages.length) - await Promise.all( - kafkaMessages.map((message) => - queue.pluginsServer.kafkaProducer.queueMessage( - { - topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - messages: [message], - }, - true - ) - ) - ) -} - -export function splitKafkaJSIngestionBatch( - kafkaMessages: KafkaMessage[], - overflowMode: IngestionOverflowMode -): IngestionSplitBatch { - /** - * Prepares micro-batches for use by eachBatchParallelIngestion: - * - events are parsed and grouped by token & distinct_id for sequential processing - * - if overflowMode=Reroute, messages to send to overflow are in the second array - */ - const output: IngestionSplitBatch = { - toProcess: [], - toOverflow: [], - } - - if (overflowMode === IngestionOverflowMode.Consume) { - /** - * Grouping by distinct_id is inefficient here, because only a few ones are overflowing - * at a time. When messages are sent to overflow, we already give away the ordering guarantee, - * so we just return batches of one to increase concurrency. 
- * TODO: add a PipelineEvent[] field to IngestionSplitBatch for batches of 1 - */ - output.toProcess = kafkaMessages.map((m) => new Array({ message: m, pluginEvent: formPipelineEvent(m) })) - return output - } - - const batches: Map = new Map() - for (const message of kafkaMessages) { - if (overflowMode === IngestionOverflowMode.Reroute && message.key == null) { - // Overflow detected by capture, reroute to overflow topic - output.toOverflow.push(message) - continue - } - const pluginEvent = formPipelineEvent(message) - const eventKey = computeKey(pluginEvent) - if (overflowMode === IngestionOverflowMode.Reroute && !ConfiguredLimiter.consume(eventKey, 1)) { - // Local overflow detection triggering, reroute to overflow topic too - message.key = null - ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc() - if (LoggingLimiter.consume(eventKey, 1)) { - status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`) - } - output.toOverflow.push(message) - continue - } - const siblings = batches.get(eventKey) - if (siblings) { - siblings.push({ message, pluginEvent }) - } else { - batches.set(eventKey, [{ message, pluginEvent }]) - } - } - output.toProcess = Array.from(batches.values()) - return output -} - -function countAndLogEvents(): void { - const now = new Date().valueOf() - messageCounter++ - if (now - messageLogDate > 10000) { - status.info( - '🕒', - `Processed ${messageCounter} events${ - messageLogDate === 0 ? '' : ` in ${Math.round((now - messageLogDate) / 10) / 100}s` - }` - ) - messageCounter = 0 - messageLogDate = now - } -} - -function formPipelineEvent(message: KafkaMessage): PipelineEvent { - // TODO: inefficient to do this twice? - const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString()) - const combinedEvent = { ...JSON.parse(dataStr), ...rawEvent } - const event: PipelineEvent = normalizeEvent({ - ...combinedEvent, - site_url: combinedEvent.site_url || null, - ip: combinedEvent.ip || null, - }) - return event -} diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 20119c7f08542..3f94169ff659f 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -89,9 +89,9 @@ export async function startPluginsServer( // has enabled. // 5. publishes the resulting event to a Kafka topic on which ClickHouse is // listening. 
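[Editor's note] For reference, the capture-envelope parsing that leaves with `formPipelineEvent` above (presumably mirrored in the rdkafka path): the Kafka value is a JSON envelope whose `data` field is itself a JSON-encoded event, and the two layers are merged before `normalizeEvent` runs. The same logic, condensed:

```ts
import { KafkaMessage } from 'kafkajs'

// The capture envelope carries the real event as a JSON string under `data`;
// both layers are merged, with the envelope's own fields winning.
function parseCaptureEnvelope(message: KafkaMessage): Record<string, any> {
    const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString())
    return { ...JSON.parse(dataStr), ...rawEvent }
}
```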
- let analyticsEventsIngestionConsumer: KafkaJSIngestionConsumer | IngestionConsumer | undefined - let analyticsEventsIngestionOverflowConsumer: KafkaJSIngestionConsumer | IngestionConsumer | undefined - let analyticsEventsIngestionHistoricalConsumer: KafkaJSIngestionConsumer | IngestionConsumer | undefined + let analyticsEventsIngestionConsumer: IngestionConsumer | undefined + let analyticsEventsIngestionOverflowConsumer: IngestionConsumer | undefined + let analyticsEventsIngestionHistoricalConsumer: IngestionConsumer | undefined let onEventHandlerConsumer: KafkaJSIngestionConsumer | undefined let stopWebhooksHandlerConsumer: () => Promise | undefined diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index ace2221ca65f5..6562f328871f1 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -128,7 +128,6 @@ export interface PluginsServerConfig { KAFKA_SASL_USER: string | undefined KAFKA_SASL_PASSWORD: string | undefined KAFKA_CLIENT_RACK: string | undefined - KAFKA_CONSUMPTION_USE_RDKAFKA: boolean KAFKA_CONSUMPTION_MAX_BYTES: number KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: number KAFKA_CONSUMPTION_MAX_WAIT_MS: number // fetch.wait.max.ms rdkafka parameter diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index 51122c706b15a..b8d466a649662 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -3,7 +3,6 @@ import { eachBatchParallelIngestion, IngestionOverflowMode, } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion' -import { eachBatchLegacyIngestion } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs' import { ConfiguredLimiter } from '../../../src/utils/token-bucket' import { captureIngestionWarning } from './../../../src/worker/ingestion/utils' @@ -131,140 +130,3 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { expect(queue.workerMethods.runEventPipeline).toHaveBeenCalled() }) }) - -describe('eachBatchLegacyIngestion with overflow reroute', () => { - let queue: any - - function createBatchWithMultipleEventsWithKeys(events: any[], timestamp?: any): any { - return { - batch: { - partition: 0, - topic: KAFKA_EVENTS_PLUGIN_INGESTION, - messages: events.map((event) => ({ - value: JSON.stringify(event), - timestamp, - offset: event.offset, - key: event.team_id + ':' + event.distinct_id, - })), - }, - resolveOffset: jest.fn(), - heartbeat: jest.fn(), - commitOffsetsIfNecessary: jest.fn(), - isRunning: jest.fn(() => true), - isStale: jest.fn(() => false), - } - } - - beforeEach(() => { - queue = { - bufferSleep: jest.fn(), - pluginsServer: { - INGESTION_CONCURRENCY: 4, - statsd: { - timing: jest.fn(), - increment: jest.fn(), - histogram: jest.fn(), - gauge: jest.fn(), - }, - kafkaProducer: { - queueMessage: jest.fn(), - }, - db: 'database', - }, - workerMethods: { - runEventPipeline: jest.fn(() => Promise.resolve({})), - }, - } - }) - - it('reroutes events with no key to OVERFLOW topic', async () => { - const batch = { - batch: { - partition: 0, - topic: KAFKA_EVENTS_PLUGIN_INGESTION, - messages: [ - { - value: JSON.stringify(captureEndpointEvent), - timestamp: captureEndpointEvent['timestamp'], - offset: captureEndpointEvent['offset'], - key: null, - }, - ], - }, - resolveOffset: jest.fn(), - 
heartbeat: jest.fn(), - commitOffsetsIfNecessary: jest.fn(), - isRunning: jest.fn(() => true), - isStale: jest.fn(() => false), - } - const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) - - await eachBatchLegacyIngestion(batch, queue, IngestionOverflowMode.Reroute) - - expect(consume).not.toHaveBeenCalled() - expect(captureIngestionWarning).not.toHaveBeenCalled() - expect(queue.pluginsServer.kafkaProducer.queueMessage).toHaveBeenCalledWith( - { - topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - messages: [ - { - value: JSON.stringify(captureEndpointEvent), - timestamp: captureEndpointEvent['timestamp'], - offset: captureEndpointEvent['offset'], - key: null, - }, - ], - }, - true - ) - - // Event is not processed here - expect(queue.workerMethods.runEventPipeline).not.toHaveBeenCalled() - }) - - it('reroutes excess events to OVERFLOW topic', async () => { - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent]) - const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) - - await eachBatchLegacyIngestion(batch, queue, IngestionOverflowMode.Reroute) - - expect(consume).toHaveBeenCalledWith( - captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'], - 1 - ) - expect(captureIngestionWarning).not.toHaveBeenCalled() - expect(queue.pluginsServer.kafkaProducer.queueMessage).toHaveBeenCalledWith( - { - topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - messages: [ - { - value: JSON.stringify(captureEndpointEvent), - timestamp: captureEndpointEvent['timestamp'], - offset: captureEndpointEvent['offset'], - key: null, - }, - ], - }, - true - ) - - // Event is not processed here - expect(queue.workerMethods.runEventPipeline).not.toHaveBeenCalled() - }) - - it('does not reroute if not over capacity limit', async () => { - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent]) - const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true) - - await eachBatchLegacyIngestion(batch, queue, IngestionOverflowMode.Reroute) - - expect(consume).toHaveBeenCalledWith( - captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'], - 1 - ) - expect(captureIngestionWarning).not.toHaveBeenCalled() - expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled() - // Event is processed - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalled() - }) -}) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts index e97de06c7d4b0..d0a1224112391 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts @@ -1,9 +1,7 @@ -import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../../src/config/kafka-topics' import { eachBatchParallelIngestion, IngestionOverflowMode, } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion' -import { eachBatchLegacyIngestion } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs' import { WarningLimiter } from '../../../src/utils/token-bucket' import { captureIngestionWarning } from './../../../src/worker/ingestion/utils' @@ -104,95 +102,3 @@ describe('eachBatchParallelIngestion with overflow consume', () => { 
expect(queue.workerMethods.runEventPipeline).toHaveBeenCalled() }) }) - -describe('eachBatchLegacyIngestion with overflow consume', () => { - let queue: any - - function createBatchWithMultipleEventsWithKeys(events: any[], timestamp?: any): any { - return { - batch: { - partition: 0, - topic: KAFKA_EVENTS_PLUGIN_INGESTION, - messages: events.map((event) => ({ - value: JSON.stringify(event), - timestamp, - offset: event.offset, - key: event.team_id + ':' + event.distinct_id, - })), - }, - resolveOffset: jest.fn(), - heartbeat: jest.fn(), - commitOffsetsIfNecessary: jest.fn(), - isRunning: jest.fn(() => true), - isStale: jest.fn(() => false), - } - } - - beforeEach(() => { - queue = { - bufferSleep: jest.fn(), - pluginsServer: { - INGESTION_CONCURRENCY: 4, - statsd: { - timing: jest.fn(), - increment: jest.fn(), - histogram: jest.fn(), - gauge: jest.fn(), - }, - kafkaProducer: { - queueMessage: jest.fn(), - }, - teamManager: { - getTeamForEvent: jest.fn(), - }, - db: 'database', - }, - workerMethods: { - runEventPipeline: jest.fn(() => Promise.resolve({})), - }, - } - }) - - it('raises ingestion warning when consuming from overflow', async () => { - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent]) - const consume = jest.spyOn(WarningLimiter, 'consume').mockImplementation(() => true) - - queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 }) - await eachBatchLegacyIngestion(batch, queue, IngestionOverflowMode.Consume) - - expect(queue.pluginsServer.teamManager.getTeamForEvent).toHaveBeenCalledTimes(1) - expect(consume).toHaveBeenCalledWith( - captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'], - 1 - ) - expect(captureIngestionWarning).toHaveBeenCalledWith( - queue.pluginsServer.db, - captureEndpointEvent['team_id'], - 'ingestion_capacity_overflow', - { - overflowDistinctId: captureEndpointEvent['distinct_id'], - } - ) - - // Event is processed - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalled() - }) - - it('does not raise ingestion warning when under threshold', async () => { - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent]) - const consume = jest.spyOn(WarningLimiter, 'consume').mockImplementation(() => false) - - queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 }) - await eachBatchLegacyIngestion(batch, queue, IngestionOverflowMode.Consume) - - expect(consume).toHaveBeenCalledWith( - captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'], - 1 - ) - expect(captureIngestionWarning).not.toHaveBeenCalled() - expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled() - - // Event is processed - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalled() - }) -}) diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index 2f8756e23665e..e2f8a9b018947 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -5,10 +5,6 @@ import { IngestionOverflowMode, splitIngestionBatch, } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion' -import { - eachBatchLegacyIngestion, - splitKafkaJSIngestionBatch, -} from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs' import { eachBatchAppsOnEventHandlers } from '../../../src/main/ingestion-queues/batch-processing/each-batch-onevent' import { 
eachBatchWebhooksHandlers, @@ -527,135 +523,4 @@ describe('eachBatchX', () => { }) }) }) - - describe('eachBatchLegacyIngestion', () => { - it('calls runEventPipeline', async () => { - const batch = createKafkaJSBatch(captureEndpointEvent) - await eachBatchLegacyIngestion(batch, queue, IngestionOverflowMode.Disabled) - - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalledWith({ - distinct_id: 'id', - event: 'event', - properties: {}, - ip: null, - now: null, - sent_at: null, - site_url: null, - team_id: 1, - uuid: 'uuid1', - }) - expect(queue.pluginsServer.statsd.timing).toHaveBeenCalledWith( - 'kafka_queue.each_batch_legacy_ingestion', - expect.any(Date) - ) - }) - - it('fails the batch if runEventPipeline rejects', async () => { - const batch = createKafkaJSBatch(captureEndpointEvent) - queue.workerMethods.runEventPipeline = jest.fn(() => Promise.reject('runEventPipeline nopes out')) - await expect(eachBatchLegacyIngestion(batch, queue, IngestionOverflowMode.Disabled)).rejects.toBe( - 'runEventPipeline nopes out' - ) - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalledTimes(1) - }) - - it('fails the batch if one deferred promise rejects', async () => { - const batch = createKafkaJSBatch(captureEndpointEvent) - queue.workerMethods.runEventPipeline = jest.fn(() => - Promise.resolve({ - promises: [Promise.resolve(), Promise.reject('deferred nopes out')], - }) - ) - await expect(eachBatchLegacyIngestion(batch, queue, IngestionOverflowMode.Disabled)).rejects.toBe( - 'deferred nopes out' - ) - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalledTimes(1) - }) - - it('batches events by team or token and distinct_id', () => { - const batch = createKafkaJSBatchWithMultipleEvents([ - { ...captureEndpointEvent, team_id: 3, distinct_id: 'a' }, - { ...captureEndpointEvent, team_id: 3, distinct_id: 'a' }, - { ...captureEndpointEvent, team_id: 3, distinct_id: 'b' }, - { ...captureEndpointEvent, team_id: 4, distinct_id: 'a' }, - { ...captureEndpointEvent, team_id: 4, distinct_id: 'a' }, - { ...captureEndpointEvent, team_id: 4, distinct_id: 'b' }, - { ...captureEndpointEvent, team_id: undefined, token: 'tok', distinct_id: 'a' }, - { ...captureEndpointEvent, team_id: undefined, token: 'tok', distinct_id: 'a' }, - { ...captureEndpointEvent, team_id: undefined, token: 'tok', distinct_id: 'b' }, - { ...captureEndpointEvent, team_id: 3, distinct_id: 'c' }, - { ...captureEndpointEvent, team_id: 3, distinct_id: 'b' }, - { ...captureEndpointEvent, team_id: 3, distinct_id: 'a' }, - ]) - const stats = new Map() - for (const group of splitIngestionBatch(batch.batch.messages, IngestionOverflowMode.Disabled).toProcess) { - const key = `${group[0].pluginEvent.team_id}:${group[0].pluginEvent.token}:${group[0].pluginEvent.distinct_id}` - for (const { pluginEvent: event } of group) { - expect(`${event.team_id}:${event.token}:${event.distinct_id}`).toEqual(key) - } - stats.set(key, group.length) - } - expect(stats.size).toEqual(7) - expect(stats).toEqual( - new Map([ - ['3:undefined:a', 3], - ['3:undefined:b', 2], - ['3:undefined:c', 1], - ['4:undefined:a', 2], - ['4:undefined:b', 1], - ['undefined:tok:a', 2], - ['undefined:tok:b', 1], - ]) - ) - }) - - it('does not batch events when consuming overflow', () => { - const input = createKafkaJSBatchWithMultipleEvents([ - { ...captureEndpointEvent, team_id: 3, distinct_id: 'a' }, - { ...captureEndpointEvent, team_id: 3, distinct_id: 'a' }, - { ...captureEndpointEvent, team_id: 3, distinct_id: 'b' }, - { ...captureEndpointEvent, team_id: 4, 
distinct_id: 'a' }, - { ...captureEndpointEvent, team_id: 4, distinct_id: 'a' }, - ]) - const batches = splitKafkaJSIngestionBatch(input.batch.messages, IngestionOverflowMode.Consume).toProcess - expect(batches.length).toEqual(input.batch.messages.length) - for (const group of batches) { - expect(group.length).toEqual(1) - } - }) - - it('batches events but commits offsets only once', async () => { - const batch = createKafkaJSBatchWithMultipleEvents([ - { ...captureEndpointEvent, offset: 1, team_id: 3 }, - { ...captureEndpointEvent, offset: 2, team_id: 3 }, // repeat - { ...captureEndpointEvent, offset: 3, team_id: 3 }, // repeat - { ...captureEndpointEvent, offset: 4, team_id: 3 }, // repeat - { ...captureEndpointEvent, offset: 5, team_id: 3 }, // repeat - { ...captureEndpointEvent, offset: 6, team_id: 3, distinct_id: 'id2' }, - { ...captureEndpointEvent, offset: 7, team_id: 4 }, - { ...captureEndpointEvent, offset: 8, team_id: 5 }, - { ...captureEndpointEvent, offset: 9, team_id: 5 }, // repeat - { ...captureEndpointEvent, offset: 10, team_id: 3, distinct_id: 'id2' }, // repeat - { ...captureEndpointEvent, offset: 11, team_id: 8 }, - { ...captureEndpointEvent, offset: 12, team_id: 4 }, // repeat - { ...captureEndpointEvent, offset: 13, team_id: 3 }, // repeat - { ...captureEndpointEvent, offset: 14, team_id: 5 }, // repeat - ]) - - await eachBatchLegacyIngestion(batch, queue, IngestionOverflowMode.Disabled) - expect(batch.resolveOffset).toBeCalledTimes(1) - expect(batch.resolveOffset).toHaveBeenCalledWith(14) - expect(queue.workerMethods.runEventPipeline).toHaveBeenCalledTimes(14) - expect(queue.pluginsServer.statsd.histogram).toHaveBeenCalledWith( - 'ingest_event_batching.input_length', - 14, - { - key: 'ingestion', - } - ) - expect(queue.pluginsServer.statsd.histogram).toHaveBeenCalledWith('ingest_event_batching.batch_count', 5, { - key: 'ingestion', - }) - }) - }) })
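[Editor's note] The grouping semantics exercised by the surviving `splitIngestionBatch` tests (the `3:undefined:a`-style keys above) match what the deleted `splitKafkaJSIngestionBatch` did: events are keyed on `team_id ?? token` plus `distinct_id`, and everything sharing a key stays in one sequentially processed micro-batch. Reduced to its core:

```ts
type KeyedEvent = { team_id?: number; token?: string; distinct_id: string }

// Events sharing a key must be processed in order, so they land in the same micro-batch.
function computeKey(event: KeyedEvent): string {
    return `${event.team_id ?? event.token}:${event.distinct_id}`
}

function groupByKey<T extends { pluginEvent: KeyedEvent }>(items: T[]): T[][] {
    const batches = new Map<string, T[]>()
    for (const item of items) {
        const key = computeKey(item.pluginEvent)
        const siblings = batches.get(key)
        if (siblings) {
            siblings.push(item)
        } else {
            batches.set(key, [item])
        }
    }
    return Array.from(batches.values())
}
```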