diff --git a/src/lib/hl7MllpSender.ts b/src/lib/hl7MllpSender.ts index bb254a7..b4c5ce1 100644 --- a/src/lib/hl7MllpSender.ts +++ b/src/lib/hl7MllpSender.ts @@ -1,30 +1,57 @@ import { MllpServer } from '@i-tech-uw/mllp-server' import logger from './winston' +import { WorkflowHandler, topicList } from '../workflows/botswana/workflowHandler' export default class Hl7MllpSender { targetIp: string targetPort: number mllpServer: MllpServer + retries: number + retryInterval: number - constructor(targetIp: string, targetPort: number) { + private static instance: Hl7MllpSender; + + constructor(targetIp: string, targetPort: number, retries = 3, retryInterval = 10000) { this.targetPort = targetPort this.targetIp = targetIp + this.retries = retries + this.retryInterval = retryInterval this.mllpServer = new MllpServer(targetIp, targetPort, logger) } + public static getInstance(targetIp: string, targetPort: number): Hl7MllpSender { + if (!Hl7MllpSender.instance) { + Hl7MllpSender.instance = new Hl7MllpSender(targetIp, targetPort) + } + return Hl7MllpSender.instance + } + + /** * * @returns Promise */ - send(message: string, retries = 10): any { + send(message: string, targetIp?: string, port?: number, retries?: number): any { + if(!targetIp) { + targetIp = this.targetIp; + } + + if(!port) { + port = this.targetPort; + } + + if(!retries) { + retries = this.retries + } + message = message.replace(/[\n\r]/g, '\r') const firstNewline = message.match(/\r/) const header = firstNewline ? message.substring(0, firstNewline.index) : '' return new Promise((resolve, reject) => { - this.mllpServer.send(this.targetIp, this.targetPort, message, (err: any, ackData: any) => { + this.mllpServer.send(targetIp, port, message, (err: any, ackData: any) => { logger.info( - `!! Sending HL7 message ${header}!\n err: ${err ? err : ''}\n ackData: ${ + `Sending HL7 message ${header}!\n err: ${err ? err : ''}\n ackData: ${ ackData ? ackData : '' }`, ) @@ -32,7 +59,7 @@ export default class Hl7MllpSender { reject({ error: err, retries: retries }) } else { logger.info( - `!! Successfully sent HL7 message ${header} \n with ${retries} retries left!`, + `Successfully sent HL7 message ${header} \n with ${retries} retries left!`, ) resolve(ackData) } @@ -44,11 +71,19 @@ export default class Hl7MllpSender { .catch(e => { if (e.retries > 0) { logger.info(`Retrying... ${e.retries} retries left`) - return setTimeout(() => this.send(message, e.retries - 1), 2000) + return setTimeout(() => this.send(message, targetIp, port, retries - 1), this.retryInterval) } else { - logger.error(`!! Failed to send HL7 message ${header}!`) + logger.error(`Failed to send HL7 message ${header}!`) + + // Send to DMQ + WorkflowHandler.sendPayload({message: message, targetIp: this.targetIp, port: port}, topicList.DMQ) + return e.error } }) } } + +const hl7Sender = Hl7MllpSender.getInstance('127.0.0.1', 3000); + +export { hl7Sender }; diff --git a/src/lib/kafkaConsumerUtil.ts b/src/lib/kafkaConsumerUtil.ts index 2c5e2bc..6de03e3 100644 --- a/src/lib/kafkaConsumerUtil.ts +++ b/src/lib/kafkaConsumerUtil.ts @@ -1,6 +1,6 @@ import { Consumer, EachBatchPayload, Kafka, KafkaConfig, Message } from 'kafkajs' import logger from './winston' -import { WorkflowResult } from '../workflows/botswana/workflowHandler' +import { WorkflowHandler, WorkflowResult, topicList } from '../workflows/botswana/workflowHandler' export type EachMessageCallback = ( topic: string, @@ -87,11 +87,13 @@ export class KafkaConsumerUtil { retryCount++ if (retryCount >= maxRetries) { logger.error( - `Max retries reached for message ${message.offset}, sending to dead letter queue or similar.`, + `Max retries reached for message ${message.offset}, sending to dead message queue.`, ) resolveOffset(message.offset) - // TODO: handle with DLQ + // Send to DMQ + WorkflowHandler.sendPayload({ topic: topic, partition: partition, message: message }, topicList.DMQ) + break } await new Promise(resolve => setTimeout(resolve, retryDelay)) diff --git a/src/routes/hl7.ts b/src/routes/hl7.ts index 94279b3..90e633f 100644 --- a/src/routes/hl7.ts +++ b/src/routes/hl7.ts @@ -1,7 +1,7 @@ 'use strict' import express, { Request, Response } from 'express' -import Hl7MllpSender from '../lib/hl7MllpSender' +import { hl7Sender } from '../lib/hl7MllpSender' export const router = express.Router() @@ -11,9 +11,7 @@ router.post('/forward/:targetIp/:targetPort', async (req: Request, res: Response const targetIp: string = req.params.targetIp const targetPort = Number(req.params.targetPort) - const sender = new Hl7MllpSender(targetIp, targetPort) - - const ack = await sender.send(hl7Msg) + const ack = await hl7Sender.send(hl7Msg, targetIp, targetPort) res.status(200) res.send(ack) diff --git a/src/workflows/botswana/IpmsWorkflows.ts b/src/workflows/botswana/IpmsWorkflows.ts index da440bd..f42cab9 100644 --- a/src/workflows/botswana/IpmsWorkflows.ts +++ b/src/workflows/botswana/IpmsWorkflows.ts @@ -2,7 +2,7 @@ import { R4 } from '@ahryman40k/ts-fhir-types' import config from '../../lib/config' import logger from '../../lib/winston' import { getTaskStatus, setTaskStatus } from './helpers' -import Hl7MllpSender from '../../lib/hl7MllpSender' +import { hl7Sender } from '../../lib/hl7MllpSender' import Hl7WorkflowsBw from '../botswana/hl7Workflows' import got from 'got' import { @@ -38,11 +38,6 @@ export async function sendAdtToIpms(labBundle: R4.IBundle): Promise if (status && status === R4.TaskStatusKind._requested) { logger.info('Sending ADT message to IPMS!') - const sender = new Hl7MllpSender( - config.get('bwConfig:mllp:targetIp'), - config.get('bwConfig:mllp:targetAdtPort'), - ) - const adtMessage = await Hl7WorkflowsBw.getFhirTranslationWithRetry( labBundle, config.get('bwConfig:toIpmsAdtTemplate'), @@ -50,7 +45,10 @@ export async function sendAdtToIpms(labBundle: R4.IBundle): Promise logger.info(`adt:\n${adtMessage}`) - const adtResult: string = await sender.send(adtMessage) + const targetIp = config.get('bwConfig:mllp:targetIp') + const targetPort = config.get('bwConfig:mllp:targetAdtPort') + + const adtResult: string = await hl7Sender.send(adtMessage, targetIp, targetPort) if (adtResult.includes && adtResult.includes('AA')) { labBundle = setTaskStatus(labBundle, R4.TaskStatusKind._received) @@ -131,17 +129,15 @@ export async function sendOrmToIpms(bundles: any): Promise { config.get('bwConfig:toIpmsOrmTemplate'), ) - const sender = new Hl7MllpSender( - config.get('bwConfig:mllp:targetIp'), - config.get('bwConfig:mllp:targetOrmPort'), - ) - + const targetIp = config.get('bwConfig:mllp:targetIp') + const targetPort = config.get('bwConfig:mllp:targetOrmPort') + logger.info('Sending ORM message to IPMS!') logger.info(`orm:\n${ormMessage}\n`) if (ormMessage && ormMessage != '') { - const result: any = await sender.send(ormMessage) + const result: any = await hl7Sender.send(ormMessage, targetIp, targetPort) if (result.includes('AA')) { labBundle = setTaskStatus(labBundle, R4.TaskStatusKind._accepted) } @@ -163,7 +159,7 @@ export async function sendOrmToIpms(bundles: any): Promise { * @param registrationBundle - The registration bundle containing the patient information. * @returns A Promise that resolves to the registration bundle. */ -export async function handleAdtFromIpms(adtMessage: string): Promise { +export async function handleAdtFromIpms(adtMessage: string, sender?: Hl7MllpSender): Promise { try { const registrationBundle: R4.IBundle = await Hl7WorkflowsBw.translateBundle( adtMessage, diff --git a/src/workflows/botswana/hl7Workflows.ts b/src/workflows/botswana/hl7Workflows.ts index 548f081..5d78fac 100644 --- a/src/workflows/botswana/hl7Workflows.ts +++ b/src/workflows/botswana/hl7Workflows.ts @@ -64,7 +64,6 @@ export default class Hl7WorkflowsBw { try { WorkflowHandler.sendPayloadWithRetryDMQ({ message: hl7Msg }, topicList.HANDLE_ADT_FROM_IPMS) } catch (error: any) { - // TODO: Major Error - send to DMQ or handle otherwise logger.error(`Could not translate and save ADT message!\n${JSON.stringify(error)}`) } } diff --git a/src/workflows/botswana/workflowHandler.ts b/src/workflows/botswana/workflowHandler.ts index a348c16..da6aa9c 100644 --- a/src/workflows/botswana/workflowHandler.ts +++ b/src/workflows/botswana/workflowHandler.ts @@ -50,6 +50,7 @@ export const topicList = { SAVE_IPMS_PATIENT: 'save-ipms-patient', HANDLE_ORU_FROM_IPMS: 'handle-oru-from-ipms', HANDLE_ADT_FROM_IPMS: 'handle-adt-from-ipms', + DMQ: 'dmq' } /** @@ -298,7 +299,7 @@ export class WorkflowHandler { if (error && attempt === maxRetries) { logger.error(`All retries failed. Sending payload to DMQ!`) try { - logger.error('TODO: Implement DMQ!:\n' + JSON.stringify(payload)) + WorkflowHandler.sendPayload({ payload: payload, topic: topic, error: error }, topicList.DMQ) } catch (dmqError) { logger.error(`Failed to send payload to DMQ: ${dmqError}`) throw new Error(`Failed to send payload to DMQ: ${dmqError}`)