Skip to content

Commit

Permalink
Pilot testing updates
Browse files Browse the repository at this point in the history
  • Loading branch information
pmanko committed Apr 27, 2024
1 parent 355e988 commit cef8c6d
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 16 deletions.
8 changes: 8 additions & 0 deletions config/config_docker.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
"kafka:9092"
]
},
"retryConfig": {
"translatorMaxRetries": 5,
"translatorRetryDelay": 10000,
"hl7MaxRetries": 5,
"hl7RetryDelay": 10000,
"kafkaMaxRetries": 5,
"kafkaRetryDelay": 10000
},
"bwConfig": {
"pimsSystemUrl": "https://api.openconceptlab.org/orgs/I-TECH-UW/sources/PIMSLAB/",
"ipmsSystemUrl": "https://api.openconceptlab.org/orgs/I-TECH-UW/sources/IPMSLAB/",
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"sprintf-js": "^1.1.2",
"urijs": "^1.19.11",
"uuid": "^3.3.3",
"uuid4": "^1.1.4",
"winston": "^3.2.1",
"winston-daily-rotate-file": "^4.4.2"
},
Expand Down
7 changes: 4 additions & 3 deletions src/lib/hl7MllpSender.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { MllpServer } from '@i-tech-uw/mllp-server'
import logger from './winston'
import { WorkflowHandler, topicList } from '../workflows/botswana/workflowHandler'
import { config } from '../lib/config'

export default class Hl7MllpSender {
targetIp: string
Expand All @@ -19,9 +20,9 @@ export default class Hl7MllpSender {
this.mllpServer = new MllpServer(targetIp, targetPort, logger)
}

public static getInstance(targetIp: string, targetPort: number): Hl7MllpSender {
public static getInstance(targetIp: string, targetPort: number, retries?: number, retryInterval?: number): Hl7MllpSender {
if (!Hl7MllpSender.instance) {
Hl7MllpSender.instance = new Hl7MllpSender(targetIp, targetPort)
Hl7MllpSender.instance = new Hl7MllpSender(targetIp, targetPort, retries, retryInterval)
}
return Hl7MllpSender.instance
}
Expand Down Expand Up @@ -81,6 +82,6 @@ export default class Hl7MllpSender {
}
}

const hl7Sender = Hl7MllpSender.getInstance('127.0.0.1', 3000)
const hl7Sender = Hl7MllpSender.getInstance('127.0.0.1', 3000, config.get("retryConfig:hl7MaxRetries"), config.get("retryConfig:hl7RetryDelay"));

export { hl7Sender }
6 changes: 3 additions & 3 deletions src/lib/kafkaConsumerUtil.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Consumer, EachBatchPayload, Kafka, KafkaConfig, Message } from 'kafkajs'
import logger from './winston'
import { WorkflowHandler, WorkflowResult, topicList } from '../workflows/botswana/workflowHandler'

import { config } from '../lib/config'
export type EachMessageCallback = (
topic: string,
partition: number,
Expand Down Expand Up @@ -61,9 +61,9 @@ export class KafkaConsumerUtil {
`Consumer | Recieved message from topic ${topic} on partition ${partition} with offset ${message.offset}`,
)

const maxRetries = 2
const maxRetries = config.get("retryConfig:kafkaMaxRetries") || 2
let retryCount = 0
let retryDelay = 2000
let retryDelay = config.get("retryConfig:kafkaRetryDelay") || 1000
let res: WorkflowResult | null = null

while (retryCount < maxRetries) {
Expand Down
15 changes: 9 additions & 6 deletions src/workflows/botswana/hl7Workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ export default class Hl7WorkflowsBw {
}

static async translateBundle(hl7Msg: string, templateConfigKey: string) {
let maxRetries = config.get('retryConfig:translatorMaxRetries') || 5;
let delay = config.get('retryConfig:translatorRetryDelay') || 2000;

// The errorCheck function defines the criteria for retrying based on the operation's result
const errorCheck = (result: R4.IBundle) => result === this.errorBundle;

Expand All @@ -78,8 +81,8 @@ export default class Hl7WorkflowsBw {
// Use the retryOperation method with the new errorCheck criteria
return await this.retryOperation(
() => this.getHl7Translation(hl7Msg, config.get(templateConfigKey)),
5, // maxRetries
1000, // delay in milliseconds
maxRetries,
delay,
errorCheck,
payloadForDMQ
);
Expand Down Expand Up @@ -140,17 +143,17 @@ export default class Hl7WorkflowsBw {

static async getFhirTranslationWithRetry(bundle: R4.IBundle, template: string): Promise<string> {
// Define your retry parameters
const maxRetries = 3;
const delay = 1000; // Starting delay in ms
const maxRetries = config.get('retryConfig:translatorMaxRetries') || 5
const delay = config.get('retryConfig:translatorRetryDelay') || 2000

const errorCheck = (result: R4.IBundle) => result === this.errorBundle;

const payloadForDMQ = { bundle, template };

return await this.retryOperation(
() => this.getFhirTranslation(bundle, template),
5, // maxRetries
2000, // delay in milliseconds
maxRetries,
delay,
errorCheck,
payloadForDMQ
);
Expand Down
11 changes: 7 additions & 4 deletions src/workflows/botswana/workflowHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,12 @@ export class WorkflowHandler {
public static async sendPayloadWithRetryDMQ(
payload: any,
topic: string,
maxRetries = 2,
retryDelay = 3000,
maxRetries?: number,
retryDelay?: number,
) {
let myMaxRetries = maxRetries || config.get('retryConfig:kafkaMaxRetries') || 5
let myRetryDelay = retryDelay || config.get('retryConfig:kafkaRetryDelay') || 2000

await this.initKafkaProducer()
let val = ''

Expand All @@ -282,7 +285,7 @@ export class WorkflowHandler {

let attempt = 0

while (attempt < maxRetries) {
while (attempt < myMaxRetries) {
try {
logger.info(`Attempt ${attempt + 1}: Sending payload to topic ${topic}!`)
await this.kafka.sendMessageTransactionally(records)
Expand All @@ -291,7 +294,7 @@ export class WorkflowHandler {
error = err
logger.error(`Attempt ${attempt + 1}: Error sending payload to topic ${topic}: ${err}`)
attempt++
await sleep(retryDelay * Math.pow(2, attempt - 1)) // Exponential back-off.
await sleep(myRetryDelay * Math.pow(2, attempt - 1)) // Exponential back-off.
}
}

Expand Down

0 comments on commit cef8c6d

Please sign in to comment.