From 2c62156da87f5feb6be2c2fcc05fee5e509415af Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 25 Dec 2024 16:23:20 +0000 Subject: [PATCH] fixes --- plugin-server/src/main/ingestion-queues/jobs-consumer.ts | 2 +- .../tests/main/ingestion-queues/each-batch.test.ts | 2 +- .../main/ingestion-queues/session-recording/utils.test.ts | 4 ++-- plugin-server/tests/main/jobs/schedule.test.ts | 2 +- plugin-server/tests/worker/ingestion/app-metrics.test.ts | 2 +- .../tests/worker/ingestion/event-pipeline/runner.test.ts | 8 ++++---- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts index ac39df772008e..9539130c5a629 100644 --- a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts @@ -2,8 +2,8 @@ import { EachBatchHandler, Kafka } from 'kafkajs' import { Counter } from 'prom-client' import { KAFKA_JOBS, KAFKA_JOBS_DLQ } from '../../config/kafka-topics' +import { KafkaProducerWrapper } from '../../kafka/producer' import { EnqueuedPluginJob, JobName, PluginsServerConfig } from '../../types' -import { KafkaProducerWrapper } from '../../utils/kafka/producer' import { status } from '../../utils/status' import { GraphileWorker } from '../graphile-worker/graphile-worker' import { instrumentEachBatchKafkaJS, setupEventHandlers } from './kafka-queue' diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index adac7d05fd456..fa269e88f6cc1 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -138,7 +138,7 @@ describe('eachBatchX', () => { TASKS_PER_WORKER: 10, INGESTION_CONCURRENCY: 4, kafkaProducer: { - queueMessage: jest.fn(), + queueMessages: jest.fn(), }, pluginConfigsPerTeam: new Map(), }, diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts index 8917df7dd4779..163c43735c11d 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts @@ -69,7 +69,7 @@ describe('session-recording utils', () => { let fakeProducer: KafkaProducerWrapper beforeEach(() => { Settings.now = () => new Date('2023-08-30T19:15:54.887316+00:00').getTime() - fakeProducer = { queueMessage: jest.fn() } as unknown as KafkaProducerWrapper + fakeProducer = { queueMessages: jest.fn() } as unknown as KafkaProducerWrapper }) it('can parse a message correctly', async () => { @@ -347,7 +347,7 @@ describe('session-recording utils', () => { let fakeProducer: KafkaProducerWrapper beforeEach(() => { Settings.now = () => new Date('2023-08-30T19:15:54.887316+00:00').getTime() - fakeProducer = { queueMessage: jest.fn() } as unknown as KafkaProducerWrapper + fakeProducer = { queueMessages: jest.fn() } as unknown as KafkaProducerWrapper }) it('can parse and reduce a batch of messages', async () => { diff --git a/plugin-server/tests/main/jobs/schedule.test.ts b/plugin-server/tests/main/jobs/schedule.test.ts index 02dc82a0dca08..7c9cef0338d08 100644 --- a/plugin-server/tests/main/jobs/schedule.test.ts +++ b/plugin-server/tests/main/jobs/schedule.test.ts @@ -27,7 +27,7 @@ describe('Graphile Worker schedule', () => { runEveryDay: [7, 8, 9], }, kafkaProducer: { - queueMessage: jest.fn(), + queueMessages: jest.fn(), } as unknown as KafkaProducerWrapper, USE_KAFKA_FOR_SCHEDULED_TASKS: true, } diff --git a/plugin-server/tests/worker/ingestion/app-metrics.test.ts b/plugin-server/tests/worker/ingestion/app-metrics.test.ts index fc39515118e23..c0d818d795391 100644 --- a/plugin-server/tests/worker/ingestion/app-metrics.test.ts +++ b/plugin-server/tests/worker/ingestion/app-metrics.test.ts @@ -26,7 +26,7 @@ describe('AppMetrics()', () => { kafkaProducer = { producer: jest.fn(), produce: jest.fn(), - queueMessage: jest.fn(), + queueMessages: jest.fn(), flush: jest.fn(), disconnect: jest.fn(), } as unknown as KafkaProducerWrapper diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index 6e54ec8f3c5aa..11a1f75e7089e 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -110,12 +110,12 @@ describe('EventPipelineRunner', () => { beforeEach(() => { hub = { - kafkaProducer: { queueMessage: jest.fn() }, + kafkaProducer: { queueMessages: jest.fn() }, teamManager: { fetchTeam: jest.fn(() => {}), }, db: { - kafkaProducer: { queueMessage: jest.fn() }, + kafkaProducer: { queueMessages: jest.fn() }, fetchPerson: jest.fn(), }, eventsToDropByToken: createEventsToDropByToken('drop_token:drop_id,drop_token_all:*'), @@ -288,7 +288,7 @@ describe('EventPipelineRunner', () => { const hub: any = { db: { - kafkaProducer: { queueMessage: jest.fn() }, + kafkaProducer: { queueMessages: jest.fn() }, }, } const runner = new TestEventPipelineRunner(hub, event, new EventsProcessor(hub)) @@ -416,7 +416,7 @@ describe('EventPipelineRunner $process_person_profile=false', () => { const hub: any = { db: { - kafkaProducer: { queueMessage: jest.fn() }, + kafkaProducer: { queueMessages: jest.fn() }, }, } const runner = new TestEventPipelineRunner(hub, event, new EventsProcessor(hub))