From 9e5d1dc002ae1e2fbb70757b819153892d6feef3 Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 25 Dec 2024 16:26:20 +0000 Subject: [PATCH] Fixes --- plugin-server/src/kafka/producer.ts | 8 ++++++++ plugin-server/tests/worker/ingestion/app-metrics.test.ts | 8 ++++---- plugin-server/tests/worker/vm.test.ts | 6 +++--- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/plugin-server/src/kafka/producer.ts b/plugin-server/src/kafka/producer.ts index 8b56dbb31f565..c6f8f78cdb755 100644 --- a/plugin-server/src/kafka/producer.ts +++ b/plugin-server/src/kafka/producer.ts @@ -29,6 +29,14 @@ import { createRdConnectionConfigFromEnvVars } from './config' export type MessageKey = Exclude +export type TopicMessages = { + topic: string + messages: { + value: string | Buffer + key: MessageKey + }[] +} + export class KafkaProducerWrapper { /** Kafka producer used for syncing Postgres and ClickHouse person data. */ public producer: HighLevelProducer diff --git a/plugin-server/tests/worker/ingestion/app-metrics.test.ts b/plugin-server/tests/worker/ingestion/app-metrics.test.ts index c0d818d795391..3dc574e979390 100644 --- a/plugin-server/tests/worker/ingestion/app-metrics.test.ts +++ b/plugin-server/tests/worker/ingestion/app-metrics.test.ts @@ -257,7 +257,7 @@ describe('AppMetrics()', () => { describe('flush()', () => { it('flushes queued messages', async () => { - const spy = jest.spyOn(kafkaProducer, 'queueMessage') + const spy = jest.spyOn(kafkaProducer, 'queueMessages') await appMetrics.queueMetric({ ...metric, jobId: '000-000', successes: 1 }, timestamp) await appMetrics.flush() @@ -268,7 +268,7 @@ describe('AppMetrics()', () => { it('does nothing if nothing queued', async () => { await appMetrics.flush() - expect(kafkaProducer.queueMessage).not.toHaveBeenCalled() + expect(kafkaProducer.queueMessages).not.toHaveBeenCalled() }) }) @@ -281,7 +281,7 @@ describe('AppMetrics()', () => { APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5, }) // doesn't flush again on the next call, i.e. flust metrics were reset - jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve()) + jest.spyOn(hub.kafkaProducer, 'queueMessages').mockReturnValue(Promise.resolve()) }) afterEach(async () => { await closeHub(hub) @@ -292,7 +292,7 @@ describe('AppMetrics()', () => { beforeEach(async () => { await resetTestDatabaseClickhouse() - jest.mocked(hub.kafkaProducer.queueMessage).mockRestore() + jest.mocked(hub.kafkaProducer.queueMessages).mockRestore() }) it('can read its own writes', async () => { diff --git a/plugin-server/tests/worker/vm.test.ts b/plugin-server/tests/worker/vm.test.ts index a98af0de3e718..975e49a177a7a 100644 --- a/plugin-server/tests/worker/vm.test.ts +++ b/plugin-server/tests/worker/vm.test.ts @@ -961,7 +961,7 @@ describe('vm tests', () => { await resetTestDatabase(indexJs) const vm = await createReadyPluginConfigVm(hub, pluginConfig39, indexJs) - const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessage') + const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessages') const response = await vm.tasks.schedule.runEveryMinute.exec() @@ -990,7 +990,7 @@ describe('vm tests', () => { await resetTestDatabase(indexJs) const vm = await createReadyPluginConfigVm(hub, pluginConfig39, indexJs) - const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessage') + const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessages') const response = await vm.tasks.schedule.runEveryMinute.exec() @@ -1016,7 +1016,7 @@ describe('vm tests', () => { await resetTestDatabase(indexJs) const vm = await createReadyPluginConfigVm(hub, pluginConfig39, indexJs) - const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessage') + const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessages') const response = await vm.tasks.schedule.runEveryMinute.exec()