Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
nour-borgi committed May 23, 2023
1 parent 38ec400 commit fe1df1e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
8 changes: 3 additions & 5 deletions src/kafkaProducerManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ export class KafkaProducerManager {
timeout
)
if (!kafkaInstance.isConnected) await kafkaInstance.connect()
if (!kafkaInstance.isConnected)
throw new Error('Kafka Producer failed to connect.')

return kafkaInstance.producer
}
Expand All @@ -20,7 +18,7 @@ export class KafkaProducerManager {
let kafkaInstance = this.getKafkaInstance(channelName, clientId, timeout)
if (!kafkaInstance) {
kafkaInstance = new KafkaProducer(clientId, timeout)
this.kafkaSet[`${channelName}${clientId}${timeout}`] = kafkaInstance
this.kafkaSet[`urn:${channelName}:${clientId}:${timeout}`] = kafkaInstance
}

return kafkaInstance
Expand All @@ -31,11 +29,11 @@ export class KafkaProducerManager {

if (kafkaInstance) {
if (kafkaInstance.isConnected) await kafkaInstance.disconnect()
delete this.kafkaSet[`${channelName}${clientId}${timeout}`]
delete this.kafkaSet[`urn:${channelName}:${clientId}:${timeout}`]
}
}

static getKafkaInstance(channelName, clientId, timeout) {
return this.kafkaSet[`${channelName}${clientId}${timeout}`]
return this.kafkaSet[`urn:${channelName}:${clientId}:${timeout}`]
}
}
11 changes: 6 additions & 5 deletions src/model/channels.js
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,14 @@ ChannelSchema.pre('save', async function (next) {
}
])

let originalKafkaItem
// We need to cross reference the original, not-yet modified, routes
// against the incoming dirty routes to see if any were removed and if so remove them from the manager
if (Array.isArray(originalKafkaDetails))
originalKafkaDetails = originalKafkaDetails[0]
if (originalKafkaDetails && originalKafkaDetails.routes.length > 0) {
for (let route of originalKafkaDetails.routes) {
const isTimeoutUpdated = originalKafkaDetails.timeout !== this.timeout
originalKafkaItem = originalKafkaDetails[0]
if (originalKafkaItem && originalKafkaItem.routes.length > 0) {
for (let route of originalKafkaItem.routes) {
const isTimeoutUpdated = originalKafkaItem.timeout !== this.timeout
const matchingRoute = kafkaRoutes.find(
e => e.kafkaClientId === route.kafkaClientId && e.name === route.name
)
Expand All @@ -263,7 +264,7 @@ ChannelSchema.pre('save', async function (next) {
// if timeout is null on the original document, it was set to the default
// so pull that out from the config before trying to remove connections
const originalTimeout =
originalKafkaDetails.timeout ?? +config.router.timeout
originalKafkaItem.timeout ?? +config.router.timeout
await KafkaProducerManager.removeConnection(
this.name,
route.kafkaClientId,
Expand Down

0 comments on commit fe1df1e

Please sign in to comment.