Skip to content

Commit

Permalink
Kafka workflow updates (#92)
Browse files Browse the repository at this point in the history
* Kafka workflow updates

* Fixes to workflow
  • Loading branch information
pmanko authored Nov 17, 2023
1 parent e9ef551 commit c446696
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 108 deletions.
2 changes: 1 addition & 1 deletion config/mediator_docker.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"urn": "urn:mediator:shared-health-record",
"version": "v0.12.0-rc.3",
"version": "v0.12.0-rc.5",
"name": "Shared Health Record",
"description": "Shared Health Record",
"defaultChannelConfig": [
Expand Down
3 changes: 1 addition & 2 deletions debug.docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ services:
container_name: shr
restart: unless-stopped
hostname: shr
restart: unless-stopped
image: itechuw/shared-health-record:debug
image: itechuw/shared-health-record:debug-1
build:
context: ./
args:
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "shared-health-registry",
"version": "v0.12.0-rc.3",
"version": "v0.12.0-rc.5",
"description": "Open Implementation of an OpenHIE Shared Health Record",
"main": "app.js",
"scripts": {
Expand Down
9 changes: 8 additions & 1 deletion src/__data__/sample_ADT.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,11 @@ PV2|||||||||||1||||||||||||||RF|||||||||||N|
ROL|1|AD|AT|K^Naicker^Kalvin^^^^^^^^^^XX|
ROL|2|AD|PP|GENPRI00^Doctor^Private^^^^^^^^^^XX|
GT1|1||Test^ADT^Message||PLOT 123^^Gaborone^B^0101|00267891608025|||||SF|OMANG123|
IN1|1|Citizens||Citizens|Botswana Government^Ministry of Health^Gaborone^B^0101|
IN1|1|Citizens||Citizens|Botswana Government^Ministry of Health^Gaborone^B^0101|

MSH|^~\&|ADM||||202311150750||ADT^A04|292304|D|2.4|||AL|NE|
EVN||202311150749|||INFCE^INTERFACE|202311150000|
PID|1||GG00042904^^^^MR^GGC~677827885^^^^SS^GGC~GG42786^^^^PI^GGC~TEST0106399^^^^HUB^GGC||TestPatient201^Testing201^^^^^L||20051017|F||CT|Plot 40095^^Gaborone^B^0101||123456789|||M||ZG0000044670|
PV1|1|O|XEXTE15||||ZZHIEPROV^Provider^HIE^^^^^^^^^^XX|||||||||||POV||U|||||||||||||||||||GGC||REG|||202311150000|
PV2|||||||||||1|||||||||||||||||||||||||N|
ROL|1|AD|AT|ZZHIEPROV^Provider^HIE^^^^^^^^^^XX|
120 changes: 74 additions & 46 deletions src/lib/kafkaConsumerUtil.ts
Original file line number Diff line number Diff line change
@@ -1,83 +1,111 @@
import { Consumer, EachBatchPayload, Kafka, KafkaConfig, Message } from 'kafkajs';
import logger from './winston';
import { Consumer, EachBatchPayload, Kafka, KafkaConfig, Message } from 'kafkajs'
import logger from './winston'
import { WorkflowResult } from '../workflows/botswana/workflowHandler'

export type EachMessageCallback = (topic: string, partition: number, message: Message) => Promise<void>;
export type EachMessageCallback = (
topic: string,
partition: number,
message: Message,
) => Promise<WorkflowResult>

export class KafkaConsumerUtil {
private consumer: Consumer | null = null;
private consumer: Consumer | null = null

constructor(private config: KafkaConfig, private topics: string[], private groupId: string) {}

public async init(): Promise<void> {
try {
this.consumer = await this.createConsumer();
this.consumer = await this.createConsumer()
} catch (err) {
console.error('Failed to initialize consumer:', err);
throw err;
console.error('Failed to initialize consumer:', err)
throw err
}
}

private async createConsumer(): Promise<Consumer> {
const kafka = new Kafka(this.config);
const consumer = kafka.consumer({ groupId: this.groupId });
await consumer.connect();
const kafka = new Kafka(this.config)
const consumer = kafka.consumer({
groupId: this.groupId,
sessionTimeout: 120000, // 2 minutes
heartbeatInterval: 30000, // 30 seconds
})

await consumer.connect()

for (const topic of this.topics) {
await consumer.subscribe({ topic, fromBeginning: false });
await consumer.subscribe({ topic, fromBeginning: false })
}
return consumer;
return consumer
}

public async consumeTransactionally(eachMessageCallback: EachMessageCallback): Promise<void> {
if (!this.consumer) {
throw new Error('Consumer is not initialized.');
throw new Error('Consumer is not initialized.')
}

await this.consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }: EachBatchPayload) => {
const { topic, partition } = batch;

eachBatch: async ({
batch,
resolveOffset,
heartbeat,
isRunning,
isStale,
}: EachBatchPayload) => {
const { topic, partition } = batch

for (const message of batch.messages) {
if (!isRunning() || isStale()) return;

logger.info({
topic,
partition,
offset: message.offset,
value: message.value?.toString(),
});

const maxRetries = 6;
let retryCount = 0;
let retryDelay = 1000;

if (!isRunning() || isStale()) return

logger.info(
`Consumer | Recieved message from topic ${topic} on partition ${partition} with offset ${message.offset}`,
)

const maxRetries = 2
let retryCount = 0
let retryDelay = 2000
let res: WorkflowResult | null = null

while (retryCount < maxRetries) {
logger.info(`Processing message for ${topic} with retry count ${retryCount}...`)
try {
await eachMessageCallback(topic, partition, message);
resolveOffset(message.offset);
await heartbeat();
break; // Break the loop if processing succeeds
} catch (error) {
logger.error(`Error processing message ${message.offset}: ${error}`);
retryCount++;
if (retryCount >= maxRetries) {
logger.error(`Max retries reached for message ${message.offset}, sending to dead letter queue or similar.`);
// TODO: handle with DLQ
break;
res = await eachMessageCallback(topic, partition, message)

if (res.success) {
logger.info(`Workflow result succeeded!`)
resolveOffset(message.offset)
await heartbeat()
break // Break the loop if processing succeeds }
} else {
logger.error(`Workflow result did not succeed: ${res.result}`)
}
await new Promise(resolve => setTimeout(resolve, retryDelay));
retryDelay *= 2; // Double the delay for the next retry
await heartbeat(); // Important to call heartbeat to keep the session alive
} catch (error) {
logger.error(`Error processing message ${message.offset}: ${error}`)
}

// Otherwise, retry, both on error, and if the message is not processed successfully
retryCount++
if (retryCount >= maxRetries) {
logger.error(
`Max retries reached for message ${message.offset}, sending to dead letter queue or similar.`,
)
resolveOffset(message.offset)

// TODO: handle with DLQ
break
}
await new Promise(resolve => setTimeout(resolve, retryDelay))
retryDelay *= 20 // Double the delay for the next retry
await heartbeat() // Important to call heartbeat to keep the session alive
}
}
},
});
})
}

public async shutdown(): Promise<void> {
if (this.consumer) {
await this.consumer.disconnect();
await this.consumer.disconnect()
}
}
}
}
1 change: 1 addition & 0 deletions src/lib/kafkaProducerUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export class KafkaProducerUtil {
await transaction.send(record)
}
await transaction.commit()
logger.info('Message sent transactionally.')
this.onDeliveryReport({ status: 'committed' })
} catch (err) {
await transaction.abort()
Expand Down
41 changes: 21 additions & 20 deletions src/server/kafkaWorkers.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import { KafkaConfig, Message, logLevel } from 'kafkajs'
import logger from '../lib/winston'
import { WorkflowHandler, topicList } from '../workflows/botswana/workflowHandler'
import { WorkflowHandler, WorkflowResult, topicList } from '../workflows/botswana/workflowHandler'
import { config } from '../lib/config'
import { KafkaConsumerUtil } from '../lib/kafkaConsumerUtil'

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps: NodeJS.Signals[] = ['SIGTERM', 'SIGINT', 'SIGUSR2']
const brokers = config.get('taskRunner:brokers') || ['kafka:9092']

let consumer: KafkaConsumerUtil | null = null;
let consumer: KafkaConsumerUtil | null = null

const consumerConfig: KafkaConfig = {
clientId: 'shr-consumer',
brokers: brokers,
logLevel: config.get('taskRunner:logLevel') || logLevel.ERROR
};


logLevel: config.get('taskRunner:logLevel') || logLevel.ERROR,
}

/**
* Example Botswana Workflow: (synchronous for now)
Expand Down Expand Up @@ -58,21 +56,24 @@ export async function run() {
}

async function shutdownConsumer() {
if (consumer)
await consumer.shutdown()
if (consumer) await consumer.shutdown()
}

const initAndConsume = async (topics: string[]) => {
const consumer = new KafkaConsumerUtil(consumerConfig, topics, "shr-consumer-group");
await consumer.init();
consumer.consumeTransactionally(processMessage); // No await here
return consumer;
};
const consumer = new KafkaConsumerUtil(consumerConfig, topics, 'shr-consumer-group')
await consumer.init()
consumer.consumeTransactionally(processMessage) // No await here
return consumer
}

async function processMessage(
topic: string,
partition: number,
message: Message,
): Promise<WorkflowResult> {
// There is no additional error handling in this message, since any exceptions or problems will need to be
// logged and handled by the Kafka consumer retry logic in the KafkaConsumerUtil class.

async function processMessage(topic: string, partition: number, message: Message): Promise<void> {
// There is no additional error handling in this message, since any exceptions or problems will need to be
// logged and handled by the Kafka consumer retry logic in the KafkaConsumerUtil class.

logger.info(`Recieved message from topic ${topic} on partition ${partition}`)

let val = ''
Expand All @@ -81,7 +82,7 @@ async function processMessage(topic: string, partition: number, message: Message
if (message.value) {
val = message.value.toString()
}

// This method needs to bubble up any exceptions to the Kafka consumer retry logic in the KafkaConsumerUtil class.
WorkflowHandler.executeTopicWorkflow(topic, val)
}
return await WorkflowHandler.executeTopicWorkflow(topic, val)
}
20 changes: 13 additions & 7 deletions src/workflows/botswana/IpmsWorkflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ export async function sendOrmToIpms(bundles: any): Promise<R4.IBundle> {
'based-on': entry.resource.id,
}

const fetchedBundle = <R4.IBundle>// TODO: Retry logic
await got.get(`${config.get('fhirServer:baseURL')}/ServiceRequest`, options).json()
const fetchedBundle = <
R4.IBundle // TODO: Retry logic
>await got.get(`${config.get('fhirServer:baseURL')}/ServiceRequest`, options).json()

if (fetchedBundle && fetchedBundle.entry && srBundle.entry) {
// Add child ServiceRequests if any exist
Expand Down Expand Up @@ -187,13 +188,15 @@ export async function handleAdtFromIpms(adtMessage: string): Promise<any> {
i => i.system && i.system == config.get('bwConfig:omangSystemUrl'),
)

if (omangEntry) {
omang = omangEntry.value!
if (omangEntry && omangEntry.value) {
omang = omangEntry.value
} else {
logger.error(
'Missing Omang - currently, only matching on Omang supported, but patient does not have an Omang number.',
)
return registrationBundle
throw new IpmsWorkflowError(
`Missing Omang - currently, only matching on Omang supported, but patient does not have an Omang number.`,
)
}

// Find all patients with this Omang.
Expand All @@ -216,7 +219,7 @@ export async function handleAdtFromIpms(adtMessage: string): Promise<any> {
if (
e.resource &&
e.resource.resourceType == 'Task' &&
e.resource.status == TaskStatusKind._requested
(e.resource.status == TaskStatusKind._accepted || config.get('bwConfig:devTaskStatus'))
) {
// Grab bundle for task:
options.searchParams = {
Expand All @@ -230,12 +233,15 @@ export async function handleAdtFromIpms(adtMessage: string): Promise<any> {
return { patient: patient, taskBundle: taskBundle }
}
}
return { patient: undefined, taskBundle: undefined }
} else {
logger.error('Could not find patient tasks!')
return { patient: undefined, taskBundle: undefined }
}
}
} catch (e) {
logger.error('Could not process ADT!\n' + e)
throw new IpmsWorkflowError('Could not process ADT!\n' + e)
return { patient: undefined, taskBundle: undefined }
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/workflows/botswana/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ export function getBundleEntries(
export async function postWithRetry(
crUrl: string,
options: OptionsOfTextResponseBody,
retryLimit = 5,
timeout = 1000,
retryLimit = 2,
timeout = 3000,
) {
for (let attempt = 1; attempt <= retryLimit; attempt++) {
try {
Expand Down
2 changes: 1 addition & 1 deletion src/workflows/botswana/hl7Workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export default class Hl7WorkflowsBw {

static async handleAdtMessage(hl7Msg: string): Promise<void> {
try {
WorkflowHandler.sendPayload({ message: hl7Msg }, topicList.HANDLE_ADT_FROM_IPMS)
WorkflowHandler.sendPayloadWithRetryDMQ({ message: hl7Msg }, topicList.HANDLE_ADT_FROM_IPMS)
} catch (error: any) {
// TODO: Major Error - send to DMQ or handle otherwise
logger.error(`Could not translate and save ADT message!\n${JSON.stringify(error)}`)
Expand Down
Loading

0 comments on commit c446696

Please sign in to comment.