From cef8c6d0590682a619657ef305123c0ebf7ecc84 Mon Sep 17 00:00:00 2001 From: Piotr Mankowski Date: Sat, 27 Apr 2024 20:22:32 +0200 Subject: [PATCH] Pilot testing updates --- config/config_docker.json | 8 ++++++++ package.json | 1 + src/lib/hl7MllpSender.ts | 7 ++++--- src/lib/kafkaConsumerUtil.ts | 6 +++--- src/workflows/botswana/hl7Workflows.ts | 15 +++++++++------ src/workflows/botswana/workflowHandler.ts | 11 +++++++---- 6 files changed, 32 insertions(+), 16 deletions(-) diff --git a/config/config_docker.json b/config/config_docker.json index 17f4636..01f580d 100644 --- a/config/config_docker.json +++ b/config/config_docker.json @@ -28,6 +28,14 @@ "kafka:9092" ] }, + "retryConfig": { + "translatorMaxRetries": 5, + "translatorRetryDelay": 10000, + "hl7MaxRetries": 5, + "hl7RetryDelay": 10000, + "kafkaMaxRetries": 5, + "kafkaRetryDelay": 10000 + }, "bwConfig": { "pimsSystemUrl": "https://api.openconceptlab.org/orgs/I-TECH-UW/sources/PIMSLAB/", "ipmsSystemUrl": "https://api.openconceptlab.org/orgs/I-TECH-UW/sources/IPMSLAB/", diff --git a/package.json b/package.json index 704b871..eacd938 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "sprintf-js": "^1.1.2", "urijs": "^1.19.11", "uuid": "^3.3.3", + "uuid4": "^1.1.4", "winston": "^3.2.1", "winston-daily-rotate-file": "^4.4.2" }, diff --git a/src/lib/hl7MllpSender.ts b/src/lib/hl7MllpSender.ts index ae67449..3f0d841 100644 --- a/src/lib/hl7MllpSender.ts +++ b/src/lib/hl7MllpSender.ts @@ -1,6 +1,7 @@ import { MllpServer } from '@i-tech-uw/mllp-server' import logger from './winston' import { WorkflowHandler, topicList } from '../workflows/botswana/workflowHandler' +import { config } from '../lib/config' export default class Hl7MllpSender { targetIp: string @@ -19,9 +20,9 @@ export default class Hl7MllpSender { this.mllpServer = new MllpServer(targetIp, targetPort, logger) } - public static getInstance(targetIp: string, targetPort: number): Hl7MllpSender { + public static getInstance(targetIp: string, targetPort: number, retries?: number, retryInterval?: number): Hl7MllpSender { if (!Hl7MllpSender.instance) { - Hl7MllpSender.instance = new Hl7MllpSender(targetIp, targetPort) + Hl7MllpSender.instance = new Hl7MllpSender(targetIp, targetPort, retries, retryInterval) } return Hl7MllpSender.instance } @@ -81,6 +82,6 @@ export default class Hl7MllpSender { } } -const hl7Sender = Hl7MllpSender.getInstance('127.0.0.1', 3000) +const hl7Sender = Hl7MllpSender.getInstance('127.0.0.1', 3000, config.get("retryConfig:hl7MaxRetries"), config.get("retryConfig:hl7RetryDelay")); export { hl7Sender } diff --git a/src/lib/kafkaConsumerUtil.ts b/src/lib/kafkaConsumerUtil.ts index 6de03e3..00610cc 100644 --- a/src/lib/kafkaConsumerUtil.ts +++ b/src/lib/kafkaConsumerUtil.ts @@ -1,7 +1,7 @@ import { Consumer, EachBatchPayload, Kafka, KafkaConfig, Message } from 'kafkajs' import logger from './winston' import { WorkflowHandler, WorkflowResult, topicList } from '../workflows/botswana/workflowHandler' - +import { config } from '../lib/config' export type EachMessageCallback = ( topic: string, partition: number, @@ -61,9 +61,9 @@ export class KafkaConsumerUtil { `Consumer | Recieved message from topic ${topic} on partition ${partition} with offset ${message.offset}`, ) - const maxRetries = 2 + const maxRetries = config.get("retryConfig:kafkaMaxRetries") || 2 let retryCount = 0 - let retryDelay = 2000 + let retryDelay = config.get("retryConfig:kafkaRetryDelay") || 1000 let res: WorkflowResult | null = null while (retryCount < maxRetries) { diff --git a/src/workflows/botswana/hl7Workflows.ts b/src/workflows/botswana/hl7Workflows.ts index 5d78fac..0a10c17 100644 --- a/src/workflows/botswana/hl7Workflows.ts +++ b/src/workflows/botswana/hl7Workflows.ts @@ -69,6 +69,9 @@ export default class Hl7WorkflowsBw { } static async translateBundle(hl7Msg: string, templateConfigKey: string) { + let maxRetries = config.get('retryConfig:translatorMaxRetries') || 5; + let delay = config.get('retryConfig:translatorRetryDelay') || 2000; + // The errorCheck function defines the criteria for retrying based on the operation's result const errorCheck = (result: R4.IBundle) => result === this.errorBundle; @@ -78,8 +81,8 @@ export default class Hl7WorkflowsBw { // Use the retryOperation method with the new errorCheck criteria return await this.retryOperation( () => this.getHl7Translation(hl7Msg, config.get(templateConfigKey)), - 5, // maxRetries - 1000, // delay in milliseconds + maxRetries, + delay, errorCheck, payloadForDMQ ); @@ -140,8 +143,8 @@ export default class Hl7WorkflowsBw { static async getFhirTranslationWithRetry(bundle: R4.IBundle, template: string): Promise { // Define your retry parameters - const maxRetries = 3; - const delay = 1000; // Starting delay in ms + const maxRetries = config.get('retryConfig:translatorMaxRetries') || 5 + const delay = config.get('retryConfig:translatorRetryDelay') || 2000 const errorCheck = (result: R4.IBundle) => result === this.errorBundle; @@ -149,8 +152,8 @@ export default class Hl7WorkflowsBw { return await this.retryOperation( () => this.getFhirTranslation(bundle, template), - 5, // maxRetries - 2000, // delay in milliseconds + maxRetries, + delay, errorCheck, payloadForDMQ ); diff --git a/src/workflows/botswana/workflowHandler.ts b/src/workflows/botswana/workflowHandler.ts index da6aa9c..a176acc 100644 --- a/src/workflows/botswana/workflowHandler.ts +++ b/src/workflows/botswana/workflowHandler.ts @@ -258,9 +258,12 @@ export class WorkflowHandler { public static async sendPayloadWithRetryDMQ( payload: any, topic: string, - maxRetries = 2, - retryDelay = 3000, + maxRetries?: number, + retryDelay?: number, ) { + let myMaxRetries = maxRetries || config.get('retryConfig:kafkaMaxRetries') || 5 + let myRetryDelay = retryDelay || config.get('retryConfig:kafkaRetryDelay') || 2000 + await this.initKafkaProducer() let val = '' @@ -282,7 +285,7 @@ export class WorkflowHandler { let attempt = 0 - while (attempt < maxRetries) { + while (attempt < myMaxRetries) { try { logger.info(`Attempt ${attempt + 1}: Sending payload to topic ${topic}!`) await this.kafka.sendMessageTransactionally(records) @@ -291,7 +294,7 @@ export class WorkflowHandler { error = err logger.error(`Attempt ${attempt + 1}: Error sending payload to topic ${topic}: ${err}`) attempt++ - await sleep(retryDelay * Math.pow(2, attempt - 1)) // Exponential back-off. + await sleep(myRetryDelay * Math.pow(2, attempt - 1)) // Exponential back-off. } }