From c446696d0bca14e281e412a39c283576a2eaa859 Mon Sep 17 00:00:00 2001 From: Piotr Mankowski Date: Thu, 16 Nov 2023 16:06:08 -0800 Subject: [PATCH] Kafka workflow updates (#92) * Kafka workflow updates * Fixes to workflow --- config/mediator_docker.json | 2 +- debug.docker-compose.yml | 3 +- package.json | 2 +- src/__data__/sample_ADT.txt | 9 +- src/lib/kafkaConsumerUtil.ts | 120 +++++++++++------- src/lib/kafkaProducerUtil.ts | 1 + src/server/kafkaWorkers.ts | 41 +++--- src/workflows/botswana/IpmsWorkflows.ts | 20 ++- src/workflows/botswana/helpers.ts | 4 +- src/workflows/botswana/hl7Workflows.ts | 2 +- .../botswana/patientIdentityWorkflows.ts | 19 +-- src/workflows/botswana/workflowHandler.ts | 60 ++++++--- 12 files changed, 175 insertions(+), 108 deletions(-) diff --git a/config/mediator_docker.json b/config/mediator_docker.json index 9094b5d..0c9be8d 100644 --- a/config/mediator_docker.json +++ b/config/mediator_docker.json @@ -1,6 +1,6 @@ { "urn": "urn:mediator:shared-health-record", - "version": "v0.12.0-rc.3", + "version": "v0.12.0-rc.5", "name": "Shared Health Record", "description": "Shared Health Record", "defaultChannelConfig": [ diff --git a/debug.docker-compose.yml b/debug.docker-compose.yml index bcee27a..49b9405 100644 --- a/debug.docker-compose.yml +++ b/debug.docker-compose.yml @@ -18,8 +18,7 @@ services: container_name: shr restart: unless-stopped hostname: shr - restart: unless-stopped - image: itechuw/shared-health-record:debug + image: itechuw/shared-health-record:debug-1 build: context: ./ args: diff --git a/package.json b/package.json index a871e46..4938664 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "shared-health-registry", - "version": "v0.12.0-rc.3", + "version": "v0.12.0-rc.5", "description": "Open Implementation of an OpenHIE Shared Health Record", "main": "app.js", "scripts": { diff --git a/src/__data__/sample_ADT.txt b/src/__data__/sample_ADT.txt index 28b3c2e..d9c3928 100644 --- a/src/__data__/sample_ADT.txt +++ b/src/__data__/sample_ADT.txt @@ -7,4 +7,11 @@ PV2|||||||||||1||||||||||||||RF|||||||||||N| ROL|1|AD|AT|K^Naicker^Kalvin^^^^^^^^^^XX| ROL|2|AD|PP|GENPRI00^Doctor^Private^^^^^^^^^^XX| GT1|1||Test^ADT^Message||PLOT 123^^Gaborone^B^0101|00267891608025|||||SF|OMANG123| -IN1|1|Citizens||Citizens|Botswana Government^Ministry of Health^Gaborone^B^0101| \ No newline at end of file +IN1|1|Citizens||Citizens|Botswana Government^Ministry of Health^Gaborone^B^0101| + +MSH|^~\&|ADM||||202311150750||ADT^A04|292304|D|2.4|||AL|NE| +EVN||202311150749|||INFCE^INTERFACE|202311150000| +PID|1||GG00042904^^^^MR^GGC~677827885^^^^SS^GGC~GG42786^^^^PI^GGC~TEST0106399^^^^HUB^GGC||TestPatient201^Testing201^^^^^L||20051017|F||CT|Plot 40095^^Gaborone^B^0101||123456789|||M||ZG0000044670| +PV1|1|O|XEXTE15||||ZZHIEPROV^Provider^HIE^^^^^^^^^^XX|||||||||||POV||U|||||||||||||||||||GGC||REG|||202311150000| +PV2|||||||||||1|||||||||||||||||||||||||N| +ROL|1|AD|AT|ZZHIEPROV^Provider^HIE^^^^^^^^^^XX| \ No newline at end of file diff --git a/src/lib/kafkaConsumerUtil.ts b/src/lib/kafkaConsumerUtil.ts index 43ef37f..2c5e2bc 100644 --- a/src/lib/kafkaConsumerUtil.ts +++ b/src/lib/kafkaConsumerUtil.ts @@ -1,83 +1,111 @@ -import { Consumer, EachBatchPayload, Kafka, KafkaConfig, Message } from 'kafkajs'; -import logger from './winston'; +import { Consumer, EachBatchPayload, Kafka, KafkaConfig, Message } from 'kafkajs' +import logger from './winston' +import { WorkflowResult } from '../workflows/botswana/workflowHandler' -export type EachMessageCallback = (topic: string, partition: number, message: Message) => Promise; +export type EachMessageCallback = ( + topic: string, + partition: number, + message: Message, +) => Promise export class KafkaConsumerUtil { - private consumer: Consumer | null = null; + private consumer: Consumer | null = null constructor(private config: KafkaConfig, private topics: string[], private groupId: string) {} public async init(): Promise { try { - this.consumer = await this.createConsumer(); + this.consumer = await this.createConsumer() } catch (err) { - console.error('Failed to initialize consumer:', err); - throw err; + console.error('Failed to initialize consumer:', err) + throw err } } private async createConsumer(): Promise { - const kafka = new Kafka(this.config); - const consumer = kafka.consumer({ groupId: this.groupId }); - await consumer.connect(); + const kafka = new Kafka(this.config) + const consumer = kafka.consumer({ + groupId: this.groupId, + sessionTimeout: 120000, // 2 minutes + heartbeatInterval: 30000, // 30 seconds + }) + + await consumer.connect() + for (const topic of this.topics) { - await consumer.subscribe({ topic, fromBeginning: false }); + await consumer.subscribe({ topic, fromBeginning: false }) } - return consumer; + return consumer } public async consumeTransactionally(eachMessageCallback: EachMessageCallback): Promise { if (!this.consumer) { - throw new Error('Consumer is not initialized.'); + throw new Error('Consumer is not initialized.') } await this.consumer.run({ eachBatchAutoResolve: false, - eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }: EachBatchPayload) => { - const { topic, partition } = batch; - + eachBatch: async ({ + batch, + resolveOffset, + heartbeat, + isRunning, + isStale, + }: EachBatchPayload) => { + const { topic, partition } = batch + for (const message of batch.messages) { - if (!isRunning() || isStale()) return; - - logger.info({ - topic, - partition, - offset: message.offset, - value: message.value?.toString(), - }); - - const maxRetries = 6; - let retryCount = 0; - let retryDelay = 1000; - + if (!isRunning() || isStale()) return + + logger.info( + `Consumer | Recieved message from topic ${topic} on partition ${partition} with offset ${message.offset}`, + ) + + const maxRetries = 2 + let retryCount = 0 + let retryDelay = 2000 + let res: WorkflowResult | null = null + while (retryCount < maxRetries) { + logger.info(`Processing message for ${topic} with retry count ${retryCount}...`) try { - await eachMessageCallback(topic, partition, message); - resolveOffset(message.offset); - await heartbeat(); - break; // Break the loop if processing succeeds - } catch (error) { - logger.error(`Error processing message ${message.offset}: ${error}`); - retryCount++; - if (retryCount >= maxRetries) { - logger.error(`Max retries reached for message ${message.offset}, sending to dead letter queue or similar.`); - // TODO: handle with DLQ - break; + res = await eachMessageCallback(topic, partition, message) + + if (res.success) { + logger.info(`Workflow result succeeded!`) + resolveOffset(message.offset) + await heartbeat() + break // Break the loop if processing succeeds } + } else { + logger.error(`Workflow result did not succeed: ${res.result}`) } - await new Promise(resolve => setTimeout(resolve, retryDelay)); - retryDelay *= 2; // Double the delay for the next retry - await heartbeat(); // Important to call heartbeat to keep the session alive + } catch (error) { + logger.error(`Error processing message ${message.offset}: ${error}`) + } + + // Otherwise, retry, both on error, and if the message is not processed successfully + retryCount++ + if (retryCount >= maxRetries) { + logger.error( + `Max retries reached for message ${message.offset}, sending to dead letter queue or similar.`, + ) + resolveOffset(message.offset) + + // TODO: handle with DLQ + break } + await new Promise(resolve => setTimeout(resolve, retryDelay)) + retryDelay *= 20 // Double the delay for the next retry + await heartbeat() // Important to call heartbeat to keep the session alive } } }, - }); + }) } public async shutdown(): Promise { if (this.consumer) { - await this.consumer.disconnect(); + await this.consumer.disconnect() } } -} \ No newline at end of file +} diff --git a/src/lib/kafkaProducerUtil.ts b/src/lib/kafkaProducerUtil.ts index 9467781..81177f1 100644 --- a/src/lib/kafkaProducerUtil.ts +++ b/src/lib/kafkaProducerUtil.ts @@ -66,6 +66,7 @@ export class KafkaProducerUtil { await transaction.send(record) } await transaction.commit() + logger.info('Message sent transactionally.') this.onDeliveryReport({ status: 'committed' }) } catch (err) { await transaction.abort() diff --git a/src/server/kafkaWorkers.ts b/src/server/kafkaWorkers.ts index d1b87ff..a05cfdb 100644 --- a/src/server/kafkaWorkers.ts +++ b/src/server/kafkaWorkers.ts @@ -1,6 +1,6 @@ import { KafkaConfig, Message, logLevel } from 'kafkajs' import logger from '../lib/winston' -import { WorkflowHandler, topicList } from '../workflows/botswana/workflowHandler' +import { WorkflowHandler, WorkflowResult, topicList } from '../workflows/botswana/workflowHandler' import { config } from '../lib/config' import { KafkaConsumerUtil } from '../lib/kafkaConsumerUtil' @@ -8,15 +8,13 @@ const errorTypes = ['unhandledRejection', 'uncaughtException'] const signalTraps: NodeJS.Signals[] = ['SIGTERM', 'SIGINT', 'SIGUSR2'] const brokers = config.get('taskRunner:brokers') || ['kafka:9092'] -let consumer: KafkaConsumerUtil | null = null; +let consumer: KafkaConsumerUtil | null = null const consumerConfig: KafkaConfig = { clientId: 'shr-consumer', brokers: brokers, - logLevel: config.get('taskRunner:logLevel') || logLevel.ERROR -}; - - + logLevel: config.get('taskRunner:logLevel') || logLevel.ERROR, +} /** * Example Botswana Workflow: (synchronous for now) @@ -58,21 +56,24 @@ export async function run() { } async function shutdownConsumer() { - if (consumer) - await consumer.shutdown() + if (consumer) await consumer.shutdown() } const initAndConsume = async (topics: string[]) => { - const consumer = new KafkaConsumerUtil(consumerConfig, topics, "shr-consumer-group"); - await consumer.init(); - consumer.consumeTransactionally(processMessage); // No await here - return consumer; -}; + const consumer = new KafkaConsumerUtil(consumerConfig, topics, 'shr-consumer-group') + await consumer.init() + consumer.consumeTransactionally(processMessage) // No await here + return consumer +} + +async function processMessage( + topic: string, + partition: number, + message: Message, +): Promise { + // There is no additional error handling in this message, since any exceptions or problems will need to be + // logged and handled by the Kafka consumer retry logic in the KafkaConsumerUtil class. -async function processMessage(topic: string, partition: number, message: Message): Promise { - // There is no additional error handling in this message, since any exceptions or problems will need to be - // logged and handled by the Kafka consumer retry logic in the KafkaConsumerUtil class. - logger.info(`Recieved message from topic ${topic} on partition ${partition}`) let val = '' @@ -81,7 +82,7 @@ async function processMessage(topic: string, partition: number, message: Message if (message.value) { val = message.value.toString() } - + // This method needs to bubble up any exceptions to the Kafka consumer retry logic in the KafkaConsumerUtil class. - WorkflowHandler.executeTopicWorkflow(topic, val) -} \ No newline at end of file + return await WorkflowHandler.executeTopicWorkflow(topic, val) +} diff --git a/src/workflows/botswana/IpmsWorkflows.ts b/src/workflows/botswana/IpmsWorkflows.ts index e239c28..b6599d4 100644 --- a/src/workflows/botswana/IpmsWorkflows.ts +++ b/src/workflows/botswana/IpmsWorkflows.ts @@ -92,8 +92,9 @@ export async function sendOrmToIpms(bundles: any): Promise { 'based-on': entry.resource.id, } - const fetchedBundle = // TODO: Retry logic - await got.get(`${config.get('fhirServer:baseURL')}/ServiceRequest`, options).json() + const fetchedBundle = < + R4.IBundle // TODO: Retry logic + >await got.get(`${config.get('fhirServer:baseURL')}/ServiceRequest`, options).json() if (fetchedBundle && fetchedBundle.entry && srBundle.entry) { // Add child ServiceRequests if any exist @@ -187,13 +188,15 @@ export async function handleAdtFromIpms(adtMessage: string): Promise { i => i.system && i.system == config.get('bwConfig:omangSystemUrl'), ) - if (omangEntry) { - omang = omangEntry.value! + if (omangEntry && omangEntry.value) { + omang = omangEntry.value } else { logger.error( 'Missing Omang - currently, only matching on Omang supported, but patient does not have an Omang number.', ) - return registrationBundle + throw new IpmsWorkflowError( + `Missing Omang - currently, only matching on Omang supported, but patient does not have an Omang number.`, + ) } // Find all patients with this Omang. @@ -216,7 +219,7 @@ export async function handleAdtFromIpms(adtMessage: string): Promise { if ( e.resource && e.resource.resourceType == 'Task' && - e.resource.status == TaskStatusKind._requested + (e.resource.status == TaskStatusKind._accepted || config.get('bwConfig:devTaskStatus')) ) { // Grab bundle for task: options.searchParams = { @@ -230,12 +233,15 @@ export async function handleAdtFromIpms(adtMessage: string): Promise { return { patient: patient, taskBundle: taskBundle } } } + return { patient: undefined, taskBundle: undefined } + } else { + logger.error('Could not find patient tasks!') + return { patient: undefined, taskBundle: undefined } } } } catch (e) { logger.error('Could not process ADT!\n' + e) throw new IpmsWorkflowError('Could not process ADT!\n' + e) - return { patient: undefined, taskBundle: undefined } } } diff --git a/src/workflows/botswana/helpers.ts b/src/workflows/botswana/helpers.ts index e15be5d..5fbf6bd 100644 --- a/src/workflows/botswana/helpers.ts +++ b/src/workflows/botswana/helpers.ts @@ -71,8 +71,8 @@ export function getBundleEntries( export async function postWithRetry( crUrl: string, options: OptionsOfTextResponseBody, - retryLimit = 5, - timeout = 1000, + retryLimit = 2, + timeout = 3000, ) { for (let attempt = 1; attempt <= retryLimit; attempt++) { try { diff --git a/src/workflows/botswana/hl7Workflows.ts b/src/workflows/botswana/hl7Workflows.ts index 3ea3bd6..ef93954 100644 --- a/src/workflows/botswana/hl7Workflows.ts +++ b/src/workflows/botswana/hl7Workflows.ts @@ -43,7 +43,7 @@ export default class Hl7WorkflowsBw { static async handleAdtMessage(hl7Msg: string): Promise { try { - WorkflowHandler.sendPayload({ message: hl7Msg }, topicList.HANDLE_ADT_FROM_IPMS) + WorkflowHandler.sendPayloadWithRetryDMQ({ message: hl7Msg }, topicList.HANDLE_ADT_FROM_IPMS) } catch (error: any) { // TODO: Major Error - send to DMQ or handle otherwise logger.error(`Could not translate and save ADT message!\n${JSON.stringify(error)}`) diff --git a/src/workflows/botswana/patientIdentityWorkflows.ts b/src/workflows/botswana/patientIdentityWorkflows.ts index a167c1f..f34dc0e 100644 --- a/src/workflows/botswana/patientIdentityWorkflows.ts +++ b/src/workflows/botswana/patientIdentityWorkflows.ts @@ -1,7 +1,7 @@ -import { R4 } from "@ahryman40k/ts-fhir-types" -import config from "../../lib/config" -import { postWithRetry } from "./helpers" -import logger from "../../lib/winston" +import { R4 } from '@ahryman40k/ts-fhir-types' +import config from '../../lib/config' +import { postWithRetry } from './helpers' +import logger from '../../lib/winston' /** * updateCrPatient @@ -13,7 +13,7 @@ export async function updateCrPatient(bundle: R4.IBundle): Promise { let pat: R4.IPatient const patResult = bundle.entry!.find(entry => { - return entry.resource && entry.resource.resourceType == 'Patient' + return entry.resource && entry.resource.resourceType === 'Patient' }) const options = { @@ -28,14 +28,18 @@ export async function updateCrPatient(bundle: R4.IBundle): Promise { options.json = pat } - const crResult = await postWithRetry(crUrl, options, config.get('bwConfig:retryCount'), config.get('bwConfig:retryDelay')) + const crResult = await postWithRetry( + crUrl, + options, + config.get('bwConfig:retryCount'), + config.get('bwConfig:retryDelay'), + ) logger.debug(`CR Patient Update Result: ${JSON.stringify(crResult)}`) return bundle } - /** * * @param labBundle @@ -58,4 +62,3 @@ export async function saveIpmsPatient(registrationBundle: R4.IBundle): Promise { - logger.info('Delivery report:', report) + logger.info('Message delivered!') }) // Static instance of the Kafka producer. @@ -111,11 +117,12 @@ export class WorkflowHandler { } } - static async executeTopicWorkflow(topic: string, val: any) { + static async executeTopicWorkflow(topic: string, val: any): Promise { let response: any let enrichedBundle let origBundle let hl7Message + let successFlag = true // Each of these topics holds a workflow that is atomic. If any required part fails, then // the entire workflow fails and the message is retried by the Kafka consumer logic. @@ -149,12 +156,20 @@ export class WorkflowHandler { const adtRes = await handleAdtFromIpms(hl7Message) - this.sendPayloadWithRetryDMQ(adtRes.patient, topicList.SAVE_IPMS_PATIENT) + if (adtRes && adtRes.patient) { + this.sendPayloadWithRetryDMQ(adtRes.patient, topicList.SAVE_IPMS_PATIENT) + } - enrichedBundle = await sendOrmToIpms(adtRes) + if (adtRes && adtRes.taskBundle && adtRes.patient) { + enrichedBundle = await sendOrmToIpms(adtRes) - // Succeed only if this bundle saves successfully - response = await saveBundle(enrichedBundle) + // Succeed only if this bundle saves successfully + response = await saveBundle(enrichedBundle) + } else { + response = adtRes + successFlag = false + logger.error(`Could not handle ADT from IPMS!\n${JSON.stringify(adtRes)}`) + } break } @@ -172,22 +187,29 @@ export class WorkflowHandler { break } case topicList.SAVE_IPMS_PATIENT: { - origBundle = JSON.parse(val).bundle - response = await saveIpmsPatient(origBundle) + const patient: IPatient = JSON.parse(val) + const bundle: IBundle = { + resourceType: 'Bundle', + entry: [ + { + resource: patient, + }, + ], + } + response = await saveIpmsPatient(bundle) break } + default: { break } } - await new Promise(resolve => setTimeout(resolve, 300)) + await new Promise(resolve => setTimeout(resolve, 100)) - return response + return { success: successFlag, result: response } } catch (e) { logger.error('Could not execute Kafka consumer callback workflow!\nerror: ' + e) - throw new KafkaCallbackError( - 'Could not execute Kafka consumer callback workflow!\nerror: ' + e, - ) + return { success: false, result: `${e}` } } } @@ -215,7 +237,7 @@ export class WorkflowHandler { ] try { - logger.info(`Sending payload to topic ${topic}: ${JSON.stringify(payload)}`) + logger.info(`Sending payload to topic ${topic}!`) await this.kafka.sendMessageTransactionally(records) } catch (err) { console.error(`Error sending payload to topic ${topic}: ${err}`) @@ -234,14 +256,16 @@ export class WorkflowHandler { public static async sendPayloadWithRetryDMQ( payload: any, topic: string, - maxRetries = 5, - retryDelay = 1000, + maxRetries = 2, + retryDelay = 3000, ) { await this.initKafkaProducer() let val = '' if (payload && (payload.bundle || payload.resourceType)) { val = JSON.stringify(payload) + } else if (payload && payload.message) { + val = payload.message } else { val = payload } @@ -258,9 +282,7 @@ export class WorkflowHandler { while (attempt < maxRetries) { try { - logger.info( - `Attempt ${attempt + 1}: Sending payload to topic ${topic}: ${JSON.stringify(payload)}`, - ) + logger.info(`Attempt ${attempt + 1}: Sending payload to topic ${topic}!`) await this.kafka.sendMessageTransactionally(records) return // Success, exit the function. } catch (err) {