Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Dec 25, 2024
1 parent 2c62156 commit 9e5d1dc
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
8 changes: 8 additions & 0 deletions plugin-server/src/kafka/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import { createRdConnectionConfigFromEnvVars } from './config'

export type MessageKey = Exclude<RdKafkaMessageKey, undefined>

export type TopicMessages = {
topic: string
messages: {
value: string | Buffer
key: MessageKey
}[]
}

export class KafkaProducerWrapper {
/** Kafka producer used for syncing Postgres and ClickHouse person data. */
public producer: HighLevelProducer
Expand Down
8 changes: 4 additions & 4 deletions plugin-server/tests/worker/ingestion/app-metrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ describe('AppMetrics()', () => {

describe('flush()', () => {
it('flushes queued messages', async () => {
const spy = jest.spyOn(kafkaProducer, 'queueMessage')
const spy = jest.spyOn(kafkaProducer, 'queueMessages')

await appMetrics.queueMetric({ ...metric, jobId: '000-000', successes: 1 }, timestamp)
await appMetrics.flush()
Expand All @@ -268,7 +268,7 @@ describe('AppMetrics()', () => {
it('does nothing if nothing queued', async () => {
await appMetrics.flush()

expect(kafkaProducer.queueMessage).not.toHaveBeenCalled()
expect(kafkaProducer.queueMessages).not.toHaveBeenCalled()
})
})

Expand All @@ -281,7 +281,7 @@ describe('AppMetrics()', () => {
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5,
})
// doesn't flush again on the next call, i.e. flust metrics were reset
jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
jest.spyOn(hub.kafkaProducer, 'queueMessages').mockReturnValue(Promise.resolve())
})
afterEach(async () => {
await closeHub(hub)
Expand All @@ -292,7 +292,7 @@ describe('AppMetrics()', () => {

beforeEach(async () => {
await resetTestDatabaseClickhouse()
jest.mocked(hub.kafkaProducer.queueMessage).mockRestore()
jest.mocked(hub.kafkaProducer.queueMessages).mockRestore()
})

it('can read its own writes', async () => {
Expand Down
6 changes: 3 additions & 3 deletions plugin-server/tests/worker/vm.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ describe('vm tests', () => {
await resetTestDatabase(indexJs)
const vm = await createReadyPluginConfigVm(hub, pluginConfig39, indexJs)

const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessage')
const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessages')

const response = await vm.tasks.schedule.runEveryMinute.exec()

Expand Down Expand Up @@ -990,7 +990,7 @@ describe('vm tests', () => {
await resetTestDatabase(indexJs)
const vm = await createReadyPluginConfigVm(hub, pluginConfig39, indexJs)

const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessage')
const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessages')

const response = await vm.tasks.schedule.runEveryMinute.exec()

Expand All @@ -1016,7 +1016,7 @@ describe('vm tests', () => {
await resetTestDatabase(indexJs)
const vm = await createReadyPluginConfigVm(hub, pluginConfig39, indexJs)

const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessage')
const queueMessageSpy = jest.spyOn(hub.kafkaProducer, 'queueMessages')

const response = await vm.tasks.schedule.runEveryMinute.exec()

Expand Down

0 comments on commit 9e5d1dc

Please sign in to comment.