diff --git a/plugin-server/functional_tests/api.ts b/plugin-server/functional_tests/api.ts index abbd770d7bb77..c6ff46bf5bf6d 100644 --- a/plugin-server/functional_tests/api.ts +++ b/plugin-server/functional_tests/api.ts @@ -106,6 +106,7 @@ export const capture = async ({ }) ), key: teamId ? teamId.toString() : '', + waitForAck: true, }) } diff --git a/plugin-server/functional_tests/jobs-consumer.test.ts b/plugin-server/functional_tests/jobs-consumer.test.ts index 30e2abd9af282..353bd3518397e 100644 --- a/plugin-server/functional_tests/jobs-consumer.test.ts +++ b/plugin-server/functional_tests/jobs-consumer.test.ts @@ -43,7 +43,7 @@ describe('dlq handling', () => { test.concurrent(`handles empty messages`, async () => { const key = uuidv4() - await produce({ topic: 'jobs', message: null, key }) + await produce({ topic: 'jobs', message: null, key, waitForAck: true }) await waitForExpect(() => { const messages = dlq.filter((message) => message.key?.toString() === key) @@ -54,7 +54,7 @@ describe('dlq handling', () => { test.concurrent(`handles invalid JSON`, async () => { const key = uuidv4() - await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key }) + await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key, waitForAck: true }) await waitForExpect(() => { const messages = dlq.filter((message) => message.key?.toString() === key) @@ -72,7 +72,7 @@ describe('dlq handling', () => { labels: { topic: 'jobs', partition: '0', groupId: 'jobs-inserter' }, }) - await produce({ topic: 'jobs', message: Buffer.from(''), key: '' }) + await produce({ topic: 'jobs', message: Buffer.from(''), key: '', waitForAck: true }) await waitForExpect(async () => { const metricAfter = await getMetric({ diff --git a/plugin-server/functional_tests/kafka.ts b/plugin-server/functional_tests/kafka.ts index c2ab7ac87a6ab..f431488b290ac 100644 --- a/plugin-server/functional_tests/kafka.ts +++ b/plugin-server/functional_tests/kafka.ts @@ -36,7 +36,17 @@ export async function createKafkaProducer() { return producer } -export async function produce({ topic, message, key }: { topic: string; message: Buffer | null; key: string }) { +export async function produce({ + topic, + message, + key, + waitForAck, +}: { + topic: string + message: Buffer | null + key: string + waitForAck: boolean +}) { producer = producer ?? (await createKafkaProducer()) - await defaultProduce({ producer, topic, value: message, key: Buffer.from(key) }) + await defaultProduce({ producer, topic, value: message, key: Buffer.from(key), waitForAck }) } diff --git a/plugin-server/functional_tests/scheduled-tasks-runner.test.ts b/plugin-server/functional_tests/scheduled-tasks-runner.test.ts index 3e3345245a644..48764ae7f90a7 100644 --- a/plugin-server/functional_tests/scheduled-tasks-runner.test.ts +++ b/plugin-server/functional_tests/scheduled-tasks-runner.test.ts @@ -43,7 +43,7 @@ describe('dlq handling', () => { test.concurrent(`handles empty messages`, async () => { const key = uuidv4() - await produce({ topic: 'scheduled_tasks', message: null, key }) + await produce({ topic: 'scheduled_tasks', message: null, key, waitForAck: true }) await waitForExpect(() => { const messages = dlq.filter((message) => message.key?.toString() === key) @@ -54,7 +54,7 @@ describe('dlq handling', () => { test.concurrent(`handles invalid JSON`, async () => { const key = uuidv4() - await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key }) + await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key, waitForAck: true }) await waitForExpect(() => { const messages = dlq.filter((message) => message.key?.toString() === key) @@ -69,6 +69,7 @@ describe('dlq handling', () => { topic: 'scheduled_tasks', message: Buffer.from(JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 })), key, + waitForAck: true, }) await waitForExpect(() => { @@ -84,6 +85,7 @@ describe('dlq handling', () => { topic: 'scheduled_tasks', message: Buffer.from(JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' })), key, + waitForAck: true, }) await waitForExpect(() => { @@ -104,7 +106,7 @@ describe('dlq handling', () => { // NOTE: we don't actually care too much about the contents of the // message, just that it triggeres the consumer to try to process it. - await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '' }) + await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '', waitForAck: true }) await waitForExpect(async () => { const metricAfter = await getMetric({ diff --git a/plugin-server/functional_tests/session-recordings.test.ts b/plugin-server/functional_tests/session-recordings.test.ts index 62075bc6bd10f..783fbdbeb43cd 100644 --- a/plugin-server/functional_tests/session-recordings.test.ts +++ b/plugin-server/functional_tests/session-recordings.test.ts @@ -173,7 +173,12 @@ test.skip('consumer updates timestamp exported to prometheus', async () => { }, }) - await produce({ topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, message: Buffer.from(''), key: '' }) + await produce({ + topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + message: Buffer.from(''), + key: '', + waitForAck: true, + }) await waitForExpect(async () => { const metricAfter = await getMetric({ @@ -245,6 +250,7 @@ test.skip(`handles message with no token or with token and no associated team_id topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, message: Buffer.from(JSON.stringify({ uuid: noTokenUuid, data: JSON.stringify({}) })), key: noTokenKey, + waitForAck: true, }) await produce({ topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, @@ -252,6 +258,7 @@ test.skip(`handles message with no token or with token and no associated team_id JSON.stringify({ uuid: noAssociatedTeamUuid, token: 'no associated team', data: JSON.stringify({}) }) ), key: noAssociatedTeamKey, + waitForAck: true, }) await capture(makeSessionMessage(teamId, 'should be ingested')) diff --git a/plugin-server/src/kafka/producer.ts b/plugin-server/src/kafka/producer.ts index 7029a26c79fbd..062785f902bc4 100644 --- a/plugin-server/src/kafka/producer.ts +++ b/plugin-server/src/kafka/producer.ts @@ -7,6 +7,7 @@ import { NumberNullUndefined, ProducerGlobalConfig, } from 'node-rdkafka' +import { Summary } from 'prom-client' import { getSpan } from '../sentry' import { status } from '../utils/status' @@ -17,6 +18,13 @@ export type KafkaProducerConfig = { KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_MESSAGES: number } +export const ingestEventKafkaProduceLatency = new Summary({ + name: 'ingest_event_kafka_produce_latency', + help: 'Wait time for individual Kafka produces', + labelNames: ['topic', 'waitForAck'], + percentiles: [0.5, 0.9, 0.95, 0.99], +}) + // Kafka production related functions using node-rdkafka. export const createKafkaProducer = async (globalConfig: ProducerGlobalConfig, producerConfig: KafkaProducerConfig) => { const producer = new RdKafkaProducer({ @@ -71,18 +79,22 @@ export const produce = async ({ value, key, headers = [], - waitForAck = true, + waitForAck, }: { producer: RdKafkaProducer topic: string value: MessageValue key: MessageKey headers?: MessageHeader[] - waitForAck?: boolean + waitForAck: boolean }): Promise => { status.debug('📤', 'Producing message', { topic: topic }) const produceSpan = getSpan()?.startChild({ op: 'kafka_produce' }) return await new Promise((resolve, reject) => { + const produceTimer = ingestEventKafkaProduceLatency + .labels({ topic, waitForAck: waitForAck.toString() }) + .startTimer() + if (waitForAck) { producer.produce( topic, @@ -100,6 +112,7 @@ export const produce = async ({ resolve(offset) } + produceTimer() produceSpan?.finish() } ) @@ -112,6 +125,7 @@ export const produce = async ({ produceSpan?.finish() }) resolve(undefined) + produceTimer() } }) } diff --git a/plugin-server/src/main/graphile-worker/schedule.ts b/plugin-server/src/main/graphile-worker/schedule.ts index d50c672cea428..16435d02c0466 100644 --- a/plugin-server/src/main/graphile-worker/schedule.ts +++ b/plugin-server/src/main/graphile-worker/schedule.ts @@ -56,8 +56,11 @@ export async function runScheduledTasks( for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) { status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId }) await server.kafkaProducer.queueMessage({ - topic: KAFKA_SCHEDULED_TASKS, - messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }], + }, + waitForAck: true, }) graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc() } diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 749e41c18c335..588c2c92beb86 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -15,6 +15,7 @@ import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics' import { ingestEventBatchingBatchCountSummary, ingestEventBatchingInputLengthSummary, + ingestEventEachBatchKafkaAckWait, ingestionOverflowingMessagesTotal, ingestionParallelism, ingestionParallelismPotential, @@ -41,7 +42,7 @@ type IngestionSplitBatch = { type IngestResult = { // Promises that the batch handler should await on before committing offsets, // contains the Kafka producer ACKs, to avoid blocking after every message. - promises?: Array> + ackPromises?: Array> } async function handleProcessingError( @@ -166,7 +167,7 @@ export async function eachBatchParallelIngestion( return await runner.runEventPipeline(pluginEvent) })) as IngestResult - result.promises?.forEach((promise) => + result.ackPromises?.forEach((promise) => processingPromises.push( promise.catch(async (error) => { await handleProcessingError(error, message, pluginEvent, queue) @@ -227,7 +228,9 @@ export async function eachBatchParallelIngestion( // impact the success. Delaying ACKs allows the producer to write in big batches for // better throughput and lower broker load. const awaitSpan = transaction.startChild({ op: 'awaitACKs', data: { promiseCount: processingPromises.length } }) + const kafkaAckWaitMetric = ingestEventEachBatchKafkaAckWait.startTimer() await Promise.all(processingPromises) + kafkaAckWaitMetric() awaitSpan.finish() for (const message of messages) { diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/metrics.ts b/plugin-server/src/main/ingestion-queues/batch-processing/metrics.ts index 42c1b06a27b5d..60563b6cabaaa 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/metrics.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/metrics.ts @@ -41,3 +41,9 @@ export const ingestEventBatchingBatchCountSummary = new Summary({ help: 'Number of batches of events', percentiles: [0.5, 0.9, 0.95, 0.99], }) + +export const ingestEventEachBatchKafkaAckWait = new Summary({ + name: 'ingest_event_each_batch_kafka_ack_wait', + help: 'Wait time for the batch of Kafka ACKs at the end of eachBatchParallelIngestion', + percentiles: [0.5, 0.9, 0.95, 0.99], +}) diff --git a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts index 94549340da4fe..605a812068c51 100644 --- a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts @@ -54,8 +54,11 @@ export const startJobsConsumer = async ({ }) // TODO: handle resolving offsets asynchronously await producer.queueMessage({ - topic: KAFKA_JOBS_DLQ, - messages: [{ value: message.value, key: message.key }], + kafkaMessage: { + topic: KAFKA_JOBS_DLQ, + messages: [{ value: message.value, key: message.key }], + }, + waitForAck: true, }) resolveOffset(message.offset) continue @@ -71,8 +74,11 @@ export const startJobsConsumer = async ({ }) // TODO: handle resolving offsets asynchronously await producer.queueMessage({ - topic: KAFKA_JOBS_DLQ, - messages: [{ value: message.value, key: message.key }], + kafkaMessage: { + topic: KAFKA_JOBS_DLQ, + messages: [{ value: message.value, key: message.key }], + }, + waitForAck: true, }) resolveOffset(message.offset) continue diff --git a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts index 3de544ce2d0a4..83ea62fdfdd6f 100644 --- a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts @@ -163,8 +163,11 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) = value: message.value, }) await producer.queueMessage({ - topic: KAFKA_SCHEDULED_TASKS_DLQ, - messages: [{ value: message.value, key: message.key }], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], + }, + waitForAck: true, }) continue } @@ -181,8 +184,11 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) = error: error.stack ?? error, }) await producer.queueMessage({ - topic: KAFKA_SCHEDULED_TASKS_DLQ, - messages: [{ value: message.value, key: message.key }], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], + }, + waitForAck: true, }) continue } @@ -190,8 +196,11 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) = if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) { status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task) await producer.queueMessage({ - topic: KAFKA_SCHEDULED_TASKS_DLQ, - messages: [{ value: message.value, key: message.key }], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], + }, + waitForAck: true, }) continue } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts index 5729da5cb373e..1c581451e44ec 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts @@ -163,6 +163,7 @@ export class ConsoleLogsIngester { topic: KAFKA_LOG_ENTRIES, value: Buffer.from(JSON.stringify(cle)), key: event.session_id, + waitForAck: true, }) ) } catch (error) { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts index 632f695a158f5..029f28f20bb9a 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts @@ -171,6 +171,7 @@ export class ReplayEventsIngester { topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, value: Buffer.from(JSON.stringify(replayRecord)), key: event.session_id, + waitForAck: true, }), ] } catch (error) { diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index 2baa10671a91e..c7b6ce86a895a 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -707,7 +707,7 @@ export class DB { }) } - await this.kafkaProducer.queueMessages(kafkaMessages) + await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true }) return person } @@ -759,7 +759,7 @@ export class DB { if (tx) { kafkaMessages.push(message) } else { - await this.kafkaProducer.queueMessage(message) + await this.kafkaProducer.queueMessage({ kafkaMessage: message, waitForAck: true }) } status.debug( @@ -829,7 +829,7 @@ export class DB { public async addDistinctId(person: Person, distinctId: string): Promise { const kafkaMessages = await this.addDistinctIdPooled(person, distinctId) if (kafkaMessages.length) { - await this.kafkaProducer.queueMessages(kafkaMessages) + await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true }) } } @@ -1072,15 +1072,15 @@ export class DB { pluginLogEntryCounter.labels({ plugin_id: String(pluginConfig.plugin_id), source }).inc() try { - await this.kafkaProducer.queueSingleJsonMessage( - KAFKA_PLUGIN_LOG_ENTRIES, - parsedEntry.id, - parsedEntry, + await this.kafkaProducer.queueSingleJsonMessage({ + topic: KAFKA_PLUGIN_LOG_ENTRIES, + key: parsedEntry.id, + object: parsedEntry, // For logs, we relax our durability requirements a little and // do not wait for acks that Kafka has persisted the message to // disk. - false - ) + waitForAck: false, + }) } catch (e) { captureException(e, { tags: { team_id: entry.pluginConfig.team_id } }) console.error('Failed to produce message', e, parsedEntry) @@ -1409,19 +1409,22 @@ export class DB { version: number ): Promise { await this.kafkaProducer.queueMessage({ - topic: KAFKA_GROUPS, - messages: [ - { - value: JSON.stringify({ - group_type_index: groupTypeIndex, - group_key: groupKey, - team_id: teamId, - group_properties: JSON.stringify(properties), - created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision), - version, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_GROUPS, + messages: [ + { + value: JSON.stringify({ + group_type_index: groupTypeIndex, + group_key: groupKey, + team_id: teamId, + group_properties: JSON.stringify(properties), + created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision), + version, + }), + }, + ], + }, + waitForAck: true, }) } diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index 0a50533a1dbdb..098a44e7d4aa6 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -159,13 +159,16 @@ export async function createHub( // chained, and if we do not manage to produce then the chain will be // broken. await kafkaProducer.queueMessage({ - topic: KAFKA_JOBS, - messages: [ - { - value: Buffer.from(JSON.stringify(job)), - key: Buffer.from(job.pluginConfigTeam.toString()), - }, - ], + kafkaMessage: { + topic: KAFKA_JOBS, + messages: [ + { + value: Buffer.from(JSON.stringify(job)), + key: Buffer.from(job.pluginConfigTeam.toString()), + }, + ], + }, + waitForAck: true, }) } diff --git a/plugin-server/src/utils/db/kafka-producer-wrapper.ts b/plugin-server/src/utils/db/kafka-producer-wrapper.ts index 8f7cef4c06b30..0ea1e01c5099f 100644 --- a/plugin-server/src/utils/db/kafka-producer-wrapper.ts +++ b/plugin-server/src/utils/db/kafka-producer-wrapper.ts @@ -35,7 +35,7 @@ export class KafkaProducerWrapper { key: MessageKey topic: string headers?: MessageHeader[] - waitForAck?: boolean + waitForAck: boolean }): Promise { try { kafkaProducerMessagesQueuedCounter.labels({ topic_name: topic }).inc() @@ -66,7 +66,7 @@ export class KafkaProducerWrapper { } } - async queueMessage(kafkaMessage: ProducerRecord, waitForAck?: boolean) { + async queueMessage({ kafkaMessage, waitForAck }: { kafkaMessage: ProducerRecord; waitForAck: boolean }) { return await Promise.all( kafkaMessage.messages.map((message) => this.produce({ @@ -80,23 +80,34 @@ export class KafkaProducerWrapper { ) } - async queueMessages(kafkaMessages: ProducerRecord[], waitForAck?: boolean): Promise { - await Promise.all(kafkaMessages.map((message) => this.queueMessage(message, waitForAck))) + async queueMessages({ + kafkaMessages, + waitForAck, + }: { + kafkaMessages: ProducerRecord[] + waitForAck: boolean + }): Promise { + await Promise.all(kafkaMessages.map((kafkaMessage) => this.queueMessage({ kafkaMessage, waitForAck }))) } - async queueSingleJsonMessage( - topic: string, - key: Message['key'], - object: Record, - waitForAck?: boolean - ): Promise { - await this.queueMessage( - { + async queueSingleJsonMessage({ + topic, + key, + object, + waitForAck, + }: { + topic: string + key: Message['key'] + object: Record + waitForAck: boolean + }): Promise { + await this.queueMessage({ + kafkaMessage: { topic, messages: [{ key, value: JSON.stringify(object) }], }, - waitForAck - ) + waitForAck, + }) } public async flush() { diff --git a/plugin-server/src/worker/ingestion/app-metrics.ts b/plugin-server/src/worker/ingestion/app-metrics.ts index d8f52a7401150..5e0a83c92ae31 100644 --- a/plugin-server/src/worker/ingestion/app-metrics.ts +++ b/plugin-server/src/worker/ingestion/app-metrics.ts @@ -183,8 +183,11 @@ export class AppMetrics { })) await this.kafkaProducer.queueMessage({ - topic: KAFKA_APP_METRICS, - messages: kafkaMessages, + kafkaMessage: { + topic: KAFKA_APP_METRICS, + messages: kafkaMessages, + }, + waitForAck: true, }) status.debug('🚽', `Finished flushing app metrics, took ${Date.now() - startTime}ms`) } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 7d71548381b16..6ae2248513073 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -25,7 +25,7 @@ import { processPersonsStep } from './processPersonsStep' export type EventPipelineResult = { // Promises that the batch handler should await on before committing offsets, // contains the Kafka producer ACKs, to avoid blocking after every message. - promises?: Array> + ackPromises?: Array> // Only used in tests // TODO: update to test for side-effects of running the pipeline rather than // this return type. @@ -78,14 +78,14 @@ export class EventPipelineRunner { drop_cause: 'disallowed', }) .inc() - return this.registerLastStep('eventDisallowedStep', null, [event]) + return this.registerLastStep('eventDisallowedStep', [event]) } let result: EventPipelineResult const eventWithTeam = await this.runStep(populateTeamDataStep, [this, event], event.team_id || -1) if (eventWithTeam != null) { result = await this.runEventPipelineSteps(eventWithTeam) } else { - result = this.registerLastStep('populateTeamDataStep', null, [event]) + result = this.registerLastStep('populateTeamDataStep', [event]) } eventProcessedAndIngestedCounter.inc() return result @@ -120,7 +120,7 @@ export class EventPipelineRunner { const processedEvent = await this.runStep(pluginsProcessEventStep, [this, event], event.team_id) if (processedEvent == null) { - return this.registerLastStep('pluginsProcessEventStep', event.team_id, [event]) + return this.registerLastStep('pluginsProcessEventStep', [event]) } const [normalizedEvent, person] = await this.runStep(processPersonsStep, [this, processedEvent], event.team_id) @@ -132,17 +132,12 @@ export class EventPipelineRunner { event.team_id ) - return this.registerLastStep('createEventStep', event.team_id, [rawClickhouseEvent, person], [eventAck]) + return this.registerLastStep('createEventStep', [rawClickhouseEvent, person], [eventAck]) } - registerLastStep( - stepName: string, - teamId: number | null, - args: any[], - promises?: Array> - ): EventPipelineResult { + registerLastStep(stepName: string, args: any[], ackPromises?: Array>): EventPipelineResult { pipelineLastStepCounter.labels(stepName).inc() - return { promises: promises, lastStep: stepName, args } + return { ackPromises, lastStep: stepName, args } } protected runStep any>( @@ -218,7 +213,7 @@ export class EventPipelineRunner { teamId, `plugin_server_ingest_event:${currentStepName}` ) - await this.hub.db.kafkaProducer!.queueMessage(message) + await this.hub.db.kafkaProducer!.queueMessage({ kafkaMessage: message, waitForAck: true }) } catch (dlqError) { status.info('🔔', `Errored trying to add event to dead letter queue. Error: ${dlqError}`) Sentry.captureException(dlqError, { diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index b2356f3652662..525bbbf84c910 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -453,7 +453,7 @@ export class PersonState { olderCreatedAt, // Keep the oldest created_at (i.e. the first time we've seen either person) properties ) - await this.db.kafkaProducer.queueMessages(kafkaMessages) + await this.db.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true }) return mergedPerson } @@ -767,7 +767,7 @@ export class DeferredPersonOverrideWorker { // Postgres for some reason -- the same row state should be // generated each call, and the receiving ReplacingMergeTree will // ensure we keep only the latest version after all writes settle.) - await this.kafkaProducer.queueMessages(messages, true) + await this.kafkaProducer.queueMessages({ kafkaMessages: messages, waitForAck: true }) return rows.length } diff --git a/plugin-server/src/worker/ingestion/utils.ts b/plugin-server/src/worker/ingestion/utils.ts index c52ef4ebba78e..9488ee759581b 100644 --- a/plugin-server/src/worker/ingestion/utils.ts +++ b/plugin-server/src/worker/ingestion/utils.ts @@ -80,18 +80,21 @@ export async function captureIngestionWarning( const limiter_key = `${teamId}:${type}:${debounce?.key || ''}` if (!!debounce?.alwaysSend || IngestionWarningLimiter.consume(limiter_key, 1)) { await kafkaProducer.queueMessage({ - topic: KAFKA_INGESTION_WARNINGS, - messages: [ - { - value: JSON.stringify({ - team_id: teamId, - type: type, - source: 'plugin-server', - details: JSON.stringify(details), - timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), - }), - }, - ], + kafkaMessage: { + topic: KAFKA_INGESTION_WARNINGS, + messages: [ + { + value: JSON.stringify({ + team_id: teamId, + type: type, + source: 'plugin-server', + details: JSON.stringify(details), + timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), + }), + }, + ], + }, + waitForAck: true, }) } else { return Promise.resolve() diff --git a/plugin-server/src/worker/vm/extensions/posthog.ts b/plugin-server/src/worker/vm/extensions/posthog.ts index c7a0a7124c50d..34e9cb2befd1c 100644 --- a/plugin-server/src/worker/vm/extensions/posthog.ts +++ b/plugin-server/src/worker/vm/extensions/posthog.ts @@ -29,22 +29,25 @@ async function queueEvent(hub: Hub, pluginConfig: PluginConfig, data: InternalDa const partitionKey = partitionKeyHash.digest('hex') await hub.kafkaProducer.queueMessage({ - topic: hub.KAFKA_CONSUMPTION_TOPIC!, - messages: [ - { - key: partitionKey, - value: JSON.stringify({ - distinct_id: data.distinct_id, - ip: '', - site_url: '', - data: JSON.stringify(data), - team_id: pluginConfig.team_id, - now: data.timestamp, - sent_at: data.timestamp, - uuid: data.uuid, - } as RawEventMessage), - }, - ], + kafkaMessage: { + topic: hub.KAFKA_CONSUMPTION_TOPIC!, + messages: [ + { + key: partitionKey, + value: JSON.stringify({ + distinct_id: data.distinct_id, + ip: '', + site_url: '', + data: JSON.stringify(data), + team_id: pluginConfig.team_id, + now: data.timestamp, + sent_at: data.timestamp, + uuid: data.uuid, + } as RawEventMessage), + }, + ], + }, + waitForAck: true, }) } diff --git a/plugin-server/tests/main/db.test.ts b/plugin-server/tests/main/db.test.ts index 2adc7567c8a5d..14448f196f9be 100644 --- a/plugin-server/tests/main/db.test.ts +++ b/plugin-server/tests/main/db.test.ts @@ -367,9 +367,10 @@ describe('DB', () => { expect(updatedPerson.properties).toEqual({ c: 'aaa' }) // verify correct Kafka message was sent - expect(db.kafkaProducer!.queueMessage).toHaveBeenLastCalledWith( - generateKafkaPersonUpdateMessage(updatedPerson) - ) + expect(db.kafkaProducer!.queueMessage).toHaveBeenLastCalledWith({ + kafkaMessage: generateKafkaPersonUpdateMessage(updatedPerson), + waitForAck: true, + }) }) }) @@ -416,7 +417,7 @@ describe('DB', () => { await delayUntilEventIngested(fetchPersonsRows, 2) const kafkaMessages = await db.deletePerson(person) - await db.kafkaProducer.queueMessages(kafkaMessages) + await db.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true }) await db.kafkaProducer.flush() const persons = await delayUntilEventIngested(fetchPersonsRows, 3) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts index 851bb23e2ac14..774475a5b34aa 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts @@ -97,20 +97,23 @@ describe('eachBatchParallelIngestion with overflow consume', () => { expect(queue.pluginsServer.teamManager.getTeamForEvent).toHaveBeenCalledTimes(1) expect(consume).toHaveBeenCalledWith('1:ingestion_capacity_overflow:id', 1) expect(mockQueueMessage).toHaveBeenCalledWith({ - topic: 'clickhouse_ingestion_warnings_test', - messages: [ - { - value: JSON.stringify({ - team_id: 1, - type: 'ingestion_capacity_overflow', - source: 'plugin-server', - details: JSON.stringify({ - overflowDistinctId: 'id', + kafkaMessage: { + topic: 'clickhouse_ingestion_warnings_test', + messages: [ + { + value: JSON.stringify({ + team_id: 1, + type: 'ingestion_capacity_overflow', + source: 'plugin-server', + details: JSON.stringify({ + overflowDistinctId: 'id', + }), + timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), }), - timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), - }), - }, - ], + }, + ], + }, + waitForAck: true, }) // Event is processed diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index 667c278d243f1..b0e61e62fdd66 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -408,7 +408,7 @@ describe('eachBatchX', () => { const batch = createBatch(captureEndpointEvent) runEventPipeline.mockImplementationOnce(() => Promise.resolve({ - promises: [Promise.resolve(), Promise.reject('deferred nopes out')], + ackPromises: [Promise.resolve(), Promise.reject('deferred nopes out')], }) ) const tokenBlockList = buildStringMatcher('another_token,more_token', false) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts index 42dfb9e55b5c1..6698b40a8ca6a 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts @@ -80,6 +80,7 @@ describe('console log ingester', () => { timestamp: '1970-01-01 00:00:00.000', }) ), + waitForAck: true, }, ], ]) @@ -124,6 +125,7 @@ describe('console log ingester', () => { timestamp: '1970-01-01 00:00:00.000', }) ), + waitForAck: true, }, ], [ @@ -142,6 +144,7 @@ describe('console log ingester', () => { timestamp: '1970-01-01 00:00:00.000', }) ), + waitForAck: true, }, ], ]) @@ -181,6 +184,7 @@ describe('console log ingester', () => { timestamp: '1970-01-01 00:00:00.000', }) ), + waitForAck: true, }, ], ]) diff --git a/plugin-server/tests/main/jobs/schedule.test.ts b/plugin-server/tests/main/jobs/schedule.test.ts index 150d171f97d3b..b6b52c892f23d 100644 --- a/plugin-server/tests/main/jobs/schedule.test.ts +++ b/plugin-server/tests/main/jobs/schedule.test.ts @@ -37,120 +37,147 @@ describe('Graphile Worker schedule', () => { } as any) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(1, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '1', - value: JSON.stringify({ - taskType: 'runEveryMinute', - pluginConfigId: 1, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '1', + value: JSON.stringify({ + taskType: 'runEveryMinute', + pluginConfigId: 1, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(2, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '2', - value: JSON.stringify({ - taskType: 'runEveryMinute', - pluginConfigId: 2, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '2', + value: JSON.stringify({ + taskType: 'runEveryMinute', + pluginConfigId: 2, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(3, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '3', - value: JSON.stringify({ - taskType: 'runEveryMinute', - pluginConfigId: 3, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '3', + value: JSON.stringify({ + taskType: 'runEveryMinute', + pluginConfigId: 3, + }), + }, + ], + }, + waitForAck: true, }) await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryHour', { job: { run_at: new Date() }, } as any) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(4, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '4', - value: JSON.stringify({ - taskType: 'runEveryHour', - pluginConfigId: 4, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '4', + value: JSON.stringify({ + taskType: 'runEveryHour', + pluginConfigId: 4, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(5, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '5', - value: JSON.stringify({ - taskType: 'runEveryHour', - pluginConfigId: 5, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '5', + value: JSON.stringify({ + taskType: 'runEveryHour', + pluginConfigId: 5, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(6, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '6', - value: JSON.stringify({ - taskType: 'runEveryHour', - pluginConfigId: 6, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '6', + value: JSON.stringify({ + taskType: 'runEveryHour', + pluginConfigId: 6, + }), + }, + ], + }, + waitForAck: true, }) await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryDay', { job: { run_at: new Date() }, } as any) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(7, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '7', - value: JSON.stringify({ - taskType: 'runEveryDay', - pluginConfigId: 7, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '7', + value: JSON.stringify({ + taskType: 'runEveryDay', + pluginConfigId: 7, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(8, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '8', - value: JSON.stringify({ - taskType: 'runEveryDay', - pluginConfigId: 8, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '8', + value: JSON.stringify({ + taskType: 'runEveryDay', + pluginConfigId: 8, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(9, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '9', - value: JSON.stringify({ - taskType: 'runEveryDay', - pluginConfigId: 9, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '9', + value: JSON.stringify({ + taskType: 'runEveryDay', + pluginConfigId: 9, + }), + }, + ], + }, + waitForAck: true, }) }) }) diff --git a/plugin-server/tests/worker/console.test.ts b/plugin-server/tests/worker/console.test.ts index 18c3480989700..4535f10f6e327 100644 --- a/plugin-server/tests/worker/console.test.ts +++ b/plugin-server/tests/worker/console.test.ts @@ -42,10 +42,10 @@ describe('console extension', () => { await (console[typeMethod](...args) as unknown as Promise) expect(queueSingleJsonMessageSpy).toHaveBeenCalledTimes(1) - expect(queueSingleJsonMessageSpy).toHaveBeenCalledWith( - KAFKA_PLUGIN_LOG_ENTRIES, - expect.any(String), - { + expect(queueSingleJsonMessageSpy).toHaveBeenCalledWith({ + topic: KAFKA_PLUGIN_LOG_ENTRIES, + key: expect.any(String), + object: { source: PluginLogEntrySource.Console, type, id: expect.any(String), @@ -56,8 +56,8 @@ describe('console extension', () => { message: expectedFinalMessage, instance_id: hub.instanceId.toString(), }, - false - ) + waitForAck: false, + }) }) }) }) diff --git a/plugin-server/tests/worker/ingestion/__snapshots__/app-metrics.test.ts.snap b/plugin-server/tests/worker/ingestion/__snapshots__/app-metrics.test.ts.snap index 10cf219b7e43b..1894a82b49dbd 100644 --- a/plugin-server/tests/worker/ingestion/__snapshots__/app-metrics.test.ts.snap +++ b/plugin-server/tests/worker/ingestion/__snapshots__/app-metrics.test.ts.snap @@ -4,12 +4,15 @@ exports[`AppMetrics() flush() flushes queued messages 1`] = ` Array [ Array [ Object { - "messages": Array [ - Object { - "value": "{\\"timestamp\\":\\"1970-01-01 00:16:40.000\\",\\"team_id\\":2,\\"plugin_config_id\\":2,\\"job_id\\":\\"000-000\\",\\"category\\":\\"processEvent\\",\\"successes\\":1,\\"successes_on_retry\\":0,\\"failures\\":0}", - }, - ], - "topic": "clickhouse_app_metrics_test", + "kafkaMessage": Object { + "messages": Array [ + Object { + "value": "{\\"timestamp\\":\\"1970-01-01 00:16:40.000\\",\\"team_id\\":2,\\"plugin_config_id\\":2,\\"job_id\\":\\"000-000\\",\\"category\\":\\"processEvent\\",\\"successes\\":1,\\"successes_on_retry\\":0,\\"failures\\":0}", + }, + ], + "topic": "clickhouse_app_metrics_test", + }, + "waitForAck": true, }, ], ] diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index 4bfc79f5e2379..364483f7c09a6 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -219,7 +219,9 @@ describe('EventPipelineRunner', () => { await runner.runEventPipeline(pipelineEvent) expect(hub.db.kafkaProducer.queueMessage).toHaveBeenCalledTimes(1) - expect(JSON.parse(hub.db.kafkaProducer.queueMessage.mock.calls[0][0].messages[0].value)).toMatchObject({ + expect( + JSON.parse(hub.db.kafkaProducer.queueMessage.mock.calls[0][0].kafkaMessage.messages[0].value) + ).toMatchObject({ team_id: 2, distinct_id: 'my_id', error: 'Event ingestion failed. Error: testError', diff --git a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts index 5cdf1246c53f5..5c764e5809b40 100644 --- a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts +++ b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts @@ -339,7 +339,7 @@ describe('postgres parity', () => { // move distinct ids from person to to anotherPerson const kafkaMessages = await hub.db.moveDistinctIds(person, anotherPerson) - await hub.db!.kafkaProducer!.queueMessages(kafkaMessages) + await hub.db!.kafkaProducer!.queueMessages({ kafkaMessages, waitForAck: true }) await delayUntilEventIngested(() => hub.db.fetchDistinctIdValues(anotherPerson, Database.ClickHouse), 2) // it got added @@ -395,7 +395,7 @@ describe('postgres parity', () => { // delete person await hub.db.postgres.transaction(PostgresUse.COMMON_WRITE, '', async (client) => { const deletePersonMessage = await hub.db.deletePerson(person, client) - await hub.db!.kafkaProducer!.queueMessage(deletePersonMessage[0]) + await hub.db!.kafkaProducer!.queueMessage({ kafkaMessage: deletePersonMessage[0], waitForAck: true }) }) await delayUntilEventIngested(async () => diff --git a/plugin-server/tests/worker/vm.test.ts b/plugin-server/tests/worker/vm.test.ts index 7e3769de61328..5f1f727d4dbeb 100644 --- a/plugin-server/tests/worker/vm.test.ts +++ b/plugin-server/tests/worker/vm.test.ts @@ -689,10 +689,10 @@ describe('vm tests', () => { await vm.methods.processEvent!(event) expect(queueSingleJsonMessageSpy).toHaveBeenCalledTimes(1) - expect(queueSingleJsonMessageSpy).toHaveBeenCalledWith( - KAFKA_PLUGIN_LOG_ENTRIES, - expect.any(String), - { + expect(queueSingleJsonMessageSpy).toHaveBeenCalledWith({ + topic: KAFKA_PLUGIN_LOG_ENTRIES, + key: expect.any(String), + object: { id: expect.any(String), instance_id: hub.instanceId.toString(), message: 'logged event', @@ -703,8 +703,8 @@ describe('vm tests', () => { timestamp: expect.any(String), type: PluginLogEntryType.Log, }, - false - ) + waitForAck: false, + }) }) test('fetch', async () => { @@ -969,8 +969,8 @@ describe('vm tests', () => { expect(response).toBe('haha') expect(queueMessageSpy).toHaveBeenCalledTimes(1) - expect(queueMessageSpy.mock.calls[0][0].topic).toEqual(KAFKA_EVENTS_PLUGIN_INGESTION) - const parsedMessage = JSON.parse(queueMessageSpy.mock.calls[0][0].messages[0].value!.toString()) + expect(queueMessageSpy.mock.calls[0][0].kafkaMessage.topic).toEqual(KAFKA_EVENTS_PLUGIN_INGESTION) + const parsedMessage = JSON.parse(queueMessageSpy.mock.calls[0][0].kafkaMessage.messages[0].value!.toString()) expect(JSON.parse(parsedMessage.data)).toMatchObject({ distinct_id: 'plugin-id-60', event: 'my-new-event', @@ -998,8 +998,8 @@ describe('vm tests', () => { expect(response).toBe('haha') expect(queueMessageSpy).toHaveBeenCalledTimes(1) - expect(queueMessageSpy.mock.calls[0][0].topic).toEqual(KAFKA_EVENTS_PLUGIN_INGESTION) - const parsedMessage = JSON.parse(queueMessageSpy.mock.calls[0][0].messages[0].value!.toString()) + expect(queueMessageSpy.mock.calls[0][0].kafkaMessage.topic).toEqual(KAFKA_EVENTS_PLUGIN_INGESTION) + const parsedMessage = JSON.parse(queueMessageSpy.mock.calls[0][0].kafkaMessage.messages[0].value!.toString()) expect(JSON.parse(parsedMessage.data)).toMatchObject({ timestamp: '2020-02-23T02:15:00Z', // taken out of the properties distinct_id: 'plugin-id-60', @@ -1025,8 +1025,8 @@ describe('vm tests', () => { expect(response).toBe('haha') expect(response).toBe('haha') expect(queueMessageSpy).toHaveBeenCalledTimes(1) - expect(queueMessageSpy.mock.calls[0][0].topic).toEqual(KAFKA_EVENTS_PLUGIN_INGESTION) - const parsedMessage = JSON.parse(queueMessageSpy.mock.calls[0][0].messages[0].value!.toString()) + expect(queueMessageSpy.mock.calls[0][0].kafkaMessage.topic).toEqual(KAFKA_EVENTS_PLUGIN_INGESTION) + const parsedMessage = JSON.parse(queueMessageSpy.mock.calls[0][0].kafkaMessage.messages[0].value!.toString()) expect(JSON.parse(parsedMessage.data)).toMatchObject({ distinct_id: 'custom id', event: 'my-new-event',