Skip to content

Commit

Permalink
Fix plugin server tests
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Dec 20, 2024
1 parent e6e4730 commit 770c3af
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 8 deletions.
12 changes: 12 additions & 0 deletions plugin-server/src/kafka/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ export const ensureTopicExists = async (adminClient: IAdminClient, topic: string
)
}

export const deleteTopic = async (adminClient: IAdminClient, topic: string, timeout: number) => {
return await new Promise<boolean>((resolve) =>
adminClient.deleteTopic(topic, timeout, (error: LibrdKafkaError) => {
if (error) {
resolve(false)
} else {
resolve(true)
}
})
)
}

export const createAdminClient = (connectionConfig: GlobalConfig) => {
return AdminClient.create(connectionConfig)
}
7 changes: 5 additions & 2 deletions plugin-server/tests/cdp/cdp-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ jest.mock('../../src/utils/fetch', () => {

const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch

const TOTAL_TIMEOUT = 20000
const KAFKA_OBSERVE_TIMEOUT = 10000

describe('CDP E2E', () => {
jest.setTimeout(10000)
jest.setTimeout(TOTAL_TIMEOUT)
describe.each(['kafka', 'cyclotron'])('e2e fetch call: %s', (mode) => {
let processedEventsConsumer: CdpProcessedEventsConsumer
let functionProcessor: CdpFunctionCallbackConsumer
Expand Down Expand Up @@ -124,7 +127,7 @@ describe('CDP E2E', () => {

await waitForExpect(() => {
expect(kafkaObserver.messages).toHaveLength(7)
}, 5000)
}, KAFKA_OBSERVE_TIMEOUT)

expect(mockFetch).toHaveBeenCalledTimes(1)

Expand Down
23 changes: 17 additions & 6 deletions plugin-server/tests/cdp/helpers/kafka-observer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { KafkaConsumer, Message } from 'node-rdkafka'

import { createAdminClient, ensureTopicExists } from '../../../src/kafka/admin'
import { createAdminClient, deleteTopic, ensureTopicExists } from '../../../src/kafka/admin'
import { createRdConnectionConfigFromEnvVars } from '../../../src/kafka/config'
import { createKafkaConsumer } from '../../../src/kafka/consumer'
import { Hub } from '../../../src/types'
Expand All @@ -17,12 +17,20 @@ export type TestKafkaObserver = {
}

export const createKafkaObserver = async (hub: Hub, topics: string[]): Promise<TestKafkaObserver> => {
const consumer = await createKafkaConsumer({
...createRdConnectionConfigFromEnvVars(hub),
'group.id': `test-group-${new UUIDT().toString()}`,
})
const groupId = `test-group-${new UUIDT().toString()}`
const consumer = await createKafkaConsumer(
{
...createRdConnectionConfigFromEnvVars(hub),
'group.id': groupId,
'queued.min.messages': 1,
},
{
'auto.offset.reset': 'earliest',
}
)

const adminClient = createAdminClient(createRdConnectionConfigFromEnvVars(hub))
await Promise.all(topics.map((topic) => deleteTopic(adminClient, topic, 1000)))
await Promise.all(topics.map((topic) => ensureTopicExists(adminClient, topic, 1000)))
adminClient.disconnect()

Expand All @@ -40,7 +48,10 @@ export const createKafkaObserver = async (hub: Hub, topics: string[]): Promise<T
}
const newMessages = await new Promise<Message[]>((res, rej) =>
consumer.consume(10, (err, messages) => (err ? rej(err) : res(messages)))
)
).catch((err) => {
console.log('Error consuming messages', { err })
return []
})

messages.push(
...newMessages.map((message) => ({
Expand Down

0 comments on commit 770c3af

Please sign in to comment.