Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Dec 25, 2024
1 parent aaadad3 commit 2c62156
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion plugin-server/src/main/ingestion-queues/jobs-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { EachBatchHandler, Kafka } from 'kafkajs'
import { Counter } from 'prom-client'

import { KAFKA_JOBS, KAFKA_JOBS_DLQ } from '../../config/kafka-topics'
import { KafkaProducerWrapper } from '../../kafka/producer'
import { EnqueuedPluginJob, JobName, PluginsServerConfig } from '../../types'
import { KafkaProducerWrapper } from '../../utils/kafka/producer'
import { status } from '../../utils/status'
import { GraphileWorker } from '../graphile-worker/graphile-worker'
import { instrumentEachBatchKafkaJS, setupEventHandlers } from './kafka-queue'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ describe('eachBatchX', () => {
TASKS_PER_WORKER: 10,
INGESTION_CONCURRENCY: 4,
kafkaProducer: {
queueMessage: jest.fn(),
queueMessages: jest.fn(),
},
pluginConfigsPerTeam: new Map(),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ describe('session-recording utils', () => {
let fakeProducer: KafkaProducerWrapper
beforeEach(() => {
Settings.now = () => new Date('2023-08-30T19:15:54.887316+00:00').getTime()
fakeProducer = { queueMessage: jest.fn() } as unknown as KafkaProducerWrapper
fakeProducer = { queueMessages: jest.fn() } as unknown as KafkaProducerWrapper
})

it('can parse a message correctly', async () => {
Expand Down Expand Up @@ -347,7 +347,7 @@ describe('session-recording utils', () => {
let fakeProducer: KafkaProducerWrapper
beforeEach(() => {
Settings.now = () => new Date('2023-08-30T19:15:54.887316+00:00').getTime()
fakeProducer = { queueMessage: jest.fn() } as unknown as KafkaProducerWrapper
fakeProducer = { queueMessages: jest.fn() } as unknown as KafkaProducerWrapper
})

it('can parse and reduce a batch of messages', async () => {
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/tests/main/jobs/schedule.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('Graphile Worker schedule', () => {
runEveryDay: [7, 8, 9],
},
kafkaProducer: {
queueMessage: jest.fn(),
queueMessages: jest.fn(),
} as unknown as KafkaProducerWrapper,
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
}
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/tests/worker/ingestion/app-metrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('AppMetrics()', () => {
kafkaProducer = {
producer: jest.fn(),
produce: jest.fn(),
queueMessage: jest.fn(),
queueMessages: jest.fn(),
flush: jest.fn(),
disconnect: jest.fn(),
} as unknown as KafkaProducerWrapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ describe('EventPipelineRunner', () => {

beforeEach(() => {
hub = {
kafkaProducer: { queueMessage: jest.fn() },
kafkaProducer: { queueMessages: jest.fn() },
teamManager: {
fetchTeam: jest.fn(() => {}),
},
db: {
kafkaProducer: { queueMessage: jest.fn() },
kafkaProducer: { queueMessages: jest.fn() },
fetchPerson: jest.fn(),
},
eventsToDropByToken: createEventsToDropByToken('drop_token:drop_id,drop_token_all:*'),
Expand Down Expand Up @@ -288,7 +288,7 @@ describe('EventPipelineRunner', () => {

const hub: any = {
db: {
kafkaProducer: { queueMessage: jest.fn() },
kafkaProducer: { queueMessages: jest.fn() },
},
}
const runner = new TestEventPipelineRunner(hub, event, new EventsProcessor(hub))
Expand Down Expand Up @@ -416,7 +416,7 @@ describe('EventPipelineRunner $process_person_profile=false', () => {

const hub: any = {
db: {
kafkaProducer: { queueMessage: jest.fn() },
kafkaProducer: { queueMessages: jest.fn() },
},
}
const runner = new TestEventPipelineRunner(hub, event, new EventsProcessor(hub))
Expand Down

0 comments on commit 2c62156

Please sign in to comment.