diff --git a/.eslintcache b/.eslintcache deleted file mode 100644 index 774b6b0..0000000 --- a/.eslintcache +++ /dev/null @@ -1 +0,0 @@ -[{"/u01/code/shared-health-record/src/lib/kafkaConsumerUtil.ts":"1","/u01/code/shared-health-record/src/workflows/botswana/IpmsWorkflows.ts":"2","/u01/code/shared-health-record/src/workflows/botswana/helpers.ts":"3","/u01/code/shared-health-record/src/workflows/botswana/patientIdentityWorkflows.ts":"4","/u01/code/shared-health-record/src/workflows/botswana/workflowHandler.ts":"5","/u01/code/shared-health-record/src/workflows/botswana/locationWorkflows.ts":"6"},{"size":3503,"mtime":1700178819830,"results":"7","hashOfConfig":"8"},{"size":12350,"mtime":1700178820038,"results":"9","hashOfConfig":"8"},{"size":3130,"mtime":1700255872114},{"size":1486,"mtime":1700178820182,"results":"10","hashOfConfig":"8"},{"size":11215,"mtime":1700178820270,"results":"11","hashOfConfig":"8"},{"size":6760,"mtime":1700255448566,"results":"12","hashOfConfig":"8"},{"filePath":"13","messages":"14","suppressedMessages":"15","errorCount":0,"fatalErrorCount":0,"warningCount":0,"fixableErrorCount":0,"fixableWarningCount":0},"29hqh",{"filePath":"16","messages":"17","suppressedMessages":"18","errorCount":0,"fatalErrorCount":0,"warningCount":14,"fixableErrorCount":0,"fixableWarningCount":0,"source":null},{"filePath":"19","messages":"20","suppressedMessages":"21","errorCount":0,"fatalErrorCount":0,"warningCount":2,"fixableErrorCount":0,"fixableWarningCount":0,"source":null},{"filePath":"22","messages":"23","suppressedMessages":"24","errorCount":0,"fatalErrorCount":0,"warningCount":10,"fixableErrorCount":0,"fixableWarningCount":0,"source":null},{"filePath":"25","messages":"26","suppressedMessages":"27","errorCount":0,"fatalErrorCount":0,"warningCount":0,"fixableErrorCount":0,"fixableWarningCount":0},"/u01/code/shared-health-record/src/lib/kafkaConsumerUtil.ts",[],[],"/u01/code/shared-health-record/src/workflows/botswana/IpmsWorkflows.ts",["28","29","30","31","32","33","34","35","36","37","38","39","40","41"],[],"/u01/code/shared-health-record/src/workflows/botswana/patientIdentityWorkflows.ts",["42","43"],[],"/u01/code/shared-health-record/src/workflows/botswana/workflowHandler.ts",["44","45","46","47","48","49","50","51","52","53"],["54"],"/u01/code/shared-health-record/src/workflows/botswana/locationWorkflows.ts",[],[],{"ruleId":"55","severity":1,"message":"56","line":63,"column":46,"nodeType":"57","messageId":"58","endLine":63,"endColumn":49,"suggestions":"59"},{"ruleId":"60","severity":1,"message":"61","line":70,"column":20,"nodeType":"62","messageId":"63","endLine":70,"endColumn":36,"suggestions":"64"},{"ruleId":"55","severity":1,"message":"56","line":70,"column":55,"nodeType":"57","messageId":"58","endLine":70,"endColumn":58,"suggestions":"65"},{"ruleId":"55","severity":1,"message":"56","line":143,"column":23,"nodeType":"57","messageId":"58","endLine":143,"endColumn":26,"suggestions":"66"},{"ruleId":"55","severity":1,"message":"56","line":162,"column":70,"nodeType":"57","messageId":"58","endLine":162,"endColumn":73,"suggestions":"67"},{"ruleId":"60","severity":1,"message":"61","line":180,"column":22,"nodeType":"62","messageId":"63","endLine":180,"endColumn":47,"suggestions":"68"},{"ruleId":"60","severity":1,"message":"61","line":218,"column":25,"nodeType":"62","messageId":"63","endLine":218,"endColumn":44},{"ruleId":"60","severity":1,"message":"61","line":257,"column":9,"nodeType":"62","messageId":"63","endLine":258,"endColumn":21},{"ruleId":"60","severity":1,"message":"61","line":257,"column":9,"nodeType":"62","messageId":"63","endLine":257,"endColumn":94,"suggestions":"69"},{"ruleId":"60","severity":1,"message":"61","line":262,"column":9,"nodeType":"62","messageId":"63","endLine":264,"endColumn":21},{"ruleId":"60","severity":1,"message":"61","line":262,"column":9,"nodeType":"62","messageId":"63","endLine":264,"endColumn":11,"suggestions":"70"},{"ruleId":"60","severity":1,"message":"61","line":268,"column":9,"nodeType":"62","messageId":"63","endLine":269,"endColumn":21},{"ruleId":"60","severity":1,"message":"61","line":268,"column":9,"nodeType":"62","messageId":"63","endLine":268,"endColumn":98,"suggestions":"71"},{"ruleId":"60","severity":1,"message":"61","line":280,"column":17,"nodeType":"62","messageId":"63","endLine":280,"endColumn":34},{"ruleId":"60","severity":1,"message":"61","line":15,"column":21,"nodeType":"62","messageId":"63","endLine":15,"endColumn":34,"suggestions":"72"},{"ruleId":"60","severity":1,"message":"61","line":27,"column":24,"nodeType":"62","messageId":"63","endLine":27,"endColumn":43},{"ruleId":"73","severity":1,"message":"74","line":19,"column":7,"nodeType":"75","messageId":"76","endLine":19,"endColumn":10},{"ruleId":"77","severity":1,"message":"78","line":19,"column":7,"nodeType":"75","messageId":"76","endLine":19,"endColumn":10},{"ruleId":"73","severity":1,"message":"79","line":29,"column":7,"nodeType":"75","messageId":"76","endLine":29,"endColumn":25},{"ruleId":"77","severity":1,"message":"80","line":29,"column":7,"nodeType":"75","messageId":"76","endLine":29,"endColumn":25},{"ruleId":"73","severity":1,"message":"81","line":97,"column":64,"nodeType":"75","messageId":"76","endLine":97,"endColumn":70},{"ruleId":"77","severity":1,"message":"82","line":97,"column":64,"nodeType":"75","messageId":"76","endLine":97,"endColumn":70},{"ruleId":"55","severity":1,"message":"56","line":120,"column":57,"nodeType":"57","messageId":"58","endLine":120,"endColumn":60,"suggestions":"83"},{"ruleId":"55","severity":1,"message":"56","line":121,"column":19,"nodeType":"57","messageId":"58","endLine":121,"endColumn":22,"suggestions":"84"},{"ruleId":"55","severity":1,"message":"56","line":222,"column":44,"nodeType":"57","messageId":"58","endLine":222,"endColumn":47,"suggestions":"85"},{"ruleId":"55","severity":1,"message":"56","line":257,"column":14,"nodeType":"57","messageId":"58","endLine":257,"endColumn":17,"suggestions":"86"},{"ruleId":"87","severity":2,"message":"88","line":19,"column":13,"nodeType":"89","messageId":"90","endLine":19,"endColumn":27,"suppressions":"91"},"@typescript-eslint/no-explicit-any","Unexpected any. Specify a different type.","TSAnyKeyword","unexpectedAny",["92","93"],"@typescript-eslint/no-non-null-assertion","Forbidden non-null assertion.","TSNonNullExpression","noNonNull",["94"],["95","96"],["97","98"],["99","100"],["101"],["102"],["103"],["104"],["105"],"unused-imports/no-unused-vars","'hl7' is assigned a value but never used. Allowed unused vars must match /^_/u.","Identifier","unusedVar","@typescript-eslint/no-unused-vars","'hl7' is assigned a value but never used.","'KafkaCallbackError' is defined but never used. Allowed unused vars must match /^_/u.","'KafkaCallbackError' is defined but never used.","'report' is defined but never used. Allowed unused args must match /^_/u.","'report' is defined but never used.",["106","107"],["108","109"],["110","111"],["112","113"],"@typescript-eslint/no-var-requires","Require statement not part of import statement.","CallExpression","noVarReqs",["114"],{"messageId":"115","fix":"116","desc":"117"},{"messageId":"118","fix":"119","desc":"120"},{"messageId":"121","fix":"122","desc":"123"},{"messageId":"115","fix":"124","desc":"117"},{"messageId":"118","fix":"125","desc":"120"},{"messageId":"115","fix":"126","desc":"117"},{"messageId":"118","fix":"127","desc":"120"},{"messageId":"115","fix":"128","desc":"117"},{"messageId":"118","fix":"129","desc":"120"},{"messageId":"121","fix":"130","desc":"123"},{"messageId":"121","fix":"131","desc":"123"},{"messageId":"121","fix":"132","desc":"123"},{"messageId":"121","fix":"133","desc":"123"},{"messageId":"121","fix":"134","desc":"123"},{"messageId":"115","fix":"135","desc":"117"},{"messageId":"118","fix":"136","desc":"120"},{"messageId":"115","fix":"137","desc":"117"},{"messageId":"118","fix":"138","desc":"120"},{"messageId":"115","fix":"139","desc":"117"},{"messageId":"118","fix":"140","desc":"120"},{"messageId":"115","fix":"141","desc":"117"},{"messageId":"118","fix":"142","desc":"120"},{"kind":"143","justification":"144"},"suggestUnknown",{"range":"145","text":"146"},"Use `unknown` instead, this will force you to explicitly, and safely assert the type is correct.","suggestNever",{"range":"147","text":"148"},"Use `never` instead, this is useful when instantiating generic type parameters that you don't need to know the type of.","suggestOptionalChain",{"range":"149","text":"150"},"Consider using the optional chain operator `?.` instead. This operator includes runtime checks, so it is safer than the compile-only non-null assertion operator.",{"range":"151","text":"146"},{"range":"152","text":"148"},{"range":"153","text":"146"},{"range":"154","text":"148"},{"range":"155","text":"146"},{"range":"156","text":"148"},{"range":"157","text":"150"},{"range":"158","text":"150"},{"range":"159","text":"150"},{"range":"160","text":"150"},{"range":"161","text":"150"},{"range":"162","text":"146"},{"range":"163","text":"148"},{"range":"164","text":"146"},{"range":"165","text":"148"},{"range":"166","text":"146"},{"range":"167","text":"148"},{"range":"168","text":"146"},{"range":"169","text":"148"},"directive","",[1747,1750],"unknown",[1747,1750],"never",[2033,2034],"?",[2053,2056],[2053,2056],[4479,4482],[4479,4482],[5228,5231],[5228,5231],[5753,5754],[8407,8408],[8618,8619],[8783,8784],[424,425],[4687,4690],[4687,4690],[4737,4740],[4737,4740],[8208,8211],[8208,8211],[9372,9375],[9372,9375]] \ No newline at end of file diff --git a/config/config_docker.json b/config/config_docker.json index a491ec9..17f4636 100644 --- a/config/config_docker.json +++ b/config/config_docker.json @@ -35,7 +35,7 @@ "loincSystemUrl": "https://api.openconceptlab.org/orgs/Regenstrief/sources/LOINC/", "omangSystemUrl": "http://moh.bw.org/ext/identifier/omang", "brdsSystemUrl": "http://moh.bw.org/ext/identifier/bcn", - "immigrationSystemUrl": "http://moh.bw.org/ext/identifier/passportno", + "immigrationSystemUrl": "http://moh.bw.org/ext/identifier/ppn", "oclUrl": "https://api.openconceptlab.org", "facilityCodeSystemUrl": "http://moh.bw.org/ext/identifier/facility-code", "ipmsProviderSystemUrl": "http://moh.bw.org/ext/ipms-provider", diff --git a/debug.docker-compose.yml b/debug.docker-compose.yml index 40c186b..156bfb4 100644 --- a/debug.docker-compose.yml +++ b/debug.docker-compose.yml @@ -18,7 +18,7 @@ services: container_name: shr restart: unless-stopped hostname: shr - image: itechuw/shared-health-record:dev + image: itechuw/shared-health-record:debug build: context: ./ args: diff --git a/node-docker.sh b/node-docker.sh new file mode 100755 index 0000000..cebd356 --- /dev/null +++ b/node-docker.sh @@ -0,0 +1,3 @@ +#! /usr/bin/env bash + +docker run -it --rm -v ${PWD}:/usr/src/app -w /usr/src/app node:18-slim /bin/bash \ No newline at end of file diff --git a/src/workflows/botswana/IpmsWorkflows.ts b/src/workflows/botswana/IpmsWorkflows.ts index b6599d4..da440bd 100644 --- a/src/workflows/botswana/IpmsWorkflows.ts +++ b/src/workflows/botswana/IpmsWorkflows.ts @@ -14,6 +14,7 @@ import { IPatient, IReference, IServiceRequest, + ITask, TaskStatusKind, } from '@ahryman40k/ts-fhir-types/lib/R4' import { saveBundle } from '../../hapi/lab' @@ -42,7 +43,7 @@ export async function sendAdtToIpms(labBundle: R4.IBundle): Promise config.get('bwConfig:mllp:targetAdtPort'), ) - const adtMessage = await Hl7WorkflowsBw.getFhirTranslation( + const adtMessage = await Hl7WorkflowsBw.getFhirTranslationWithRetry( labBundle, config.get('bwConfig:toIpmsAdtTemplate'), ) @@ -52,7 +53,7 @@ export async function sendAdtToIpms(labBundle: R4.IBundle): Promise const adtResult: string = await sender.send(adtMessage) if (adtResult.includes && adtResult.includes('AA')) { - labBundle = setTaskStatus(labBundle, R4.TaskStatusKind._accepted) + labBundle = setTaskStatus(labBundle, R4.TaskStatusKind._received) } } else { logger.info('Order not ready for IPMS.') @@ -94,7 +95,7 @@ export async function sendOrmToIpms(bundles: any): Promise { const fetchedBundle = < R4.IBundle // TODO: Retry logic - >await got.get(`${config.get('fhirServer:baseURL')}/ServiceRequest`, options).json() + >await got.get(`${config.get('fhirServer:baseURL')}/ServiceRequest`, options).json() if (fetchedBundle && fetchedBundle.entry && srBundle.entry) { // Add child ServiceRequests if any exist @@ -125,7 +126,7 @@ export async function sendOrmToIpms(bundles: any): Promise { const outBundle = { ...sendBundle } outBundle.entry.push(sr) - const ormMessage = await Hl7WorkflowsBw.getFhirTranslation( + const ormMessage = await Hl7WorkflowsBw.getFhirTranslationWithRetry( outBundle, config.get('bwConfig:toIpmsOrmTemplate'), ) @@ -142,7 +143,7 @@ export async function sendOrmToIpms(bundles: any): Promise { if (ormMessage && ormMessage != '') { const result: any = await sender.send(ormMessage) if (result.includes('AA')) { - labBundle = setTaskStatus(labBundle, R4.TaskStatusKind._inProgress) + labBundle = setTaskStatus(labBundle, R4.TaskStatusKind._accepted) } logger.info(`*result:\n${result}\n`) } @@ -156,6 +157,9 @@ export async function sendOrmToIpms(bundles: any): Promise { /** * Handles ADT (Admission, Discharge, Transfer) messages received from IPMS (Integrated Patient Management System). + * + * This method needs to be able to match the patient coming back to the patient going in. + * * @param registrationBundle - The registration bundle containing the patient information. * @returns A Promise that resolves to the registration bundle. */ @@ -176,7 +180,8 @@ export async function handleAdtFromIpms(adtMessage: string): Promise { } // Get patient from registration Bundle - let patient: R4.IPatient, omang: string + let patient: R4.IPatient, omang: string, ppn: string, bcn: string, identifierParam: string + const patEntry = registrationBundle.entry!.find(entry => { return entry.resource && entry.resource.resourceType == 'Patient' }) @@ -184,47 +189,89 @@ export async function handleAdtFromIpms(adtMessage: string): Promise { if (patEntry && patEntry.resource) { patient = patEntry.resource + // Find patient identifiers, if they exist const omangEntry = patient.identifier?.find( i => i.system && i.system == config.get('bwConfig:omangSystemUrl'), ) + const ppnEntry = patient.identifier?.find( + i => i.system && i.system == config.get('bwConfig:immigrationSystemUrl'), + ) + + const bcnEntry = patient.identifier?.find( + i => i.system && i.system == config.get('bwConfig:bdrsSystemUrl'), + ) + if (omangEntry && omangEntry.value) { omang = omangEntry.value + identifierParam = `${config.get('bwConfig:omangSystemUrl')}|${omang}` + } else if (bcnEntry && bcnEntry.value) { + bcn = bcnEntry.value + identifierParam = `${config.get('bwConfig:bdrsSystemUrl')}|${bcn}` + } else if (ppnEntry && ppnEntry.value) { + ppn = ppnEntry.value + identifierParam = `${config.get('bwConfig:immigrationSystemUrl')}|${ppn}` } else { - logger.error( - 'Missing Omang - currently, only matching on Omang supported, but patient does not have an Omang number.', - ) + let errorMessage = 'Patient missing a required identifier - matching supported only on Omang, birth certificate number, or passport number.' + + logger.error(errorMessage) + throw new IpmsWorkflowError( - `Missing Omang - currently, only matching on Omang supported, but patient does not have an Omang number.`, + errorMessage ) } - // Find all patients with this Omang. + // Find all patients with these identifiers and grab the related Tasks options.searchParams = { - identifier: `${config.get('bwConfig:omangSystemUrl')}|${omang}`, + identifier: `${identifierParam}`, _revinclude: 'Task:patient', } - let patientTasks: R4.IBundle + let potentialPatientTasks: R4.IBundle try { - patientTasks = await got.get(`${config.get('fhirServer:baseURL')}/Patient`, options).json() + potentialPatientTasks = await got.get(`${config.get('fhirServer:baseURL')}/Patient`, options).json() } catch (e) { - patientTasks = { resourceType: 'Bundle' } + potentialPatientTasks = { resourceType: 'Bundle' } logger.error(e) } - if (patientTasks && patientTasks.entry) { - // Get all Tasks with `requested` status - for (const e of patientTasks.entry!) { - if ( - e.resource && - e.resource.resourceType == 'Task' && - (e.resource.status == TaskStatusKind._accepted || config.get('bwConfig:devTaskStatus')) - ) { + if (potentialPatientTasks && potentialPatientTasks.entry) { + // Get all Tasks with `recieved` status, which indicates the patient ADT has been sent to IPMS + + // Filter and Sort all resources in entry to have tasks by decending order or creation + const patientTasks = potentialPatientTasks.entry.filter( + e => e.resource && e.resource.resourceType == 'Task' && (e.resource.status == TaskStatusKind._received), + ).sort((a, b) => { + if (a.resource && b.resource) { + let at = a.resource + let bt = b.resource + + return ( + new Date(bt.authoredOn || 0).getTime() - + new Date(at.authoredOn || 0).getTime() + ) + } + return 0 + }) + + // TODO: Account for multiple task results! + + // For now, if multiple tasks exist, grab the most recent one and log a warning + if (patientTasks.length > 1) { + logger.warn( + `More than one task found for patient ${patient.id} with identifier ${identifierParam}! Processing most recent.`, + ) + } + + if (patientTasks.length > 0) { + const targetTask = patientTasks[0].resource + + if (targetTask) { + // Grab bundle for task: options.searchParams = { _include: '*', - _id: e.resource.id, + _id: targetTask.id, } const taskBundle: IBundle = await got @@ -235,7 +282,7 @@ export async function handleAdtFromIpms(adtMessage: string): Promise { } return { patient: undefined, taskBundle: undefined } } else { - logger.error('Could not find patient tasks!') + logger.error('Could not find any patient tasks for patient with identifier ' + identifierParam + '!') return { patient: undefined, taskBundle: undefined } } } diff --git a/src/workflows/botswana/hl7Workflows.ts b/src/workflows/botswana/hl7Workflows.ts index ef93954..548f081 100644 --- a/src/workflows/botswana/hl7Workflows.ts +++ b/src/workflows/botswana/hl7Workflows.ts @@ -6,9 +6,28 @@ import got from 'got/dist/source' import config from '../../lib/config' import logger from '../../lib/winston' import { WorkflowHandler, topicList } from './workflowHandler' -import sleep from 'sleep-promise' +import { KafkaProducerUtil } from '../../lib/kafkaProducerUtil' +import { KafkaConfig, logLevel } from 'kafkajs' + +const brokers = config.get('taskRunner:brokers') || ['kafka:9092'] export default class Hl7WorkflowsBw { + private static kafkaProducer: KafkaProducerUtil | null = null; + + public static async initKafkaProducer() { + if (!this.kafkaProducer) { + const hl7ProducerConfig: KafkaConfig = { + clientId: 'dead-message-producer', + brokers: brokers, + logLevel: config.get('taskRunner:logLevel') || logLevel.ERROR, + } + this.kafkaProducer = new KafkaProducerUtil(hl7ProducerConfig, (report) => { + logger.info('HL7 message delivery report:', report); + }) + await this.kafkaProducer.init(); + } + } + public static errorBundle: IBundle = { resourceType: 'Bundle', type: BundleTypeKind._transactionResponse, @@ -50,18 +69,21 @@ export default class Hl7WorkflowsBw { } } - static async translateBundle(hl7Msg: string, template: string) { - let tries = 0 - let translatedBundle: R4.IBundle = this.errorBundle + static async translateBundle(hl7Msg: string, templateConfigKey: string) { + // The errorCheck function defines the criteria for retrying based on the operation's result + const errorCheck = (result: R4.IBundle) => result === this.errorBundle; - while (tries < 5 && translatedBundle == this.errorBundle) { - tries = tries + 1 - translatedBundle = await this.getHl7Translation(hl7Msg, config.get(template)) - if (translatedBundle == this.errorBundle) { - await sleep(1000) - } - } - return translatedBundle + // Define the payload for DMQ in case of failure + const payloadForDMQ = { hl7Msg, templateConfigKey }; + + // Use the retryOperation method with the new errorCheck criteria + return await this.retryOperation( + () => this.getHl7Translation(hl7Msg, config.get(templateConfigKey)), + 5, // maxRetries + 1000, // delay in milliseconds + errorCheck, + payloadForDMQ + ); } static async getHl7Translation(hl7Message: string, template: string): Promise { @@ -116,4 +138,59 @@ export default class Hl7WorkflowsBw { return '' } } + + static async getFhirTranslationWithRetry(bundle: R4.IBundle, template: string): Promise { + // Define your retry parameters + const maxRetries = 3; + const delay = 1000; // Starting delay in ms + + const errorCheck = (result: R4.IBundle) => result === this.errorBundle; + + const payloadForDMQ = { bundle, template }; + + return await this.retryOperation( + () => this.getFhirTranslation(bundle, template), + 5, // maxRetries + 2000, // delay in milliseconds + errorCheck, + payloadForDMQ + ); + } + + static async retryOperation(func: () => any, maxRetries: number, delay: number, errorCheck: (result: any) => boolean, payloadForDMQ: any) { + let attempts = 0; + let result: any; + while (attempts < maxRetries) { + try { + result = await func(); + // Check if the result meets the criteria to be considered successful + if (!errorCheck(result)) { + return result; // If result is satisfactory, return it + } + // If result is not satisfactory, log and prepare for a retry + logger.info(`Retry criteria not met, attempt ${attempts + 1} of ${maxRetries}`); + } catch (error) { + logger.error(`Error on attempt ${attempts + 1}: ${error}`); + // If this was the last attempt, handle DMQ logic + if (attempts === maxRetries - 1) { + logger.error(`Max retries reached, sending to Kafka DMQ topic. Error: ${error}`); + await this.initKafkaProducer(); + if (this.kafkaProducer) { + await this.kafkaProducer.sendMessageTransactionally([{ + topic: 'dmq', + messages: [{ value: JSON.stringify(payloadForDMQ) }], + }]); + } + throw new Error('Operation failed after maximum retries, message sent to DMQ.'); + } + } + // Prepare for the next attempt + attempts++; + await new Promise(resolve => setTimeout(resolve, delay)); + delay *= 2; // Exponential backoff + } + // If max retries are reached and the result is still not satisfactory, consider it a failure + throw new Error('Operation failed after maximum retries based on result criteria.'); + } + } diff --git a/src/workflows/botswana/workflowHandler.ts b/src/workflows/botswana/workflowHandler.ts index ec5295c..a348c16 100644 --- a/src/workflows/botswana/workflowHandler.ts +++ b/src/workflows/botswana/workflowHandler.ts @@ -165,6 +165,7 @@ export class WorkflowHandler { // Succeed only if this bundle saves successfully response = await saveBundle(enrichedBundle) + } else { response = adtRes successFlag = false