Skip to content

Commit

Permalink
Fix up
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Dec 25, 2024
1 parent 96f469b commit f97321c
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-consumers'
import { HogWatcherState } from '../../src/cdp/hog-watcher'
import { HogFunctionInvocationGlobals, HogFunctionType } from '../../src/cdp/types'
import { KafkaProducerWrapper, MessageKey, TopicMessage } from '../../src/kafka/producer'
import { Hub, Team } from '../../src/types'
import { closeHub, createHub } from '../../src/utils/db/hub'
import { getFirstTeam, resetTestDatabase } from '../helpers/sql'
Expand Down Expand Up @@ -52,32 +53,53 @@ jest.mock('../../src/kafka/producer', () => {
},
disconnect: jest.fn(),
produce: jest.fn(),
queueMessages: jest.fn(),
queueMessages: jest.fn(() => Promise.resolve()),
}

const MockKafkaProducer = {
create: jest.fn(() => Promise.resolve(mockKafkaProducer)),
}
return {
KafkaProducerWrapper: MockKafkaProducer,
_producer: mockKafkaProducer,
}
})

const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch

const mockProducer = await require('../../src/kafka/producer').KafkaProducerWrapper.create()
const mockProducer = require('../../src/kafka/producer')._producer as KafkaProducerWrapper

jest.setTimeout(1000)

const decodeKafkaMessage = (message: any): any => {
return {
...message,
value: JSON.parse(message.value.toString()),
type DecodedKafkaMessage = {
topic: string
key?: MessageKey
value: Record<string, unknown>
}

const decodeKafkaMessage = (topicMessages: TopicMessage | TopicMessage[]): DecodedKafkaMessage[] => {
topicMessages = Array.isArray(topicMessages) ? topicMessages : [topicMessages]

const result: DecodedKafkaMessage[] = []

for (const topicMessage of topicMessages) {
for (const message of topicMessage.messages) {
result.push({
topic: topicMessage.topic,
key: message.key,
value: message.value ? JSON.parse(message.value.toString()) : null,
})
}
}

return result
}

const decodeAllKafkaMessages = (): any[] => {
return mockProducer.produce.mock.calls.map((x) => decodeKafkaMessage(x[0]))
const decodeAllKafkaMessages = (): DecodedKafkaMessage[] => {
return jest
.mocked(mockProducer.queueMessages)
.mock.calls.map((x) => decodeKafkaMessage(x[0]))
.reduce((acc, x) => acc.concat(x), [])
}

describe('CDP Processed Events Consumer', () => {
Expand Down

0 comments on commit f97321c

Please sign in to comment.