Skip to content

Commit

Permalink
Update kafka consumer and producer logic (#78)
Browse files Browse the repository at this point in the history
* Kafka update

* Kafka fixes

* Consumer fixes

* Fixed

* Cleanup
  • Loading branch information
pmanko authored Oct 14, 2023
1 parent 499a8ee commit b0b978d
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 83 deletions.
1 change: 1 addition & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import logger from './lib/winston'
import { run } from './server/kafkaWorkers'
import MllpAdapter from './server/mllpAdapter'
import { ShrMediator } from './server/shrMediator'
import { LabWorkflowsBw } from './workflows/labWorkflowsBw'

if (require.main === module) {
if (config.get('app:port')) {
Expand Down
29 changes: 0 additions & 29 deletions src/lib/kafka.ts

This file was deleted.

64 changes: 64 additions & 0 deletions src/lib/kafkaConsumerUtil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { Kafka, Consumer, EachBatchPayload, Transaction, Message, KafkaConfig, EachMessagePayload } from 'kafkajs';
import logger from './winston';

export type EachMessageCallback = (topic: string, partition: number, message: Message) => Promise<void>;

export class KafkaConsumerUtil {
private consumer: Consumer | null = null;

constructor(private config: KafkaConfig, private topics: string[], private groupId: string) {}

public async init(): Promise<void> {
try {
this.consumer = await this.createConsumer();
} catch (err) {
console.error('Failed to initialize consumer:', err);
throw err;
}
}

private async createConsumer(): Promise<Consumer> {
const kafka = new Kafka(this.config);
const consumer = kafka.consumer({ groupId: this.groupId });
await consumer.connect();
for (const topic of this.topics) {
await consumer.subscribe({ topic, fromBeginning: false });
}
return consumer;
}

public async consumeTransactionally(eachMessageCallback: EachMessageCallback): Promise<void> {
if (!this.consumer) {
throw new Error('Consumer is not initialized.');
}

await this.consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }: EachBatchPayload) => {
const { topic, partition } = batch;

for (const message of batch.messages) {
if (!isRunning() || isStale()) return;

logger.info({
topic,
partition,
offset: message.offset,
value: message.value?.toString(),
});

await eachMessageCallback(topic, partition, message)

resolveOffset(message.offset);
await heartbeat();
}
},
});
}

public async shutdown(): Promise<void> {
if (this.consumer) {
await this.consumer.disconnect();
}
}
}
84 changes: 84 additions & 0 deletions src/lib/kafkaProducerUtil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { Kafka, KafkaConfig, Producer, ProducerConfig, ProducerRecord, Transaction } from 'kafkajs';
import config from './config'
import logger from './winston';

type DeliveryReportCallback = (report: any) => void;

/**
* KafkaUtil class provides utility functions to interact with Kafka producer.
*/
export class KafkaProducerUtil {
private producer: Producer | null = null;

/**
* Creates an instance of KafkaUtil.
* @param {KafkaConfig} config - Configuration object for Kafka producer.
* @param {DeliveryReportCallback} onDeliveryReport - Callback function to handle delivery reports.
*/
constructor(private config: KafkaConfig, private onDeliveryReport: DeliveryReportCallback) {}

/**
* Initializes Kafka producer.
* @returns {Promise<void>} Promise that resolves when producer is initialized.
* @throws {Error} If producer initialization fails.
*/
public async init(): Promise<void> {
try {
this.producer = await this.createProducer();
} catch (err) {
console.error('Failed to initialize producer:', err);
throw err;
}
}

/**
* Creates Kafka producer.
* @returns {Promise<Producer>} Promise that resolves with Kafka producer instance.
*/
private async createProducer(): Promise<Producer> {
logger.info('Creating Kafka producer...');
const kafka = new Kafka(this.config);
const producer = kafka.producer({transactionalId: 'shr-producer-transaction', idempotent: true, maxInFlightRequests: 1});
await producer.connect();
return producer;
}

/**
* Sends message using transaction.
* @param {ProducerRecord[]} records - Array of producer records to send.
* @returns {Promise<void>} Promise that resolves when message is sent transactionally.
* @throws {Error} If producer is not initialized or transaction fails.
*/
public async sendMessageTransactionally(records: ProducerRecord[]): Promise<void> {
if (!this.producer) {
logger.error('Producer is not initialized.')
throw new Error('Producer is not initialized.');
}

const transaction: Transaction = await this.producer.transaction();
try {
logger.info('Sending the following records transactionally:');
logger.info(JSON.stringify(records, null, 2));
for (const record of records) {
await transaction.send(record);
}
await transaction.commit();
this.onDeliveryReport({ status: 'committed' });
} catch (err) {
await transaction.abort();
this.onDeliveryReport({ status: 'aborted' });
throw err;
}
}

/**
* Gracefully shuts down Kafka producer.
* @returns {Promise<void>} Promise that resolves when producer is disconnected.
*/
public async shutdown(): Promise<void> {
logger.info('Shutting down Kafka producer...');
if (this.producer) {
await this.producer.disconnect();
}
}
}
93 changes: 51 additions & 42 deletions src/server/kafkaWorkers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
import { Consumer } from 'kafkajs'
import { consumer } from '../lib/kafka'
import { KafkaConfig, logLevel, Message } from 'kafkajs'
import logger from '../lib/winston'
import { LabWorkflowsBw, topicList } from '../workflows/labWorkflowsBw'
import { config } from '../lib/config'
import { KafkaConsumerUtil } from '../lib/kafkaConsumerUtil'

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps: NodeJS.Signals[] = ['SIGTERM', 'SIGINT', 'SIGUSR2']
const brokers = config.get('taskRunner:brokers') || ['kafka:9092']

let consumer: KafkaConsumerUtil | null = null;

const consumerConfig: KafkaConfig = {
clientId: 'shr-consumer',
brokers: brokers,
logLevel: config.get('taskRunner:logLevel') || logLevel.ERROR
};



/**
* Example Botswana Workflow: (synchronous for now)
Expand All @@ -18,49 +31,14 @@ const signalTraps: NodeJS.Signals[] = ['SIGTERM', 'SIGINT', 'SIGUSR2']
*/

export async function run() {
const k: Consumer = consumer

await k.connect()

for (const val of Object.values(topicList)) {
await k.subscribe({ topic: val, fromBeginning: false })
}

await k.run({
eachMessage: async function ({ topic, partition, message }) {
logger.info(`Recieved message from topic ${topic}`)

try {
let val = ''
const res = null

if (message.value) {
val = message.value.toString()
}

// logger.info("\n\n#########\nReceived: ", {
// partition,
// offset: message.offset,
// value: val
// });

LabWorkflowsBw.executeTopicWorkflow(topic, val)
} catch (error) {
logger.error(`Could not complete task from topic ${topic}!`)

logger.error(error)
}

//logger.info(`\n\n##########\nResult: ${JSON.stringify(res)}\n###############`)
},
})
consumer = await initAndConsume(Object.values(topicList))

errorTypes.map(type => {
process.on(type, async e => {
try {
console.log(`process.on ${type}`)
console.error(e)
await k.disconnect()
logger.error(`process.on ${type}`)
logger.error(e)
await shutdownConsumer()
process.exit(0)
} catch (_) {
process.exit(1)
Expand All @@ -71,10 +49,41 @@ export async function run() {
signalTraps.map(type => {
process.once(type, async () => {
try {
await k.disconnect()
await shutdownConsumer()
} finally {
process.kill(process.pid, type)
}
})
})
}

async function shutdownConsumer() {
if (consumer)
await consumer.shutdown()
}

const initAndConsume = async (topics: string[]) => {
const consumer = new KafkaConsumerUtil(consumerConfig, topics, "shr-consumer-group");
await consumer.init();
consumer.consumeTransactionally(processMessage); // No await here
return consumer;
};

async function processMessage(topic: string, partition: number, message: Message): Promise<void> {
logger.info(`Recieved message from topic ${topic} on partition ${partition}`)

try {
let val = ''
const res = null

if (message.value) {
val = message.value.toString()
}

LabWorkflowsBw.executeTopicWorkflow(topic, val)
} catch (error) {
logger.error(`Could not complete task from topic ${topic}!`)

logger.error(error)
}
}
33 changes: 32 additions & 1 deletion src/server/shrMediator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import shrApp from '../lib/shr'
import logger from '../lib/winston'

import medUtils from 'openhim-mediator-utils'
import { error } from 'console'
import { LabWorkflowsBw } from '../workflows/labWorkflowsBw'


const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps: NodeJS.Signals[] = ['SIGTERM', 'SIGINT', 'SIGUSR2']
const env = process.env.NODE_ENV || 'development'
const appConfig = JSON.parse(
fs.readFileSync(`${__dirname}/../../config/config_${env}.json`, 'utf-8'),
Expand All @@ -20,18 +25,44 @@ export class ShrMediator {
this.config = medConfig
}

public start(callback: any) {
public async start(callback: any) {
logger.info('Running SHR as a mediator with' + `${__dirname}/${this.config}`)
try {
await LabWorkflowsBw.initKafkaProducer()
medUtils.registerMediator(
config.get('mediator:api'),
this.config,
ShrMediator.registrationCallback(callback),
)
} catch (e: any) {
logger.error(`Could not start SHR as a Mediator!\n${JSON.stringify(e)}`)
await LabWorkflowsBw.shutdownKafkaProducer()
process.exit(1)
}

errorTypes.map(type => {
process.on(type, async e => {
try {
logger.error(`process.on ${type}`)
logger.error(e)
await LabWorkflowsBw.shutdownKafkaProducer()
process.exit(0)
} catch (_) {
process.exit(1)
}
})
})

signalTraps.map(type => {
process.once(type, async () => {
try {
await LabWorkflowsBw.shutdownKafkaProducer()
} finally {
process.kill(process.pid, type)
}
})
})

}

private static registrationCallback(callback: any) {
Expand Down
7 changes: 3 additions & 4 deletions src/workflows/hl7WorkflowsBw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { BundleTypeKind, IBundle } from '@ahryman40k/ts-fhir-types/lib/R4'
import got from 'got/dist/source'
import config from '../lib/config'
import logger from '../lib/winston'
import { sendPayload } from '../lib/kafka'
import { topicList } from './labWorkflowsBw'
import { LabWorkflowsBw, topicList } from './labWorkflowsBw'
import sleep from 'sleep-promise'

export default class Hl7WorkflowsBw {
Expand All @@ -31,7 +30,7 @@ export default class Hl7WorkflowsBw {
)

if (translatedBundle != this.errorBundle && translatedBundle.entry) {
sendPayload({ bundle: translatedBundle }, topicList.HANDLE_ORU_FROM_IPMS)
LabWorkflowsBw.sendPayload({ bundle: translatedBundle }, topicList.HANDLE_ORU_FROM_IPMS)
return translatedBundle
} else {
return this.errorBundle
Expand All @@ -53,7 +52,7 @@ export default class Hl7WorkflowsBw {
// Save to SHR??
// let resultBundle: R4.IBundle = await saveBundle(translatedBundle)

sendPayload({ bundle: translatedBundle }, topicList.SAVE_IPMS_PATIENT)
LabWorkflowsBw.sendPayload({ bundle: translatedBundle }, topicList.SAVE_IPMS_PATIENT)

return translatedBundle
} else {
Expand Down
Loading

0 comments on commit b0b978d

Please sign in to comment.