Skip to content

Commit

Permalink
Merge pull request #1188 from jembi/PLAT-707-kafka-integration
Browse files Browse the repository at this point in the history
PLAT-707 Add kafka integration
  • Loading branch information
nour-borgi authored May 23, 2023
2 parents b68dce2 + fe1df1e commit ee71267
Show file tree
Hide file tree
Showing 18 changed files with 861 additions and 334 deletions.
4 changes: 3 additions & 1 deletion config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ The following config option are provided by the OpenHIM. All of these options ha
// forward them to other services
"httpsPort": 5000,
// The timeout for requests that the OpenHIM makes to other services (in milliseconds)
"timeout": 60000
"timeout": 60000,
// The comma-separated list of Kafka broker connection strings
"kafkaBrokers": "localhost:9092,localhost:9093"
},
"api": {
// The session secret key used for the hashing of signed cookies (used to detect if the client modified the cookie)
Expand Down
3 changes: 2 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
"externalHostname": "localhost",
"httpPort": 5001,
"httpsPort": 5000,
"timeout": 60000
"timeout": 60000,
"kafkaBrokers": "localhost:9092"
},
"bodyCull":{
"enabled":true,
Expand Down
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"glossy": "0.1.7",
"handlebars": "^4.7.7",
"jsonwebtoken": "^8.5.1",
"kafkajs": "^2.2.4",
"kcors": "2.2.2",
"koa": "^2.13.0",
"koa-bodyparser": "^4.3.0",
Expand Down
4 changes: 2 additions & 2 deletions src/api/channels.js
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ export async function updateChannel(ctx, channelId) {
}
}

function processPostDeleteTriggers(channel) {
async function processPostDeleteTriggers(channel) {
if (channel.type) {
if (
(channel.type === 'tcp' || channel.type === 'tls') &&
Expand Down Expand Up @@ -536,7 +536,7 @@ export async function removeChannel(ctx, channelId) {
ctx.body = 'The channel was successfully deleted'
logger.info(`User ${ctx.authenticated.email} removed channel with id ${id}`)

return processPostDeleteTriggers(channel)
return await processPostDeleteTriggers(channel)
} catch (err) {
// Error! So inform the user
utils.logAndSetResponse(
Expand Down
79 changes: 79 additions & 0 deletions src/kafkaProducer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import logger from 'winston'
import {Kafka, logLevel} from 'kafkajs'
import {config} from './config'

config.router = config.get('router')

// Customize Kafka logs
function kafkaLogger() {
const toWinstonLogLevel = level => {
switch (level) {
case logLevel.ERROR:
case logLevel.NOTHING:
return 'error'
case logLevel.WARN:
return 'warn'
case logLevel.INFO:
return 'info'
case logLevel.DEBUG:
return 'debug'
}
}
return ({level, log}) => {
const {message, ...extra} = log
logger[toWinstonLogLevel(level)]({
message,
extra
})
}
}

export class KafkaProducer {
_producer = null
_isConnected = false

constructor(clientId, timeout) {
if (clientId) {
let brokers = config.router.kafkaBrokers
brokers = brokers.replace(/"/g, '').split(',')

const kafka = new Kafka({
brokers: brokers,
clientId: clientId,
requestTimeout: timeout,
connectionTimeout: timeout,
logLevel: logLevel.DEBUG,
logCreator: kafkaLogger
})

this._producer = kafka.producer()

this._producer.on(this._producer.events.DISCONNECT, () => {
this._isConnected = false
})
}
}

get isConnected() {
return this._isConnected
}

get producer() {
return this._producer
}

async connect() {
// Not catching the error to throw the original error message
await this._producer.connect()
this._isConnected = true
}

async disconnect() {
try {
await this._producer.disconnect()
this._isConnected = false
} catch (err) {
logger.error(err.message)
}
}
}
39 changes: 39 additions & 0 deletions src/kafkaProducerManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import {KafkaProducer} from './kafkaProducer.js'

export class KafkaProducerManager {
static kafkaSet = {}

static async getProducer(channelName, clientId, timeout) {
const kafkaInstance = this.findOrAddConnection(
channelName,
clientId,
timeout
)
if (!kafkaInstance.isConnected) await kafkaInstance.connect()

return kafkaInstance.producer
}

static findOrAddConnection(channelName, clientId, timeout) {
let kafkaInstance = this.getKafkaInstance(channelName, clientId, timeout)
if (!kafkaInstance) {
kafkaInstance = new KafkaProducer(clientId, timeout)
this.kafkaSet[`urn:${channelName}:${clientId}:${timeout}`] = kafkaInstance
}

return kafkaInstance
}

static async removeConnection(channelName, clientId, timeout) {
const kafkaInstance = this.getKafkaInstance(channelName, clientId, timeout)

if (kafkaInstance) {
if (kafkaInstance.isConnected) await kafkaInstance.disconnect()
delete this.kafkaSet[`urn:${channelName}:${clientId}:${timeout}`]
}
}

static getKafkaInstance(channelName, clientId, timeout) {
return this.kafkaSet[`urn:${channelName}:${clientId}:${timeout}`]
}
}
Loading

0 comments on commit ee71267

Please sign in to comment.