diff --git a/plugin-server/functional_tests/api.ts b/plugin-server/functional_tests/api.ts index abbd770d7bb77d..c6ff46bf5bf6d5 100644 --- a/plugin-server/functional_tests/api.ts +++ b/plugin-server/functional_tests/api.ts @@ -106,6 +106,7 @@ export const capture = async ({ }) ), key: teamId ? teamId.toString() : '', + waitForAck: true, }) } diff --git a/plugin-server/functional_tests/jobs-consumer.test.ts b/plugin-server/functional_tests/jobs-consumer.test.ts index 30e2abd9af282d..353bd3518397ea 100644 --- a/plugin-server/functional_tests/jobs-consumer.test.ts +++ b/plugin-server/functional_tests/jobs-consumer.test.ts @@ -43,7 +43,7 @@ describe('dlq handling', () => { test.concurrent(`handles empty messages`, async () => { const key = uuidv4() - await produce({ topic: 'jobs', message: null, key }) + await produce({ topic: 'jobs', message: null, key, waitForAck: true }) await waitForExpect(() => { const messages = dlq.filter((message) => message.key?.toString() === key) @@ -54,7 +54,7 @@ describe('dlq handling', () => { test.concurrent(`handles invalid JSON`, async () => { const key = uuidv4() - await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key }) + await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key, waitForAck: true }) await waitForExpect(() => { const messages = dlq.filter((message) => message.key?.toString() === key) @@ -72,7 +72,7 @@ describe('dlq handling', () => { labels: { topic: 'jobs', partition: '0', groupId: 'jobs-inserter' }, }) - await produce({ topic: 'jobs', message: Buffer.from(''), key: '' }) + await produce({ topic: 'jobs', message: Buffer.from(''), key: '', waitForAck: true }) await waitForExpect(async () => { const metricAfter = await getMetric({ diff --git a/plugin-server/functional_tests/kafka.ts b/plugin-server/functional_tests/kafka.ts index c2ab7ac87a6ab0..f431488b290ac4 100644 --- a/plugin-server/functional_tests/kafka.ts +++ b/plugin-server/functional_tests/kafka.ts @@ -36,7 +36,17 @@ export async function createKafkaProducer() { return producer } -export async function produce({ topic, message, key }: { topic: string; message: Buffer | null; key: string }) { +export async function produce({ + topic, + message, + key, + waitForAck, +}: { + topic: string + message: Buffer | null + key: string + waitForAck: boolean +}) { producer = producer ?? (await createKafkaProducer()) - await defaultProduce({ producer, topic, value: message, key: Buffer.from(key) }) + await defaultProduce({ producer, topic, value: message, key: Buffer.from(key), waitForAck }) } diff --git a/plugin-server/functional_tests/scheduled-tasks-runner.test.ts b/plugin-server/functional_tests/scheduled-tasks-runner.test.ts index 3e3345245a6446..48764ae7f90a70 100644 --- a/plugin-server/functional_tests/scheduled-tasks-runner.test.ts +++ b/plugin-server/functional_tests/scheduled-tasks-runner.test.ts @@ -43,7 +43,7 @@ describe('dlq handling', () => { test.concurrent(`handles empty messages`, async () => { const key = uuidv4() - await produce({ topic: 'scheduled_tasks', message: null, key }) + await produce({ topic: 'scheduled_tasks', message: null, key, waitForAck: true }) await waitForExpect(() => { const messages = dlq.filter((message) => message.key?.toString() === key) @@ -54,7 +54,7 @@ describe('dlq handling', () => { test.concurrent(`handles invalid JSON`, async () => { const key = uuidv4() - await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key }) + await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key, waitForAck: true }) await waitForExpect(() => { const messages = dlq.filter((message) => message.key?.toString() === key) @@ -69,6 +69,7 @@ describe('dlq handling', () => { topic: 'scheduled_tasks', message: Buffer.from(JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 })), key, + waitForAck: true, }) await waitForExpect(() => { @@ -84,6 +85,7 @@ describe('dlq handling', () => { topic: 'scheduled_tasks', message: Buffer.from(JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' })), key, + waitForAck: true, }) await waitForExpect(() => { @@ -104,7 +106,7 @@ describe('dlq handling', () => { // NOTE: we don't actually care too much about the contents of the // message, just that it triggeres the consumer to try to process it. - await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '' }) + await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '', waitForAck: true }) await waitForExpect(async () => { const metricAfter = await getMetric({ diff --git a/plugin-server/functional_tests/session-recordings.test.ts b/plugin-server/functional_tests/session-recordings.test.ts index 62075bc6bd10f7..783fbdbeb43cd6 100644 --- a/plugin-server/functional_tests/session-recordings.test.ts +++ b/plugin-server/functional_tests/session-recordings.test.ts @@ -173,7 +173,12 @@ test.skip('consumer updates timestamp exported to prometheus', async () => { }, }) - await produce({ topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, message: Buffer.from(''), key: '' }) + await produce({ + topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + message: Buffer.from(''), + key: '', + waitForAck: true, + }) await waitForExpect(async () => { const metricAfter = await getMetric({ @@ -245,6 +250,7 @@ test.skip(`handles message with no token or with token and no associated team_id topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, message: Buffer.from(JSON.stringify({ uuid: noTokenUuid, data: JSON.stringify({}) })), key: noTokenKey, + waitForAck: true, }) await produce({ topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, @@ -252,6 +258,7 @@ test.skip(`handles message with no token or with token and no associated team_id JSON.stringify({ uuid: noAssociatedTeamUuid, token: 'no associated team', data: JSON.stringify({}) }) ), key: noAssociatedTeamKey, + waitForAck: true, }) await capture(makeSessionMessage(teamId, 'should be ingested')) diff --git a/plugin-server/src/kafka/producer.ts b/plugin-server/src/kafka/producer.ts index 7029a26c79fbdf..a5ed4e072cb55b 100644 --- a/plugin-server/src/kafka/producer.ts +++ b/plugin-server/src/kafka/producer.ts @@ -71,14 +71,14 @@ export const produce = async ({ value, key, headers = [], - waitForAck = true, + waitForAck, }: { producer: RdKafkaProducer topic: string value: MessageValue key: MessageKey headers?: MessageHeader[] - waitForAck?: boolean + waitForAck: boolean }): Promise => { status.debug('📤', 'Producing message', { topic: topic }) const produceSpan = getSpan()?.startChild({ op: 'kafka_produce' }) diff --git a/plugin-server/src/main/graphile-worker/schedule.ts b/plugin-server/src/main/graphile-worker/schedule.ts index d50c672cea4282..16435d02c0466f 100644 --- a/plugin-server/src/main/graphile-worker/schedule.ts +++ b/plugin-server/src/main/graphile-worker/schedule.ts @@ -56,8 +56,11 @@ export async function runScheduledTasks( for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) { status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId }) await server.kafkaProducer.queueMessage({ - topic: KAFKA_SCHEDULED_TASKS, - messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }], + }, + waitForAck: true, }) graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc() } diff --git a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts index 94549340da4fe4..605a812068c516 100644 --- a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts @@ -54,8 +54,11 @@ export const startJobsConsumer = async ({ }) // TODO: handle resolving offsets asynchronously await producer.queueMessage({ - topic: KAFKA_JOBS_DLQ, - messages: [{ value: message.value, key: message.key }], + kafkaMessage: { + topic: KAFKA_JOBS_DLQ, + messages: [{ value: message.value, key: message.key }], + }, + waitForAck: true, }) resolveOffset(message.offset) continue @@ -71,8 +74,11 @@ export const startJobsConsumer = async ({ }) // TODO: handle resolving offsets asynchronously await producer.queueMessage({ - topic: KAFKA_JOBS_DLQ, - messages: [{ value: message.value, key: message.key }], + kafkaMessage: { + topic: KAFKA_JOBS_DLQ, + messages: [{ value: message.value, key: message.key }], + }, + waitForAck: true, }) resolveOffset(message.offset) continue diff --git a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts index 3de544ce2d0a44..83ea62fdfdd6fe 100644 --- a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts @@ -163,8 +163,11 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) = value: message.value, }) await producer.queueMessage({ - topic: KAFKA_SCHEDULED_TASKS_DLQ, - messages: [{ value: message.value, key: message.key }], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], + }, + waitForAck: true, }) continue } @@ -181,8 +184,11 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) = error: error.stack ?? error, }) await producer.queueMessage({ - topic: KAFKA_SCHEDULED_TASKS_DLQ, - messages: [{ value: message.value, key: message.key }], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], + }, + waitForAck: true, }) continue } @@ -190,8 +196,11 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) = if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) { status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task) await producer.queueMessage({ - topic: KAFKA_SCHEDULED_TASKS_DLQ, - messages: [{ value: message.value, key: message.key }], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], + }, + waitForAck: true, }) continue } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts index 5729da5cb373e7..1c581451e44eca 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts @@ -163,6 +163,7 @@ export class ConsoleLogsIngester { topic: KAFKA_LOG_ENTRIES, value: Buffer.from(JSON.stringify(cle)), key: event.session_id, + waitForAck: true, }) ) } catch (error) { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts index 632f695a158f58..029f28f20bb9aa 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts @@ -171,6 +171,7 @@ export class ReplayEventsIngester { topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, value: Buffer.from(JSON.stringify(replayRecord)), key: event.session_id, + waitForAck: true, }), ] } catch (error) { diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index 2baa10671a91e5..c7b6ce86a895a3 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -707,7 +707,7 @@ export class DB { }) } - await this.kafkaProducer.queueMessages(kafkaMessages) + await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true }) return person } @@ -759,7 +759,7 @@ export class DB { if (tx) { kafkaMessages.push(message) } else { - await this.kafkaProducer.queueMessage(message) + await this.kafkaProducer.queueMessage({ kafkaMessage: message, waitForAck: true }) } status.debug( @@ -829,7 +829,7 @@ export class DB { public async addDistinctId(person: Person, distinctId: string): Promise { const kafkaMessages = await this.addDistinctIdPooled(person, distinctId) if (kafkaMessages.length) { - await this.kafkaProducer.queueMessages(kafkaMessages) + await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true }) } } @@ -1072,15 +1072,15 @@ export class DB { pluginLogEntryCounter.labels({ plugin_id: String(pluginConfig.plugin_id), source }).inc() try { - await this.kafkaProducer.queueSingleJsonMessage( - KAFKA_PLUGIN_LOG_ENTRIES, - parsedEntry.id, - parsedEntry, + await this.kafkaProducer.queueSingleJsonMessage({ + topic: KAFKA_PLUGIN_LOG_ENTRIES, + key: parsedEntry.id, + object: parsedEntry, // For logs, we relax our durability requirements a little and // do not wait for acks that Kafka has persisted the message to // disk. - false - ) + waitForAck: false, + }) } catch (e) { captureException(e, { tags: { team_id: entry.pluginConfig.team_id } }) console.error('Failed to produce message', e, parsedEntry) @@ -1409,19 +1409,22 @@ export class DB { version: number ): Promise { await this.kafkaProducer.queueMessage({ - topic: KAFKA_GROUPS, - messages: [ - { - value: JSON.stringify({ - group_type_index: groupTypeIndex, - group_key: groupKey, - team_id: teamId, - group_properties: JSON.stringify(properties), - created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision), - version, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_GROUPS, + messages: [ + { + value: JSON.stringify({ + group_type_index: groupTypeIndex, + group_key: groupKey, + team_id: teamId, + group_properties: JSON.stringify(properties), + created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision), + version, + }), + }, + ], + }, + waitForAck: true, }) } diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index 0a50533a1dbdb5..098a44e7d4aa63 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -159,13 +159,16 @@ export async function createHub( // chained, and if we do not manage to produce then the chain will be // broken. await kafkaProducer.queueMessage({ - topic: KAFKA_JOBS, - messages: [ - { - value: Buffer.from(JSON.stringify(job)), - key: Buffer.from(job.pluginConfigTeam.toString()), - }, - ], + kafkaMessage: { + topic: KAFKA_JOBS, + messages: [ + { + value: Buffer.from(JSON.stringify(job)), + key: Buffer.from(job.pluginConfigTeam.toString()), + }, + ], + }, + waitForAck: true, }) } diff --git a/plugin-server/src/utils/db/kafka-producer-wrapper.ts b/plugin-server/src/utils/db/kafka-producer-wrapper.ts index 8f7cef4c06b30d..0ea1e01c5099f0 100644 --- a/plugin-server/src/utils/db/kafka-producer-wrapper.ts +++ b/plugin-server/src/utils/db/kafka-producer-wrapper.ts @@ -35,7 +35,7 @@ export class KafkaProducerWrapper { key: MessageKey topic: string headers?: MessageHeader[] - waitForAck?: boolean + waitForAck: boolean }): Promise { try { kafkaProducerMessagesQueuedCounter.labels({ topic_name: topic }).inc() @@ -66,7 +66,7 @@ export class KafkaProducerWrapper { } } - async queueMessage(kafkaMessage: ProducerRecord, waitForAck?: boolean) { + async queueMessage({ kafkaMessage, waitForAck }: { kafkaMessage: ProducerRecord; waitForAck: boolean }) { return await Promise.all( kafkaMessage.messages.map((message) => this.produce({ @@ -80,23 +80,34 @@ export class KafkaProducerWrapper { ) } - async queueMessages(kafkaMessages: ProducerRecord[], waitForAck?: boolean): Promise { - await Promise.all(kafkaMessages.map((message) => this.queueMessage(message, waitForAck))) + async queueMessages({ + kafkaMessages, + waitForAck, + }: { + kafkaMessages: ProducerRecord[] + waitForAck: boolean + }): Promise { + await Promise.all(kafkaMessages.map((kafkaMessage) => this.queueMessage({ kafkaMessage, waitForAck }))) } - async queueSingleJsonMessage( - topic: string, - key: Message['key'], - object: Record, - waitForAck?: boolean - ): Promise { - await this.queueMessage( - { + async queueSingleJsonMessage({ + topic, + key, + object, + waitForAck, + }: { + topic: string + key: Message['key'] + object: Record + waitForAck: boolean + }): Promise { + await this.queueMessage({ + kafkaMessage: { topic, messages: [{ key, value: JSON.stringify(object) }], }, - waitForAck - ) + waitForAck, + }) } public async flush() { diff --git a/plugin-server/src/worker/ingestion/app-metrics.ts b/plugin-server/src/worker/ingestion/app-metrics.ts index d8f52a74011503..5e0a83c92ae310 100644 --- a/plugin-server/src/worker/ingestion/app-metrics.ts +++ b/plugin-server/src/worker/ingestion/app-metrics.ts @@ -183,8 +183,11 @@ export class AppMetrics { })) await this.kafkaProducer.queueMessage({ - topic: KAFKA_APP_METRICS, - messages: kafkaMessages, + kafkaMessage: { + topic: KAFKA_APP_METRICS, + messages: kafkaMessages, + }, + waitForAck: true, }) status.debug('🚽', `Finished flushing app metrics, took ${Date.now() - startTime}ms`) } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 9ecc377dc9f76b..6ae2248513073b 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -213,7 +213,7 @@ export class EventPipelineRunner { teamId, `plugin_server_ingest_event:${currentStepName}` ) - await this.hub.db.kafkaProducer!.queueMessage(message) + await this.hub.db.kafkaProducer!.queueMessage({ kafkaMessage: message, waitForAck: true }) } catch (dlqError) { status.info('🔔', `Errored trying to add event to dead letter queue. Error: ${dlqError}`) Sentry.captureException(dlqError, { diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index b2356f36526624..525bbbf84c910f 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -453,7 +453,7 @@ export class PersonState { olderCreatedAt, // Keep the oldest created_at (i.e. the first time we've seen either person) properties ) - await this.db.kafkaProducer.queueMessages(kafkaMessages) + await this.db.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true }) return mergedPerson } @@ -767,7 +767,7 @@ export class DeferredPersonOverrideWorker { // Postgres for some reason -- the same row state should be // generated each call, and the receiving ReplacingMergeTree will // ensure we keep only the latest version after all writes settle.) - await this.kafkaProducer.queueMessages(messages, true) + await this.kafkaProducer.queueMessages({ kafkaMessages: messages, waitForAck: true }) return rows.length } diff --git a/plugin-server/src/worker/ingestion/utils.ts b/plugin-server/src/worker/ingestion/utils.ts index c52ef4ebba78e7..9488ee759581b4 100644 --- a/plugin-server/src/worker/ingestion/utils.ts +++ b/plugin-server/src/worker/ingestion/utils.ts @@ -80,18 +80,21 @@ export async function captureIngestionWarning( const limiter_key = `${teamId}:${type}:${debounce?.key || ''}` if (!!debounce?.alwaysSend || IngestionWarningLimiter.consume(limiter_key, 1)) { await kafkaProducer.queueMessage({ - topic: KAFKA_INGESTION_WARNINGS, - messages: [ - { - value: JSON.stringify({ - team_id: teamId, - type: type, - source: 'plugin-server', - details: JSON.stringify(details), - timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), - }), - }, - ], + kafkaMessage: { + topic: KAFKA_INGESTION_WARNINGS, + messages: [ + { + value: JSON.stringify({ + team_id: teamId, + type: type, + source: 'plugin-server', + details: JSON.stringify(details), + timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), + }), + }, + ], + }, + waitForAck: true, }) } else { return Promise.resolve() diff --git a/plugin-server/src/worker/vm/extensions/posthog.ts b/plugin-server/src/worker/vm/extensions/posthog.ts index c7a0a7124c50d1..34e9cb2befd1c2 100644 --- a/plugin-server/src/worker/vm/extensions/posthog.ts +++ b/plugin-server/src/worker/vm/extensions/posthog.ts @@ -29,22 +29,25 @@ async function queueEvent(hub: Hub, pluginConfig: PluginConfig, data: InternalDa const partitionKey = partitionKeyHash.digest('hex') await hub.kafkaProducer.queueMessage({ - topic: hub.KAFKA_CONSUMPTION_TOPIC!, - messages: [ - { - key: partitionKey, - value: JSON.stringify({ - distinct_id: data.distinct_id, - ip: '', - site_url: '', - data: JSON.stringify(data), - team_id: pluginConfig.team_id, - now: data.timestamp, - sent_at: data.timestamp, - uuid: data.uuid, - } as RawEventMessage), - }, - ], + kafkaMessage: { + topic: hub.KAFKA_CONSUMPTION_TOPIC!, + messages: [ + { + key: partitionKey, + value: JSON.stringify({ + distinct_id: data.distinct_id, + ip: '', + site_url: '', + data: JSON.stringify(data), + team_id: pluginConfig.team_id, + now: data.timestamp, + sent_at: data.timestamp, + uuid: data.uuid, + } as RawEventMessage), + }, + ], + }, + waitForAck: true, }) } diff --git a/plugin-server/tests/main/db.test.ts b/plugin-server/tests/main/db.test.ts index 2adc7567c8a5de..14448f196f9be9 100644 --- a/plugin-server/tests/main/db.test.ts +++ b/plugin-server/tests/main/db.test.ts @@ -367,9 +367,10 @@ describe('DB', () => { expect(updatedPerson.properties).toEqual({ c: 'aaa' }) // verify correct Kafka message was sent - expect(db.kafkaProducer!.queueMessage).toHaveBeenLastCalledWith( - generateKafkaPersonUpdateMessage(updatedPerson) - ) + expect(db.kafkaProducer!.queueMessage).toHaveBeenLastCalledWith({ + kafkaMessage: generateKafkaPersonUpdateMessage(updatedPerson), + waitForAck: true, + }) }) }) @@ -416,7 +417,7 @@ describe('DB', () => { await delayUntilEventIngested(fetchPersonsRows, 2) const kafkaMessages = await db.deletePerson(person) - await db.kafkaProducer.queueMessages(kafkaMessages) + await db.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true }) await db.kafkaProducer.flush() const persons = await delayUntilEventIngested(fetchPersonsRows, 3) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts index 42dfb9e55b5c1c..d5d0a38191a841 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/console-log-ingester.test.ts @@ -80,6 +80,7 @@ describe('console log ingester', () => { timestamp: '1970-01-01 00:00:00.000', }) ), + waitForAck: true, }, ], ]) @@ -124,6 +125,7 @@ describe('console log ingester', () => { timestamp: '1970-01-01 00:00:00.000', }) ), + waitForAck: true, }, ], [ @@ -181,6 +183,7 @@ describe('console log ingester', () => { timestamp: '1970-01-01 00:00:00.000', }) ), + waitForAck: true, }, ], ]) diff --git a/plugin-server/tests/main/jobs/schedule.test.ts b/plugin-server/tests/main/jobs/schedule.test.ts index 150d171f97d3b4..36735d810aa587 100644 --- a/plugin-server/tests/main/jobs/schedule.test.ts +++ b/plugin-server/tests/main/jobs/schedule.test.ts @@ -37,56 +37,68 @@ describe('Graphile Worker schedule', () => { } as any) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(1, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '1', - value: JSON.stringify({ - taskType: 'runEveryMinute', - pluginConfigId: 1, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '1', + value: JSON.stringify({ + taskType: 'runEveryMinute', + pluginConfigId: 1, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(2, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '2', - value: JSON.stringify({ - taskType: 'runEveryMinute', - pluginConfigId: 2, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '2', + value: JSON.stringify({ + taskType: 'runEveryMinute', + pluginConfigId: 2, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(3, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '3', - value: JSON.stringify({ - taskType: 'runEveryMinute', - pluginConfigId: 3, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '3', + value: JSON.stringify({ + taskType: 'runEveryMinute', + pluginConfigId: 3, + }), + }, + ], + }, + waitForAck: true, }) await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryHour', { job: { run_at: new Date() }, } as any) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(4, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '4', - value: JSON.stringify({ - taskType: 'runEveryHour', - pluginConfigId: 4, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '4', + value: JSON.stringify({ + taskType: 'runEveryHour', + pluginConfigId: 4, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(5, { topic: KAFKA_SCHEDULED_TASKS, @@ -101,56 +113,68 @@ describe('Graphile Worker schedule', () => { ], }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(6, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '6', - value: JSON.stringify({ - taskType: 'runEveryHour', - pluginConfigId: 6, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '6', + value: JSON.stringify({ + taskType: 'runEveryHour', + pluginConfigId: 6, + }), + }, + ], + }, + waitForAck: true, }) await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryDay', { job: { run_at: new Date() }, } as any) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(7, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '7', - value: JSON.stringify({ - taskType: 'runEveryDay', - pluginConfigId: 7, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '7', + value: JSON.stringify({ + taskType: 'runEveryDay', + pluginConfigId: 7, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(8, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '8', - value: JSON.stringify({ - taskType: 'runEveryDay', - pluginConfigId: 8, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '8', + value: JSON.stringify({ + taskType: 'runEveryDay', + pluginConfigId: 8, + }), + }, + ], + }, + waitForAck: true, }) expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(9, { - topic: KAFKA_SCHEDULED_TASKS, - messages: [ - { - key: '9', - value: JSON.stringify({ - taskType: 'runEveryDay', - pluginConfigId: 9, - }), - }, - ], + kafkaMessage: { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '9', + value: JSON.stringify({ + taskType: 'runEveryDay', + pluginConfigId: 9, + }), + }, + ], + }, + waitForAck: true, }) }) }) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index 4bfc79f5e23794..364483f7c09a6f 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -219,7 +219,9 @@ describe('EventPipelineRunner', () => { await runner.runEventPipeline(pipelineEvent) expect(hub.db.kafkaProducer.queueMessage).toHaveBeenCalledTimes(1) - expect(JSON.parse(hub.db.kafkaProducer.queueMessage.mock.calls[0][0].messages[0].value)).toMatchObject({ + expect( + JSON.parse(hub.db.kafkaProducer.queueMessage.mock.calls[0][0].kafkaMessage.messages[0].value) + ).toMatchObject({ team_id: 2, distinct_id: 'my_id', error: 'Event ingestion failed. Error: testError', diff --git a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts index 5cdf1246c53f5b..5c764e5809b40a 100644 --- a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts +++ b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts @@ -339,7 +339,7 @@ describe('postgres parity', () => { // move distinct ids from person to to anotherPerson const kafkaMessages = await hub.db.moveDistinctIds(person, anotherPerson) - await hub.db!.kafkaProducer!.queueMessages(kafkaMessages) + await hub.db!.kafkaProducer!.queueMessages({ kafkaMessages, waitForAck: true }) await delayUntilEventIngested(() => hub.db.fetchDistinctIdValues(anotherPerson, Database.ClickHouse), 2) // it got added @@ -395,7 +395,7 @@ describe('postgres parity', () => { // delete person await hub.db.postgres.transaction(PostgresUse.COMMON_WRITE, '', async (client) => { const deletePersonMessage = await hub.db.deletePerson(person, client) - await hub.db!.kafkaProducer!.queueMessage(deletePersonMessage[0]) + await hub.db!.kafkaProducer!.queueMessage({ kafkaMessage: deletePersonMessage[0], waitForAck: true }) }) await delayUntilEventIngested(async () =>