diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts
index ab93012d312b8..380cd8d39d812 100644
--- a/plugin-server/src/cdp/cdp-consumers.ts
+++ b/plugin-server/src/cdp/cdp-consumers.ts
@@ -157,8 +157,8 @@ abstract class CdpConsumerBase {
         const messages = [...this.messagesToProduce]
         this.messagesToProduce = []

-        await this.kafkaProducer!.queueMessages({
-            kafkaMessages: messages.map((x) => ({
+        await this.kafkaProducer!.queueMessages(
+            messages.map((x) => ({
                 topic: x.topic,
                 messages: [
                     {
@@ -166,8 +166,8 @@ abstract class CdpConsumerBase {
                         key: x.key,
                     },
                 ],
-            })),
-        }).catch((reason) => {
+            }))
+        ).catch((reason) => {
             status.error('⚠️', `failed to produce message: ${reason}`)
         })
     }
diff --git a/plugin-server/src/kafka/producer.ts b/plugin-server/src/kafka/producer.ts
index c6f8f78cdb755..b025fb7f2dd7f 100644
--- a/plugin-server/src/kafka/producer.ts
+++ b/plugin-server/src/kafka/producer.ts
@@ -1,4 +1,4 @@
-import { Message, ProducerRecord } from 'kafkajs'
+import { Message } from 'kafkajs'
 import {
     ClientMetrics,
     HighLevelProducer,
@@ -29,11 +29,11 @@ import { createRdConnectionConfigFromEnvVars } from './config'

 export type MessageKey = Exclude<Message['key'], undefined>

-export type TopicMessages = {
+export type TopicMessage = {
     topic: string
     messages: {
-        value: string | Buffer
-        key: MessageKey
+        value: string | Buffer | null
+        key?: MessageKey
     }[]
 }

@@ -145,15 +145,11 @@ export class KafkaProducerWrapper {
         }
     }

-    async queueMessages({
-        kafkaMessages: kafkaMessage,
-    }: {
-        kafkaMessages: ProducerRecord | ProducerRecord[]
-    }): Promise<void> {
-        const records = Array.isArray(kafkaMessage) ? kafkaMessage : [kafkaMessage]
+    async queueMessages(topicMessages: TopicMessage | TopicMessage[]): Promise<void> {
+        topicMessages = Array.isArray(topicMessages) ?
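For orientation, a minimal self-contained sketch of the reshaped call, assuming simplified types — the in-memory wrapper and topic name below are illustrative stand-ins, not the plugin server's real `KafkaProducerWrapper` or topic constants:

```ts
// TopicMessage mirrors the type added in producer.ts above; the wrapper is a stand-in, not the real client.
type MessageKey = string | Buffer | null | undefined

type TopicMessage = {
    topic: string
    messages: { value: string | Buffer | null; key?: MessageKey }[]
}

class InMemoryProducerWrapper {
    public produced: { topic: string; value: string | Buffer | null; key?: MessageKey }[] = []

    // Same shape as the new signature: a single TopicMessage or an array, no { kafkaMessages } wrapper.
    async queueMessages(topicMessages: TopicMessage | TopicMessage[]): Promise<void> {
        const records = Array.isArray(topicMessages) ? topicMessages : [topicMessages]
        await Promise.all(
            records.map((record) =>
                Promise.all(
                    record.messages.map(async (message) => {
                        this.produced.push({ topic: record.topic, ...message })
                    })
                )
            )
        )
    }
}

// Call sites now pass the topic/messages object(s) directly:
void new InMemoryProducerWrapper().queueMessages({
    topic: 'example_topic', // illustrative topic name
    messages: [{ key: 'example-key', value: JSON.stringify({ hello: 'world' }) }],
})
```

Passing the record(s) directly removes the `{ kafkaMessages: ... }` wrapper object that every call site previously had to build.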
topicMessages : [topicMessages] await Promise.all( - records.map((record) => { + topicMessages.map((record) => { return Promise.all( record.messages.map((message) => this.produce({ @@ -167,6 +163,7 @@ export class KafkaProducerWrapper { }) ) } + public async flush() { status.debug('📤', 'flushing_producer') diff --git a/plugin-server/src/main/graphile-worker/schedule.ts b/plugin-server/src/main/graphile-worker/schedule.ts index b447ffe561790..fa1ba04684366 100644 --- a/plugin-server/src/main/graphile-worker/schedule.ts +++ b/plugin-server/src/main/graphile-worker/schedule.ts @@ -56,10 +56,8 @@ export async function runScheduledTasks( for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) { status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId }) await server.kafkaProducer.queueMessages({ - kafkaMessages: { - topic: KAFKA_SCHEDULED_TASKS, - messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }], - }, + topic: KAFKA_SCHEDULED_TASKS, + messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }], }) graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc() } diff --git a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts index 9539130c5a629..f21b324604901 100644 --- a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts @@ -55,10 +55,8 @@ export const startJobsConsumer = async ({ }) // TODO: handle resolving offsets asynchronously await producer.queueMessages({ - kafkaMessages: { - topic: KAFKA_JOBS_DLQ, - messages: [{ value: message.value, key: message.key }], - }, + topic: KAFKA_JOBS_DLQ, + messages: [{ value: message.value, key: message.key }], }) resolveOffset(message.offset) continue @@ -74,10 +72,8 @@ export const startJobsConsumer = async ({ }) // TODO: handle resolving offsets asynchronously await producer.queueMessages({ - kafkaMessages: { - topic: KAFKA_JOBS_DLQ, - messages: [{ value: message.value, key: message.key }], - }, + topic: KAFKA_JOBS_DLQ, + messages: [{ value: message.value, key: message.key }], }) resolveOffset(message.offset) continue diff --git a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts index 6af58c891500c..8ce5fdbefecae 100644 --- a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts @@ -165,10 +165,8 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) = value: message.value, }) await producer.queueMessages({ - kafkaMessages: { - topic: KAFKA_SCHEDULED_TASKS_DLQ, - messages: [{ value: message.value, key: message.key }], - }, + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], }) continue } @@ -185,10 +183,8 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) = error: error.stack ?? 
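The jobs and scheduled-tasks consumers above all collapse to the same dead-letter call; a hedged sketch of that pattern, where the topic constant and the consumer message shape are stand-ins rather than the plugin server's real types:

```ts
type TopicMessage = {
    topic: string
    messages: { value: string | Buffer | null; key?: string | Buffer | null }[]
}

const KAFKA_JOBS_DLQ = 'jobs_dlq' // stand-in for the real constant from config/kafka-topics

// Forward an unparseable message's raw value/key to the DLQ with the new un-nested argument.
async function forwardToDeadLetterQueue(
    queueMessages: (msgs: TopicMessage | TopicMessage[]) => Promise<void>,
    message: { value: Buffer | null; key: Buffer | null }
): Promise<void> {
    await queueMessages({
        topic: KAFKA_JOBS_DLQ,
        messages: [{ value: message.value, key: message.key }],
    })
}
```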
error, }) await producer.queueMessages({ - kafkaMessages: { - topic: KAFKA_SCHEDULED_TASKS_DLQ, - messages: [{ value: message.value, key: message.key }], - }, + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], }) continue } @@ -196,10 +192,8 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) = if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) { status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task) await producer.queueMessages({ - kafkaMessages: { - topic: KAFKA_SCHEDULED_TASKS_DLQ, - messages: [{ value: message.value, key: message.key }], - }, + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], }) continue } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts index f2b21c17d7831..ea0f548cccb8c 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts @@ -156,13 +156,11 @@ export class ConsoleLogsIngester { return [ this.producer.queueMessages({ - kafkaMessages: { - topic: KAFKA_LOG_ENTRIES, - messages: consoleLogEvents.map((cle: ConsoleLogEntry) => ({ - value: JSON.stringify(cle), - key: event.session_id, - })), - }, + topic: KAFKA_LOG_ENTRIES, + messages: consoleLogEvents.map((cle: ConsoleLogEntry) => ({ + value: JSON.stringify(cle), + key: event.session_id, + })), }), ] } catch (error) { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts index 28998e1e19ac3..1fc12f95cac8f 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts @@ -171,15 +171,13 @@ export class ReplayEventsIngester { return [ this.producer.queueMessages({ - kafkaMessages: { - topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, - messages: [ - { - value: JSON.stringify(replayRecord), - key: event.session_id, - }, - ], - }, + topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, + messages: [ + { + value: JSON.stringify(replayRecord), + key: event.session_id, + }, + ], }), ] } catch (error) { diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index 25511be67c72c..eeee1c49b1072 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -3,12 +3,11 @@ import { CacheOptions, Properties } from '@posthog/plugin-scaffold' import { captureException } from '@sentry/node' import { Pool as GenericPool } from 'generic-pool' import Redis from 'ioredis' -import { ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' import { QueryResult } from 'pg' import { KAFKA_GROUPS, KAFKA_PERSON_DISTINCT_ID, KAFKA_PLUGIN_LOG_ENTRIES } from '../../config/kafka-topics' -import { KafkaProducerWrapper } from '../../kafka/producer' +import { KafkaProducerWrapper, TopicMessage } from '../../kafka/producer' import { Action, ClickHouseEvent, @@ -697,7 +696,7 @@ export class DB { }) } - await this.kafkaProducer.queueMessages({ kafkaMessages }) + await this.kafkaProducer.queueMessages(kafkaMessages) return person } @@ -706,7 +705,7 @@ export 
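The console-logs and replay ingesters now hand `queueMessages` a single topic carrying a batch of messages; a trimmed sketch of that shape, with illustrative field names and topic:

```ts
type TopicMessage = { topic: string; messages: { value: string; key?: string }[] }

// ConsoleLogEntry is reduced to two fields for the example; the real type has more.
type ConsoleLogEntry = { level: string; message: string }

function consoleLogsToTopicMessage(sessionId: string, entries: ConsoleLogEntry[]): TopicMessage {
    return {
        topic: 'log_entries', // stand-in for KAFKA_LOG_ENTRIES
        messages: entries.map((entry) => ({
            value: JSON.stringify(entry),
            key: sessionId, // key by session so one session's logs stay on one partition
        })),
    }
}
```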
class DB { person: InternalPerson, update: Partial, tx?: TransactionClient - ): Promise<[InternalPerson, ProducerRecord[]]> { + ): Promise<[InternalPerson, TopicMessage[]]> { let versionString = 'COALESCE(version, 0)::numeric + 1' if (update.version) { versionString = update.version.toString() @@ -758,7 +757,7 @@ export class DB { return [updatedPerson, [kafkaMessage]] } - public async deletePerson(person: InternalPerson, tx?: TransactionClient): Promise { + public async deletePerson(person: InternalPerson, tx?: TransactionClient): Promise { const { rows } = await this.postgres.query<{ version: string }>( tx ?? PostgresUse.COMMON_WRITE, 'DELETE FROM posthog_person WHERE team_id = $1 AND id = $2 RETURNING version', @@ -766,7 +765,7 @@ export class DB { 'deletePerson' ) - let kafkaMessages: ProducerRecord[] = [] + let kafkaMessages: TopicMessage[] = [] if (rows.length > 0) { const [row] = rows @@ -881,7 +880,7 @@ export class DB { ): Promise { const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version, tx) if (kafkaMessages.length) { - await this.kafkaProducer.queueMessages({ kafkaMessages }) + await this.kafkaProducer.queueMessages(kafkaMessages) } } @@ -890,7 +889,7 @@ export class DB { distinctId: string, version: number, tx?: TransactionClient - ): Promise { + ): Promise { const insertResult = await this.postgres.query( tx ?? PostgresUse.COMMON_WRITE, // NOTE: Keep this in sync with the posthog_persondistinctid INSERT in `createPerson` @@ -923,7 +922,7 @@ export class DB { source: InternalPerson, target: InternalPerson, tx?: TransactionClient - ): Promise { + ): Promise { let movedDistinctIdResult: QueryResult | null = null try { movedDistinctIdResult = await this.postgres.query( @@ -1133,10 +1132,8 @@ export class DB { // disk. void this.kafkaProducer .queueMessages({ - kafkaMessages: { - topic: KAFKA_PLUGIN_LOG_ENTRIES, - messages: [{ key: parsedEntry.id, value: JSON.stringify(parsedEntry) }], - }, + topic: KAFKA_PLUGIN_LOG_ENTRIES, + messages: [{ key: parsedEntry.id, value: JSON.stringify(parsedEntry) }], }) .catch((error) => { status.warn('⚠️', 'Failed to produce plugin log entry', { @@ -1424,21 +1421,19 @@ export class DB { version: number ): Promise { await this.kafkaProducer.queueMessages({ - kafkaMessages: { - topic: KAFKA_GROUPS, - messages: [ - { - value: JSON.stringify({ - group_type_index: groupTypeIndex, - group_key: groupKey, - team_id: teamId, - group_properties: JSON.stringify(properties), - created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision), - version, - }), - }, - ], - }, + topic: KAFKA_GROUPS, + messages: [ + { + value: JSON.stringify({ + group_type_index: groupTypeIndex, + group_key: groupKey, + team_id: teamId, + group_properties: JSON.stringify(properties), + created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision), + version, + }), + }, + ], }) } diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index e2bfc977cba31..c646d5cb62046 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -149,15 +149,13 @@ export async function createHub( // chained, and if we do not manage to produce then the chain will be // broken. 
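The `DB` helpers above keep returning the Kafka payloads instead of producing them, so the caller can queue after the Postgres write; a simplified sketch of that build-then-queue flow, with stand-in types and topic name:

```ts
type TopicMessage = { topic: string; messages: { value: string; key?: string }[] }

interface PersonRow { id: string; teamId: number; version: number }

// The helper only builds the payloads; producing is the caller's decision (e.g. after the transaction commits).
function buildPersonUpdateMessages(person: PersonRow): TopicMessage[] {
    return [
        {
            topic: 'clickhouse_person', // stand-in for KAFKA_PERSON
            messages: [{ key: person.id, value: JSON.stringify({ id: person.id, version: person.version }) }],
        },
    ]
}

async function updateAndProduce(
    person: PersonRow,
    queueMessages: (msgs: TopicMessage | TopicMessage[]) => Promise<void>
): Promise<void> {
    const kafkaMessages = buildPersonUpdateMessages({ ...person, version: person.version + 1 })
    await queueMessages(kafkaMessages) // arrays are accepted directly, no wrapper object
}
```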
await kafkaProducer.queueMessages({ - kafkaMessages: { - topic: KAFKA_JOBS, - messages: [ - { - value: Buffer.from(JSON.stringify(job)), - key: Buffer.from(job.pluginConfigTeam.toString()), - }, - ], - }, + topic: KAFKA_JOBS, + messages: [ + { + value: Buffer.from(JSON.stringify(job)), + key: Buffer.from(job.pluginConfigTeam.toString()), + }, + ], }) } diff --git a/plugin-server/src/utils/db/utils.ts b/plugin-server/src/utils/db/utils.ts index bf23716bc5c63..d8de36befb6d5 100644 --- a/plugin-server/src/utils/db/utils.ts +++ b/plugin-server/src/utils/db/utils.ts @@ -1,6 +1,6 @@ import { Properties } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' -import { ProducerRecord } from 'kafkajs' +import { TopicMessage } from 'kafka/producer' import { Counter } from 'prom-client' import { defaultConfig } from '../../config/config' @@ -179,7 +179,7 @@ export function hasDifferenceWithProposedNewNormalisationMode(properties: Proper return !areMapsEqual(setOnce, filteredSetOnce) } -export function generateKafkaPersonUpdateMessage(person: InternalPerson, isDeleted = false): ProducerRecord { +export function generateKafkaPersonUpdateMessage(person: InternalPerson, isDeleted = false): TopicMessage { return { topic: KAFKA_PERSON, messages: [ diff --git a/plugin-server/src/worker/ingestion/app-metrics.ts b/plugin-server/src/worker/ingestion/app-metrics.ts index 3dd0c6fb009d8..4273118db9a41 100644 --- a/plugin-server/src/worker/ingestion/app-metrics.ts +++ b/plugin-server/src/worker/ingestion/app-metrics.ts @@ -164,7 +164,7 @@ export class AppMetrics { this.queueSize = 0 this.queuedData = {} - const kafkaMessages: Message[] = Object.values(queue).map((value) => ({ + const messages: Message[] = Object.values(queue).map((value) => ({ value: JSON.stringify({ timestamp: castTimestampOrNow(DateTime.fromMillis(value.lastTimestamp), TimestampFormat.ClickHouse), team_id: value.metric.teamId, @@ -183,10 +183,8 @@ export class AppMetrics { })) await this.kafkaProducer.queueMessages({ - kafkaMessages: { - topic: KAFKA_APP_METRICS, - messages: kafkaMessages, - }, + topic: KAFKA_APP_METRICS, + messages: messages, }) status.debug('🚽', `Finished flushing app metrics, took ${Date.now() - startTime}ms`) } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/extractHeatmapDataStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/extractHeatmapDataStep.ts index 728f5a14ffbb7..3ba826d288529 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/extractHeatmapDataStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/extractHeatmapDataStep.ts @@ -36,13 +36,11 @@ export async function extractHeatmapDataStep( acks.push( runner.hub.kafkaProducer.queueMessages({ - kafkaMessages: { - topic: runner.hub.CLICKHOUSE_HEATMAPS_KAFKA_TOPIC, - messages: heatmapEvents.map((rawEvent) => ({ - key: eventUuid, - value: JSON.stringify(rawEvent), - })), - }, + topic: runner.hub.CLICKHOUSE_HEATMAPS_KAFKA_TOPIC, + messages: heatmapEvents.map((rawEvent) => ({ + key: eventUuid, + value: JSON.stringify(rawEvent), + })), }) ) } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/produceExceptionSymbolificationEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/produceExceptionSymbolificationEventStep.ts index 17bb002636e56..d8f81ee12d3fb 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/produceExceptionSymbolificationEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/produceExceptionSymbolificationEventStep.ts @@ -8,15 +8,13 @@ export function 
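`AppMetrics.flush` above folds every buffered counter into one `TopicMessage` whose `messages` array holds one JSON row per metric; a reduced sketch of that batching, with trimmed fields and a stand-in for KAFKA_APP_METRICS:

```ts
type TopicMessage = { topic: string; messages: { value: string }[] }

type QueuedMetric = { teamId: number; category: string; successes: number; failures: number }

function flushAppMetrics(queue: Record<string, QueuedMetric>): TopicMessage {
    return {
        topic: 'app_metrics', // illustrative topic name
        messages: Object.values(queue).map((value) => ({
            value: JSON.stringify({
                team_id: value.teamId,
                category: value.category,
                successes: value.successes,
                failures: value.failures,
            }),
        })),
    }
}
```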
produceExceptionSymbolificationEventStep( ): Promise { return runner.hub.kafkaProducer .queueMessages({ - kafkaMessages: { - topic: runner.hub.EXCEPTIONS_SYMBOLIFICATION_KAFKA_TOPIC, - messages: [ - { - key: String(event.team_id), - value: Buffer.from(JSON.stringify(event)), - }, - ], - }, + topic: runner.hub.EXCEPTIONS_SYMBOLIFICATION_KAFKA_TOPIC, + messages: [ + { + key: String(event.team_id), + value: Buffer.from(JSON.stringify(event)), + }, + ], }) .catch((error) => { status.warn('⚠️', 'Failed to produce exception event for symbolification', { diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index e0176e1e3a07a..1eca41f5edbe3 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -352,7 +352,7 @@ export class EventPipelineRunner { teamId, `plugin_server_ingest_event:${currentStepName}` ) - await this.hub.db.kafkaProducer!.queueMessages({ kafkaMessages: message }) + await this.hub.db.kafkaProducer!.queueMessages(message) } catch (dlqError) { status.info('🔔', `Errored trying to add event to dead letter queue. Error: ${dlqError}`) Sentry.captureException(dlqError, { diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 93a45be845571..aee156fe3b808 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -1,11 +1,11 @@ import { PluginEvent, Properties } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' -import { ProducerRecord } from 'kafkajs' import LRU from 'lru-cache' import { DateTime } from 'luxon' import { Counter } from 'prom-client' import { ONE_HOUR } from '../../config/constants' +import { TopicMessage } from '../../kafka/producer' import { InternalPerson, Person, PropertyUpdateOperation } from '../../types' import { DB } from '../../utils/db/db' import { PostgresUse, TransactionClient } from '../../utils/db/postgres' @@ -301,7 +301,7 @@ export class PersonState { if (Object.keys(update).length > 0) { const [updatedPerson, kafkaMessages] = await this.db.updatePersonDeprecated(person, update) - const kafkaAck = this.db.kafkaProducer.queueMessages({ kafkaMessages }) + const kafkaAck = this.db.kafkaProducer.queueMessages(kafkaMessages) return [updatedPerson, kafkaAck] } @@ -682,14 +682,14 @@ export class PersonState { const properties: Properties = { ...otherPerson.properties, ...mergeInto.properties } this.applyEventPropertyUpdates(properties) - const [mergedPerson, kafkaMessages] = await this.handleMergeTransaction( + const [mergedPerson, kafkaAcks] = await this.handleMergeTransaction( mergeInto, otherPerson, olderCreatedAt, // Keep the oldest created_at (i.e. 
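Two calling conventions recur above: keeping the returned promise as an "ack" to await later (person-state), and fire-and-forget with a `.catch` (symbolification, plugin logs). A small sketch of both, with stand-in types:

```ts
type TopicMessage = { topic: string; messages: { value: string; key?: string }[] }
type QueueMessages = (msgs: TopicMessage | TopicMessage[]) => Promise<void>

// Keep the promise so the caller can await it alongside other acks before committing offsets.
function produceWithAck(queueMessages: QueueMessages, message: TopicMessage): Promise<void> {
    return queueMessages(message)
}

// Fire-and-forget: log and continue, so a failed produce does not fail the pipeline step.
function produceFireAndForget(queueMessages: QueueMessages, message: TopicMessage): void {
    void queueMessages(message).catch((error) => {
        console.warn('Failed to produce message', error)
    })
}
```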
the first time we've seen either person) properties ) - return [mergedPerson, kafkaMessages] + return [mergedPerson, kafkaAcks] } private isMergeAllowed(mergeFrom: InternalPerson): boolean { @@ -712,7 +712,7 @@ export class PersonState { }) .inc() - const [mergedPerson, kafkaMessages]: [InternalPerson, ProducerRecord[]] = await this.db.postgres.transaction( + const [mergedPerson, kafkaMessages]: [InternalPerson, TopicMessage[]] = await this.db.postgres.transaction( PostgresUse.COMMON_WRITE, 'mergePeople', async (tx) => { @@ -768,7 +768,7 @@ export class PersonState { }) .inc() - const kafkaAck = this.db.kafkaProducer.queueMessages({ kafkaMessages }) + const kafkaAck = this.db.kafkaProducer.queueMessages(kafkaMessages) return [mergedPerson, kafkaAck] } diff --git a/plugin-server/src/worker/ingestion/utils.ts b/plugin-server/src/worker/ingestion/utils.ts index cc1af0538a2fe..7b9aa44ea0a70 100644 --- a/plugin-server/src/worker/ingestion/utils.ts +++ b/plugin-server/src/worker/ingestion/utils.ts @@ -1,8 +1,7 @@ import { PluginEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold' -import { ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' -import { KafkaProducerWrapper } from '../../kafka/producer' +import { KafkaProducerWrapper, TopicMessage } from '../../kafka/producer' import { PipelineEvent, TeamId, TimestampFormat } from '../../types' import { safeClickhouseString } from '../../utils/db/utils' import { status } from '../../utils/status' @@ -21,7 +20,7 @@ export function generateEventDeadLetterQueueMessage( error: unknown, teamId: number, errorLocation = 'plugin_server_ingest_event' -): ProducerRecord { +): TopicMessage { let errorMessage = 'Event ingestion failed. ' if (error instanceof Error) { errorMessage += `Error: ${error.message}` @@ -49,7 +48,7 @@ export function generateEventDeadLetterQueueMessage( team_id: event.team_id || teamId, } - const message = { + return { topic: KAFKA_EVENTS_DEAD_LETTER_QUEUE, messages: [ { @@ -57,7 +56,6 @@ export function generateEventDeadLetterQueueMessage( }, ], } - return message } // These get displayed under Data Management > Ingestion Warnings @@ -82,20 +80,18 @@ export async function captureIngestionWarning( if (!!debounce?.alwaysSend || IngestionWarningLimiter.consume(limiter_key, 1)) { void kafkaProducer .queueMessages({ - kafkaMessages: { - topic: KAFKA_INGESTION_WARNINGS, - messages: [ - { - value: JSON.stringify({ - team_id: teamId, - type: type, - source: 'plugin-server', - details: JSON.stringify(details), - timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), - }), - }, - ], - }, + topic: KAFKA_INGESTION_WARNINGS, + messages: [ + { + value: JSON.stringify({ + team_id: teamId, + type: type, + source: 'plugin-server', + details: JSON.stringify(details), + timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), + }), + }, + ], }) .catch((error) => { status.warn('⚠️', 'Failed to produce ingestion warning', { diff --git a/plugin-server/src/worker/vm/extensions/posthog.ts b/plugin-server/src/worker/vm/extensions/posthog.ts index bcc1b670a9897..7cf98dd81a939 100644 --- a/plugin-server/src/worker/vm/extensions/posthog.ts +++ b/plugin-server/src/worker/vm/extensions/posthog.ts @@ -29,24 +29,22 @@ async function queueEvent(hub: Hub, pluginConfig: PluginConfig, data: InternalDa const partitionKey = partitionKeyHash.digest('hex') await hub.kafkaProducer.queueMessages({ - kafkaMessages: { - topic: hub.KAFKA_CONSUMPTION_TOPIC!, - messages: [ - { - key: partitionKey, - value: JSON.stringify({ - distinct_id: 
data.distinct_id, - ip: '', - site_url: '', - data: JSON.stringify(data), - team_id: pluginConfig.team_id, - now: data.timestamp, - sent_at: data.timestamp, - uuid: data.uuid, - } as RawEventMessage), - }, - ], - }, + topic: hub.KAFKA_CONSUMPTION_TOPIC!, + messages: [ + { + key: partitionKey, + value: JSON.stringify({ + distinct_id: data.distinct_id, + ip: '', + site_url: '', + data: JSON.stringify(data), + team_id: pluginConfig.team_id, + now: data.timestamp, + sent_at: data.timestamp, + uuid: data.uuid, + } as RawEventMessage), + }, + ], }) } diff --git a/plugin-server/tests/main/db.test.ts b/plugin-server/tests/main/db.test.ts index 5d9d0ebfd25ed..e37df66ac71ac 100644 --- a/plugin-server/tests/main/db.test.ts +++ b/plugin-server/tests/main/db.test.ts @@ -351,7 +351,7 @@ describe('DB', () => { describe('updatePerson', () => { it('Clickhouse and Postgres are in sync if multiple updates concurrently', async () => { - jest.spyOn(db.kafkaProducer!, 'queueMessage') + jest.spyOn(db.kafkaProducer!, 'queueMessages') const team = await getFirstTeam(hub) const uuid = new UUIDT().toString() const distinctId = 'distinct_id1' @@ -364,9 +364,7 @@ describe('DB', () => { const updateTs = DateTime.fromISO('2000-04-04T11:42:06.502Z').toUTC() const update = { created_at: updateTs } const [updatedPerson, kafkaMessages] = await db.updatePersonDeprecated(personProvided, update) - await hub.db.kafkaProducer.queueMessages({ - kafkaMessages, - }) + await hub.db.kafkaProducer.queueMessages(kafkaMessages) // verify we have the correct update in Postgres db const personDbAfter = await fetchPersonByPersonId(personDbBefore.team_id, personDbBefore.id) @@ -379,9 +377,9 @@ describe('DB', () => { expect(updatedPerson.properties).toEqual({ c: 'aaa' }) // verify correct Kafka message was sent - expect(db.kafkaProducer!.queueMessage).toHaveBeenLastCalledWith({ - kafkaMessage: generateKafkaPersonUpdateMessage(updatedPerson), - }) + expect(db.kafkaProducer!.queueMessages).toHaveBeenLastCalledWith( + generateKafkaPersonUpdateMessage(updatedPerson) + ) }) }) @@ -426,14 +424,12 @@ describe('DB', () => { const [_p, updatePersonKafkaMessages] = await db.updatePersonDeprecated(person, { properties: { foo: 'bar' }, }) - await hub.db.kafkaProducer.queueMessages({ - kafkaMessages: updatePersonKafkaMessages, - }) + await hub.db.kafkaProducer.queueMessages(updatePersonKafkaMessages) await db.kafkaProducer.flush() await delayUntilEventIngested(fetchPersonsRows, 2) const kafkaMessages = await db.deletePerson(person) - await db.kafkaProducer.queueMessages({ kafkaMessages }) + await db.kafkaProducer.queueMessages(kafkaMessages) await db.kafkaProducer.flush() const persons = await delayUntilEventIngested(fetchPersonsRows, 3) diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index 09c023774f3e2..e33da08a760d7 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -199,9 +199,7 @@ test('merge people', async () => { created_at: DateTime.fromISO('2019-07-01T00:00:00Z'), }) - await hub.db.kafkaProducer.queueMessages({ - kafkaMessages: [...kafkaMessages0, ...kafkaMessages1], - }) + await hub.db.kafkaProducer.queueMessages([...kafkaMessages0, ...kafkaMessages1]) await processEvent( 'person_1', diff --git a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts index b9852965f2637..d055493668a7d 100644 --- 
a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts +++ b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts @@ -46,7 +46,7 @@ describe('postgres parity', () => { await resetTestDatabaseClickhouse(extraServerConfig) console.log('[TEST] Starting plugins server') const startResponse = await startPluginsServer(extraServerConfig, makePiscina, { ingestion: true }) - hub = startResponse.hub + hub = startResponse.hub! stopServer = startResponse.stop teamId++ console.log('[TEST] Setting up seed data') @@ -181,9 +181,7 @@ describe('postgres parity', () => { is_identified: true, }) - await hub.db.kafkaProducer.queueMessages({ - kafkaMessages, - }) + await hub.db.kafkaProducer.queueMessages(kafkaMessages) await delayUntilEventIngested(async () => (await hub.db.fetchPersons(Database.ClickHouse)).filter((p) => p.is_identified) @@ -211,9 +209,7 @@ describe('postgres parity', () => { is_identified: false, }) - await hub.db.kafkaProducer.queueMessages({ - kafkaMessages: kafkaMessages2, - }) + await hub.db.kafkaProducer.queueMessages(kafkaMessages2) expect(updatedPerson.version).toEqual(2) @@ -353,7 +349,7 @@ describe('postgres parity', () => { // move distinct ids from person to to anotherPerson const kafkaMessages = await hub.db.moveDistinctIds(person, anotherPerson) - await hub.db!.kafkaProducer!.queueMessages({ kafkaMessages }) + await hub.db!.kafkaProducer!.queueMessages(kafkaMessages) await delayUntilEventIngested(() => hub.db.fetchDistinctIdValues(anotherPerson, Database.ClickHouse), 2) // it got added @@ -409,9 +405,7 @@ describe('postgres parity', () => { // delete person await hub.db.postgres.transaction(PostgresUse.COMMON_WRITE, '', async (client) => { const deletePersonMessage = await hub.db.deletePerson(person, client) - await hub.db!.kafkaProducer!.queueMessages({ - kafkaMessages: deletePersonMessage[0], - }) + await hub.db!.kafkaProducer!.queueMessages(deletePersonMessage[0]) }) await delayUntilEventIngested(async () =>
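The updated tests assert on the new argument shape directly; a self-contained sketch of that assertion without the jest/Kafka setup — the builder and topic below are stand-ins for `generateKafkaPersonUpdateMessage` and KAFKA_PERSON:

```ts
type TopicMessage = { topic: string; messages: { value: string; key?: string }[] }

function generatePersonUpdateMessage(personId: string, version: number): TopicMessage {
    return {
        topic: 'clickhouse_person', // illustrative
        messages: [{ key: personId, value: JSON.stringify({ id: personId, version }) }],
    }
}

async function main(): Promise<void> {
    // Manual spy standing in for jest.spyOn(db.kafkaProducer, 'queueMessages').
    const calls: (TopicMessage | TopicMessage[])[] = []
    const producer = { queueMessages: async (msgs: TopicMessage | TopicMessage[]) => void calls.push(msgs) }

    const message = generatePersonUpdateMessage('person-1', 2)
    await producer.queueMessages(message)

    // Equivalent of expect(queueMessages).toHaveBeenLastCalledWith(generateKafkaPersonUpdateMessage(person)).
    console.assert(JSON.stringify(calls[calls.length - 1]) === JSON.stringify(message))
}

void main()
```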