Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Dec 25, 2024
1 parent b68bd7b commit 3a13bbe
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 51 deletions.
23 changes: 0 additions & 23 deletions plugin-server/src/kafka/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ export class KafkaProducerWrapper {
topic: record.topic,
key: message.key ? Buffer.from(message.key) : null,
value: message.value ? Buffer.from(message.value) : null,
headers: convertKafkaJSHeadersToRdKafkaHeaders(message.headers),
})
)
)
Expand Down Expand Up @@ -196,28 +195,6 @@ export class KafkaProducerWrapper {
}
}

export const convertKafkaJSHeadersToRdKafkaHeaders = (headers: Message['headers'] = undefined) =>
// We need to convert from KafkaJS headers to rdkafka
// headers format. The former has type
// { [key: string]: string | Buffer | (string |
// Buffer)[] | undefined }
// while the latter has type
// { [key: string]: Buffer }[]. The formers values that
// are arrays need to be converted into an array of
// objects with a single key-value pair, and the
// undefined values need to be filtered out.
headers
? Object.entries(headers)
.flatMap(([key, value]) =>
value === undefined
? []
: Array.isArray(value)
? value.map((v) => ({ key, value: Buffer.from(v) }))
: [{ key, value: Buffer.from(value) }]
)
.map(({ key, value }) => ({ [key]: value }))
: undefined

export const kafkaProducerMessagesQueuedCounter = new Counter({
name: 'kafka_producer_messages_queued_total',
help: 'Count of messages queued to the Kafka producer, by destination topic.',
Expand Down
27 changes: 0 additions & 27 deletions plugin-server/tests/kafka/kafka-producer-wrapper.test.ts

This file was deleted.

2 changes: 1 addition & 1 deletion plugin-server/tests/main/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ describe('DB', () => {
'group_key',
{ prop: 'val' },
TIMESTAMP,
{ prop: TIMESTAMP.toISO() },
{ prop: TIMESTAMP.toISO()! },
{ prop: PropertyUpdateOperation.Set },
1
)
Expand Down

0 comments on commit 3a13bbe

Please sign in to comment.