diff --git a/.eslintcache b/.eslintcache new file mode 100644 index 0000000..fab1978 --- /dev/null +++ b/.eslintcache @@ -0,0 +1 @@ +[{"/root/code/shared-health-record/src/hapi/lab.ts":"1","/root/code/shared-health-record/src/lib/helpers.ts":"2","/root/code/shared-health-record/src/routes/lab-bw.ts":"3","/root/code/shared-health-record/src/server/__tests__/mllpAdapter.ts":"4","/root/code/shared-health-record/src/server/mllpAdapter.ts":"5","/root/code/shared-health-record/src/workflows/__tests__/labWorkflowsBw.ts":"6","/root/code/shared-health-record/src/workflows/botswana/IpmsWorkflows.ts":"7","/root/code/shared-health-record/src/workflows/botswana/LaboratoryServiceRequest.ts":"8","/root/code/shared-health-record/src/workflows/botswana/helpers.ts":"9","/root/code/shared-health-record/src/workflows/botswana/hl7Workflows.ts":"10","/root/code/shared-health-record/src/workflows/botswana/locationWorkflows.ts":"11","/root/code/shared-health-record/src/workflows/botswana/terminologyWorkflows.ts":"12","/root/code/shared-health-record/src/workflows/botswana/workflowHandler.ts":"13"},{"size":3311,"mtime":1699903230423,"results":"14","hashOfConfig":"15"},{"size":640,"mtime":1699903230445},{"size":2063,"mtime":1699903216752,"results":"16","hashOfConfig":"15"},{"size":1869,"mtime":1699903216752,"results":"17","hashOfConfig":"15"},{"size":1922,"mtime":1699903216752,"results":"18","hashOfConfig":"15"},{"size":3128,"mtime":1699903230521},{"size":12060,"mtime":1699903230586},{"size":3260,"mtime":1699903230597},{"size":3145,"mtime":1699903230618},{"size":3699,"mtime":1699903230640,"results":"19","hashOfConfig":"15"},{"size":6117,"mtime":1699903230662,"results":"20","hashOfConfig":"15"},{"size":6528,"mtime":1699903230694,"results":"21","hashOfConfig":"15"},{"size":10465,"mtime":1699903230716},{"filePath":"22","messages":"23","suppressedMessages":"24","errorCount":0,"fatalErrorCount":0,"warningCount":12,"fixableErrorCount":0,"fixableWarningCount":0,"source":null},"ahalsm",{"filePath":"25","messages":"26","suppressedMessages":"27","errorCount":0,"fatalErrorCount":0,"warningCount":0,"fixableErrorCount":0,"fixableWarningCount":0},{"filePath":"28","messages":"29","suppressedMessages":"30","errorCount":0,"fatalErrorCount":0,"warningCount":4,"fixableErrorCount":0,"fixableWarningCount":0,"source":null},{"filePath":"31","messages":"32","suppressedMessages":"33","errorCount":0,"fatalErrorCount":0,"warningCount":8,"fixableErrorCount":0,"fixableWarningCount":0,"source":null},{"filePath":"34","messages":"35","suppressedMessages":"36","errorCount":0,"fatalErrorCount":0,"warningCount":5,"fixableErrorCount":0,"fixableWarningCount":0,"source":null},{"filePath":"37","messages":"38","suppressedMessages":"39","errorCount":0,"fatalErrorCount":0,"warningCount":0,"fixableErrorCount":0,"fixableWarningCount":0},{"filePath":"40","messages":"41","suppressedMessages":"42","errorCount":0,"fatalErrorCount":0,"warningCount":10,"fixableErrorCount":0,"fixableWarningCount":0,"source":null},"/root/code/shared-health-record/src/hapi/lab.ts",["43","44","45","46","47","48","49","50","51","52","53","54"],[],"/root/code/shared-health-record/src/routes/lab-bw.ts",[],[],"/root/code/shared-health-record/src/server/__tests__/mllpAdapter.ts",["55","56","57","58"],[],"/root/code/shared-health-record/src/server/mllpAdapter.ts",["59","60","61","62","63","64","65","66"],[],"/root/code/shared-health-record/src/workflows/botswana/hl7Workflows.ts",["67","68","69","70","71"],[],"/root/code/shared-health-record/src/workflows/botswana/locationWorkflows.ts",[],[],"/root/code/shared-health-record/src/workflows/botswana/terminologyWorkflows.ts",["72","73","74","75","76","77","78","79","80","81"],[],{"ruleId":"82","severity":1,"message":"83","line":8,"column":7,"nodeType":"84","messageId":"85","endLine":8,"endColumn":11},{"ruleId":"86","severity":1,"message":"87","line":8,"column":7,"nodeType":"84","messageId":"85","endLine":8,"endColumn":11},{"ruleId":"88","severity":1,"message":"89","line":27,"column":70,"nodeType":"90","messageId":"91","endLine":27,"endColumn":73,"suggestions":"92"},{"ruleId":"88","severity":1,"message":"89","line":29,"column":21,"nodeType":"90","messageId":"91","endLine":29,"endColumn":24,"suggestions":"93"},{"ruleId":"82","severity":1,"message":"94","line":29,"column":26,"nodeType":"84","messageId":"85","endLine":29,"endColumn":44},{"ruleId":"86","severity":1,"message":"95","line":29,"column":26,"nodeType":"84","messageId":"85","endLine":29,"endColumn":44},{"ruleId":"82","severity":1,"message":"96","line":31,"column":3,"nodeType":"84","messageId":"85","endLine":31,"endColumn":12},{"ruleId":"86","severity":1,"message":"97","line":31,"column":3,"nodeType":"84","messageId":"85","endLine":31,"endColumn":12},{"ruleId":"88","severity":1,"message":"89","line":52,"column":19,"nodeType":"90","messageId":"91","endLine":52,"endColumn":22,"suggestions":"98"},{"ruleId":"82","severity":1,"message":"99","line":68,"column":9,"nodeType":"84","messageId":"85","endLine":68,"endColumn":19},{"ruleId":"86","severity":1,"message":"100","line":68,"column":9,"nodeType":"84","messageId":"85","endLine":68,"endColumn":19},{"ruleId":"88","severity":1,"message":"89","line":89,"column":19,"nodeType":"90","messageId":"91","endLine":89,"endColumn":22,"suggestions":"101"},{"ruleId":"88","severity":1,"message":"89","line":26,"column":19,"nodeType":"90","messageId":"91","endLine":26,"endColumn":22,"suggestions":"102"},{"ruleId":"103","severity":1,"message":"104","line":59,"column":12,"nodeType":"105","messageId":"106","endLine":59,"endColumn":46},{"ruleId":"103","severity":1,"message":"104","line":59,"column":12,"nodeType":"105","messageId":"106","endLine":59,"endColumn":38,"suggestions":"107"},{"ruleId":"103","severity":1,"message":"104","line":59,"column":12,"nodeType":"105","messageId":"106","endLine":59,"endColumn":25,"suggestions":"108"},{"ruleId":"88","severity":1,"message":"89","line":12,"column":37,"nodeType":"90","messageId":"91","endLine":12,"endColumn":40,"suggestions":"109"},{"ruleId":"82","severity":1,"message":"110","line":15,"column":24,"nodeType":"84","messageId":"85","endLine":15,"endColumn":34},{"ruleId":"86","severity":1,"message":"111","line":15,"column":24,"nodeType":"84","messageId":"85","endLine":15,"endColumn":34},{"ruleId":"88","severity":1,"message":"89","line":17,"column":39,"nodeType":"90","messageId":"91","endLine":17,"endColumn":42,"suggestions":"112"},{"ruleId":"82","severity":1,"message":"113","line":18,"column":13,"nodeType":"84","messageId":"85","endLine":18,"endColumn":26},{"ruleId":"86","severity":1,"message":"114","line":18,"column":13,"nodeType":"84","messageId":"85","endLine":18,"endColumn":26},{"ruleId":"88","severity":1,"message":"89","line":30,"column":36,"nodeType":"90","messageId":"91","endLine":30,"endColumn":39,"suggestions":"115"},{"ruleId":"88","severity":1,"message":"89","line":30,"column":50,"nodeType":"90","messageId":"91","endLine":30,"endColumn":53,"suggestions":"116"},{"ruleId":"88","severity":1,"message":"89","line":38,"column":21,"nodeType":"90","messageId":"91","endLine":38,"endColumn":24,"suggestions":"117"},{"ruleId":"88","severity":1,"message":"89","line":47,"column":21,"nodeType":"90","messageId":"91","endLine":47,"endColumn":24,"suggestions":"118"},{"ruleId":"88","severity":1,"message":"89","line":69,"column":32,"nodeType":"90","messageId":"91","endLine":69,"endColumn":35,"suggestions":"119"},{"ruleId":"88","severity":1,"message":"89","line":84,"column":21,"nodeType":"90","messageId":"91","endLine":84,"endColumn":24,"suggestions":"120"},{"ruleId":"88","severity":1,"message":"89","line":110,"column":21,"nodeType":"90","messageId":"91","endLine":110,"endColumn":24,"suggestions":"121"},{"ruleId":"103","severity":1,"message":"104","line":20,"column":21,"nodeType":"105","messageId":"106","endLine":20,"endColumn":37},{"ruleId":"88","severity":1,"message":"89","line":130,"column":11,"nodeType":"90","messageId":"91","endLine":130,"endColumn":14,"suggestions":"122"},{"ruleId":"88","severity":1,"message":"89","line":136,"column":13,"nodeType":"90","messageId":"91","endLine":136,"endColumn":16,"suggestions":"123"},{"ruleId":"88","severity":1,"message":"89","line":143,"column":29,"nodeType":"90","messageId":"91","endLine":143,"endColumn":32,"suggestions":"124"},{"ruleId":"88","severity":1,"message":"89","line":149,"column":54,"nodeType":"90","messageId":"91","endLine":149,"endColumn":57,"suggestions":"125"},{"ruleId":"88","severity":1,"message":"89","line":165,"column":50,"nodeType":"90","messageId":"91","endLine":165,"endColumn":53,"suggestions":"126"},{"ruleId":"88","severity":1,"message":"89","line":185,"column":60,"nodeType":"90","messageId":"91","endLine":185,"endColumn":63,"suggestions":"127"},{"ruleId":"82","severity":1,"message":"128","line":194,"column":16,"nodeType":"84","messageId":"85","endLine":194,"endColumn":29},{"ruleId":"86","severity":1,"message":"129","line":194,"column":16,"nodeType":"84","messageId":"85","endLine":194,"endColumn":29},{"ruleId":"88","severity":1,"message":"89","line":194,"column":60,"nodeType":"90","messageId":"91","endLine":194,"endColumn":63,"suggestions":"130"},"unused-imports/no-unused-vars","'util' is assigned a value but never used. Allowed unused vars must match /^_/u.","Identifier","unusedVar","@typescript-eslint/no-unused-vars","'util' is assigned a value but never used.","@typescript-eslint/no-explicit-any","Unexpected any. Specify a different type.","TSAnyKeyword","unexpectedAny",["131","132"],["133","134"],"'statusCode' is defined but never used. Allowed unused vars must match /^_/u.","'statusCode' is defined but never used.","'noCaching' is assigned a value but never used. Allowed unused vars must match /^_/u.","'noCaching' is assigned a value but never used.",["135","136"],"'requestUri' is assigned a value but never used. Allowed unused vars must match /^_/u.","'requestUri' is assigned a value but never used.",["137","138"],["139","140"],"@typescript-eslint/no-non-null-assertion","Forbidden non-null assertion.","TSNonNullExpression","noNonNull",["141"],["142"],["143","144"],"'err' is defined but never used. Allowed unused args must match /^_/u.","'err' is defined but never used.",["145","146"],"'start' is assigned a value but never used. Allowed unused vars must match /^_/u.","'start' is assigned a value but never used.",["147","148"],["149","150"],["151","152"],["153","154"],["155","156"],["157","158"],["159","160"],["161","162"],["163","164"],["165","166"],["167","168"],["169","170"],["171","172"],"'getOclConcept' is defined but never used. Allowed unused vars must match /^_/u.","'getOclConcept' is defined but never used.",["173","174"],{"messageId":"175","fix":"176","desc":"177"},{"messageId":"178","fix":"179","desc":"180"},{"messageId":"175","fix":"181","desc":"177"},{"messageId":"178","fix":"182","desc":"180"},{"messageId":"175","fix":"183","desc":"177"},{"messageId":"178","fix":"184","desc":"180"},{"messageId":"175","fix":"185","desc":"177"},{"messageId":"178","fix":"186","desc":"180"},{"messageId":"175","fix":"187","desc":"177"},{"messageId":"178","fix":"188","desc":"180"},{"messageId":"189","fix":"190","desc":"191"},{"messageId":"189","fix":"192","desc":"191"},{"messageId":"175","fix":"193","desc":"177"},{"messageId":"178","fix":"194","desc":"180"},{"messageId":"175","fix":"195","desc":"177"},{"messageId":"178","fix":"196","desc":"180"},{"messageId":"175","fix":"197","desc":"177"},{"messageId":"178","fix":"198","desc":"180"},{"messageId":"175","fix":"199","desc":"177"},{"messageId":"178","fix":"200","desc":"180"},{"messageId":"175","fix":"201","desc":"177"},{"messageId":"178","fix":"202","desc":"180"},{"messageId":"175","fix":"203","desc":"177"},{"messageId":"178","fix":"204","desc":"180"},{"messageId":"175","fix":"205","desc":"177"},{"messageId":"178","fix":"206","desc":"180"},{"messageId":"175","fix":"207","desc":"177"},{"messageId":"178","fix":"208","desc":"180"},{"messageId":"175","fix":"209","desc":"177"},{"messageId":"178","fix":"210","desc":"180"},{"messageId":"175","fix":"211","desc":"177"},{"messageId":"178","fix":"212","desc":"180"},{"messageId":"175","fix":"213","desc":"177"},{"messageId":"178","fix":"214","desc":"180"},{"messageId":"175","fix":"215","desc":"177"},{"messageId":"178","fix":"216","desc":"180"},{"messageId":"175","fix":"217","desc":"177"},{"messageId":"178","fix":"218","desc":"180"},{"messageId":"175","fix":"219","desc":"177"},{"messageId":"178","fix":"220","desc":"180"},{"messageId":"175","fix":"221","desc":"177"},{"messageId":"178","fix":"222","desc":"180"},{"messageId":"175","fix":"223","desc":"177"},{"messageId":"178","fix":"224","desc":"180"},"suggestUnknown",{"range":"225","text":"226"},"Use `unknown` instead, this will force you to explicitly, and safely assert the type is correct.","suggestNever",{"range":"225","text":"227"},"Use `never` instead, this is useful when instantiating generic type parameters that you don't need to know the type of.",{"range":"228","text":"226"},{"range":"228","text":"227"},{"range":"229","text":"226"},{"range":"229","text":"227"},{"range":"230","text":"226"},{"range":"230","text":"227"},{"range":"231","text":"226"},{"range":"231","text":"227"},"suggestOptionalChain",{"range":"232","text":"233"},"Consider using the optional chain operator `?.` instead. This operator includes runtime checks, so it is safer than the compile-only non-null assertion operator.",{"range":"234","text":"235"},{"range":"236","text":"226"},{"range":"236","text":"227"},{"range":"237","text":"226"},{"range":"237","text":"227"},{"range":"238","text":"226"},{"range":"238","text":"227"},{"range":"239","text":"226"},{"range":"239","text":"227"},{"range":"240","text":"226"},{"range":"240","text":"227"},{"range":"241","text":"226"},{"range":"241","text":"227"},{"range":"242","text":"226"},{"range":"242","text":"227"},{"range":"243","text":"226"},{"range":"243","text":"227"},{"range":"244","text":"226"},{"range":"244","text":"227"},{"range":"245","text":"226"},{"range":"245","text":"227"},{"range":"246","text":"226"},{"range":"246","text":"227"},{"range":"247","text":"226"},{"range":"247","text":"227"},{"range":"248","text":"226"},{"range":"248","text":"227"},{"range":"249","text":"226"},{"range":"249","text":"227"},{"range":"250","text":"226"},{"range":"250","text":"227"},{"range":"251","text":"226"},{"range":"251","text":"227"},[764,767],"unknown","never",[867,870],[1394,1397],[2451,2454],[873,876],[1833,1834],"?",[1820,1821],"?.",[396,399],[580,583],[981,984],[995,998],[1172,1175],[1478,1481],[2219,2222],[2716,2719],[3482,3485],[4056,4059],[4242,4245],[4521,4524],[4809,4812],[5262,5265],[5719,5722],[6042,6045]] \ No newline at end of file diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index cc5a913..c039b41 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,9 +11,10 @@ on: workflow_dispatch: env: - REGISTRY: ghcr.io + REGISTRY: docker.io IMAGE_NAME: ${{ github.repository }} SHR_VERSION: ci + DOCKER_NAME: ${{ secrets.DOCKERHUB_USERNAME }}/${{ github.repository }} jobs: unit-test: @@ -47,123 +48,36 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v3 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 - - - name: Extract metadata (tags, labels) for Docker + uses: actions/checkout@v4 + - + # Add support for more platforms with QEMU (optional) + # https://github.com/docker/setup-qemu-action + name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - + name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - + name: Docker meta id: meta - uses: docker/metadata-action@v4 - with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} - - - name: Change repo info to lowercase - id: string - uses: ASzc/change-string-case-action@v5 + uses: docker/metadata-action@v5 with: - string: ${{ github.repository }} - - - name: Get Package Version - id: package-version - uses: martinbeentjes/npm-get-version-action@main - - - name: Check custom tag - run: echo ${{ env.REGISTRY }}/${{ steps.string.outputs.lowercase }}:${{steps.package-version.outputs.current-version}} - - - name: Build SHR - uses: docker/build-push-action@v3 + images: ${{ env.DOCKER_NAME }} + - + name: Login to DockerHub + if: github.event_name != 'pull_request' + uses: docker/login-action@v3 with: - tags: ghcr.io/i-tech-uw/shared-health-record:ci - push: false - load: true - context: . - cache-from: type=gha - cache-to: type=gha,mode=max - - # - name: Pull containers - # run: docker-compose pull shr-fhir openhim-core mongo-db newman kafka zookeeper - - # - name: Cache containers - # uses: satackey/action-docker-layer-caching@v0.0.11 - # continue-on-error: true - - # - name: Start containers - # run: docker-compose up -d shr-fhir mongo-db openhim-core kafka zookeeper - - # - name: Load openhim config - # run: docker-compose up -d openhim-config - - # - name: Sleep for 60 seconds - # run: sleep 90s - # shell: bash - - # - name: Display docker logs for openhim config - # run: docker-compose logs openhim-config - - # - name: Run SHR image - # run: docker-compose up -d shr - - # - name: Sleep for 30 seconds - # run: sleep 30s - # shell: bash - - # - name: Show SHR Log - # run: docker-compose logs shr - - # - name: Show kafka Log - # run: docker-compose logs kafka - - # - name: Check openhim-core - # run: curl -sSk https://localhost:8080/heartbeat - - # - name: Show containers - # run: docker-compose ps - - # - name: Show OpenHIM Log - # run: docker-compose logs openhim-core - - # # Postman Testing - see https://www.postman.com/itechuw/workspace/shared-health-record/ - # - name: Run General Tests Collection - # env: - # POSTMAN_COLLECTION: /.postman/collections/1_general_tests.json - # run: docker-compose up -d newman - - # - name: Run Laboratory Workflows Collection (collection/1525496-f28b488f-a40c-4723-8a72-a2b4f64bb5da) - # env: - # POSTMAN_COLLECTION: /.postman/collections/2_laboratory_workflows.json - # run: docker-compose up --exit-code-from newman newman - # continue-on-error: true - - # - name: Run MLLP Tests - # run: docker-compose up --exit-code-from mllp_tests mllp_tests - - # - name: Show SHR Log - # if: always() - # run: docker-compose logs shr - - # - name: Show kafka Log - # if: always() - # run: docker-compose logs kafka - - # - name: Stop containers - # if: always() - # run: docker-compose down - - - name: Log in to the Container registry - uses: docker/login-action@v2 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Publish Docker image - uses: docker/build-push-action@v3 - if: github.event_name == 'release' + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - + name: Build and push + uses: docker/build-push-action@v5 with: context: . - push: true - tags: ${{ steps.meta.outputs.tags }},${{ env.REGISTRY }}/${{ steps.string.outputs.lowercase }}:${{steps.package-version.outputs.current-version}} + push: ${{ github.event_name != 'pull_request' }} + load: ${{ github.event_name == 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - cache-from: type=gha - cache-to: type=gha,mode=max + cache-from: type=registry,ref=${{github.repository}}:latest + cache-to: type=inline diff --git a/config/config_docker.json b/config/config_docker.json index 1fb3741..a491ec9 100644 --- a/config/config_docker.json +++ b/config/config_docker.json @@ -34,6 +34,8 @@ "cielSystemUrl": "https://openconceptlab.org/orgs/CIEL/sources/CIEL", "loincSystemUrl": "https://api.openconceptlab.org/orgs/Regenstrief/sources/LOINC/", "omangSystemUrl": "http://moh.bw.org/ext/identifier/omang", + "brdsSystemUrl": "http://moh.bw.org/ext/identifier/bcn", + "immigrationSystemUrl": "http://moh.bw.org/ext/identifier/passportno", "oclUrl": "https://api.openconceptlab.org", "facilityCodeSystemUrl": "http://moh.bw.org/ext/identifier/facility-code", "ipmsProviderSystemUrl": "http://moh.bw.org/ext/ipms-provider", diff --git a/src/hapi/lab.ts b/src/hapi/lab.ts index 91f95dd..6cd2260 100644 --- a/src/hapi/lab.ts +++ b/src/hapi/lab.ts @@ -8,14 +8,21 @@ import config from '../lib/config' const util = import('util') import logger from '../lib/winston' +import { postWithRetry } from '../workflows/botswana/helpers' let uri = URI(config.get('fhirServer:baseURL')) -// TODO: change source utils to use got() & await pattern -// Promisify fns +class HapiError extends Error { + constructor(message: string) { + super(message) + this.name = 'HapiError' -// const mpiClient = fhirClient(req, res).client({ serverUrl: mpiUrl, username: config.get('fhirServer:username'), password: config.get('fhirServer:password')}); -// const shrClient = fhirClient(req, res).client({ serverUrl: shrUrl, username: config.get('fhirServer:username'), password: config.get('fhirServer:password')}); + // Maintains proper stack trace for where our error was thrown (only available on V8) + if (Error.captureStackTrace) { + Error.captureStackTrace(this, HapiError) + } + } +} export async function getResource(type: string, id: string, params?: any, noCaching?: boolean) { // return got.get(`${SHR_URL}/${type}/${id}`).json() @@ -76,23 +83,13 @@ export async function saveBundle(bundle: R4.IBundle): Promise { bundle = translateToTransactionBundle(bundle) } try { - // logger.info(JSON.stringify(bundle)) - - const ret = await got.post(uri.toString(), { json: bundle }).json() - + const ret = await postWithRetry(uri.toString(), { json: bundle }) + logger.info(`Saved bundle to FHIR store!`) return ret } catch (error: any) { - return { - resourceType: 'Bundle', - type: BundleTypeKind._transactionResponse, - entry: [ - { - response: { - status: `${error.message}`, - }, - }, - ], - } + logger.error(`Could not save bundle: ${error.response.body}`) + + throw new HapiError('Could not save bundle to hapi server!') } } diff --git a/src/lib/helpers.ts b/src/lib/helpers.ts index 8d228db..29ef3a1 100644 --- a/src/lib/helpers.ts +++ b/src/lib/helpers.ts @@ -1,5 +1,6 @@ 'use strict' + export function invalidBundle(resource: any): boolean { return ( !resource.resourceType || @@ -24,14 +25,3 @@ export function invalidBundleMessage(): any { }, } } - -// export function getFromFhirServer(resourcePath, ) -// await got -// .get( -// `${config.get('fhirServer:baseURL')}/Patient?_id=${patientId}&_include=*&_revinclude=*`, -// { -// username: config.get('fhirServer:username'), -// password: config.get('fhirServer:password'), -// }, -// ) -// .json() diff --git a/src/lib/kafkaConsumerUtil.ts b/src/lib/kafkaConsumerUtil.ts index b928b4e..43ef37f 100644 --- a/src/lib/kafkaConsumerUtil.ts +++ b/src/lib/kafkaConsumerUtil.ts @@ -36,10 +36,10 @@ export class KafkaConsumerUtil { eachBatchAutoResolve: false, eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }: EachBatchPayload) => { const { topic, partition } = batch; - + for (const message of batch.messages) { if (!isRunning() || isStale()) return; - + logger.info({ topic, partition, @@ -47,10 +47,29 @@ export class KafkaConsumerUtil { value: message.value?.toString(), }); - await eachMessageCallback(topic, partition, message) - - resolveOffset(message.offset); - await heartbeat(); + const maxRetries = 6; + let retryCount = 0; + let retryDelay = 1000; + + while (retryCount < maxRetries) { + try { + await eachMessageCallback(topic, partition, message); + resolveOffset(message.offset); + await heartbeat(); + break; // Break the loop if processing succeeds + } catch (error) { + logger.error(`Error processing message ${message.offset}: ${error}`); + retryCount++; + if (retryCount >= maxRetries) { + logger.error(`Max retries reached for message ${message.offset}, sending to dead letter queue or similar.`); + // TODO: handle with DLQ + break; + } + await new Promise(resolve => setTimeout(resolve, retryDelay)); + retryDelay *= 2; // Double the delay for the next retry + await heartbeat(); // Important to call heartbeat to keep the session alive + } + } } }, }); diff --git a/src/routes/lab-bw.ts b/src/routes/lab-bw.ts index 28f2888..1c05596 100644 --- a/src/routes/lab-bw.ts +++ b/src/routes/lab-bw.ts @@ -5,7 +5,7 @@ import express, { Request, Response } from 'express' import { saveBundle } from '../hapi/lab' import { invalidBundle, invalidBundleMessage } from '../lib/helpers' import logger from '../lib/winston' -import { LabWorkflowsBw } from '../workflows/labWorkflowsBw' +import { WorkflowHandler } from '../workflows/botswana/workflowHandler' export const router = express.Router() @@ -41,13 +41,15 @@ router.all('/', async (req: Request, res: Response) => { orderBundle.entry && resultBundle.entry.length == orderBundle.entry.length ) { - LabWorkflowsBw.handleBwLabOrder(orderBundle, resultBundle) + WorkflowHandler.handleLabOrder(orderBundle) return res.status(200).json(resultBundle) } else { return res.status(400).send(resultBundle) } } catch (e) { - return res.status(500).send(e) + logger.error(`Error saving bundle: ${e}`) + + return res.status(500).send("Couldn't save bundle!") } } }) diff --git a/src/server/__tests__/mllpAdapter.ts b/src/server/__tests__/mllpAdapter.ts index fcf1e37..ee2f08e 100644 --- a/src/server/__tests__/mllpAdapter.ts +++ b/src/server/__tests__/mllpAdapter.ts @@ -1,7 +1,7 @@ import { BundleTypeKind, IBundle } from '@ahryman40k/ts-fhir-types/lib/R4' import fs from 'fs/promises' import path from 'path' -import Hl7Workflows from '../../workflows/hl7WorkflowsBw' +import Hl7Workflows from '../../workflows/botswana/hl7Workflows' import MllpAdapter from '../mllpAdapter' describe('MllpAdapter#handleMessage', () => { @@ -21,11 +21,11 @@ describe('MllpAdapter#handleMessage', () => { const saveAdtMessageSpy = jest .spyOn(Hl7Workflows, 'handleAdtMessage') - .mockReturnValue(returnPromise) + .mockReturnValue(Promise.resolve(undefined)) const result: any = await mllp.handleMessage(msg) - expect(result).toEqual(returnBundle) + expect(result).toEqual(undefined) expect(saveAdtMessageSpy).toHaveBeenCalledTimes(1) saveAdtMessageSpy.mockRestore() diff --git a/src/server/kafkaWorkers.ts b/src/server/kafkaWorkers.ts index b842d10..d1b87ff 100644 --- a/src/server/kafkaWorkers.ts +++ b/src/server/kafkaWorkers.ts @@ -1,6 +1,6 @@ import { KafkaConfig, Message, logLevel } from 'kafkajs' import logger from '../lib/winston' -import { LabWorkflowsBw, topicList } from '../workflows/labWorkflowsBw' +import { WorkflowHandler, topicList } from '../workflows/botswana/workflowHandler' import { config } from '../lib/config' import { KafkaConsumerUtil } from '../lib/kafkaConsumerUtil' @@ -70,20 +70,18 @@ const initAndConsume = async (topics: string[]) => { }; async function processMessage(topic: string, partition: number, message: Message): Promise { + // There is no additional error handling in this message, since any exceptions or problems will need to be + // logged and handled by the Kafka consumer retry logic in the KafkaConsumerUtil class. + logger.info(`Recieved message from topic ${topic} on partition ${partition}`) - try { - let val = '' - const res = null + let val = '' + const res = null - if (message.value) { - val = message.value.toString() - } - - LabWorkflowsBw.executeTopicWorkflow(topic, val) - } catch (error) { - logger.error(`Could not complete task from topic ${topic}!`) - - logger.error(error) + if (message.value) { + val = message.value.toString() } + + // This method needs to bubble up any exceptions to the Kafka consumer retry logic in the KafkaConsumerUtil class. + WorkflowHandler.executeTopicWorkflow(topic, val) } \ No newline at end of file diff --git a/src/server/mllpAdapter.ts b/src/server/mllpAdapter.ts index 46988f0..2b0698e 100644 --- a/src/server/mllpAdapter.ts +++ b/src/server/mllpAdapter.ts @@ -2,7 +2,7 @@ import { BundleTypeKind, IBundle } from '@ahryman40k/ts-fhir-types/lib/R4' import { MllpServer } from '@i-tech-uw/mllp-server' import config from '../lib/config' import logger from '../lib/winston' -import Hl7WorkflowsBw from '../workflows/hl7WorkflowsBw' +import Hl7WorkflowsBw from '../workflows/botswana/hl7Workflows' import { Logger } from 'winston' @@ -27,7 +27,7 @@ export default class MllpAdapter { }) } - public async handleMessage(data: any): Promise { + public async handleMessage(data: any): Promise { try { logger.info('received payload:', data) // Determine Message Type diff --git a/src/server/shrMediator.ts b/src/server/shrMediator.ts index b00fd34..4fac110 100644 --- a/src/server/shrMediator.ts +++ b/src/server/shrMediator.ts @@ -4,7 +4,7 @@ import shrApp from '../lib/shr' import logger from '../lib/winston' import medUtils from 'openhim-mediator-utils' -import { LabWorkflowsBw } from '../workflows/labWorkflowsBw' +import { WorkflowHandler } from '../workflows/botswana/workflowHandler' const errorTypes = ['unhandledRejection', 'uncaughtException'] @@ -27,7 +27,7 @@ export class ShrMediator { public async start(callback: any) { logger.info('Running SHR as a mediator with' + `${__dirname}/${this.config}`) try { - await LabWorkflowsBw.initKafkaProducer() + await WorkflowHandler.initKafkaProducer() medUtils.registerMediator( config.get('mediator:api'), this.config, @@ -35,7 +35,7 @@ export class ShrMediator { ) } catch (e: any) { logger.error(`Could not start SHR as a Mediator!\n${JSON.stringify(e)}`) - await LabWorkflowsBw.shutdownKafkaProducer() + await WorkflowHandler.shutdownKafkaProducer() process.exit(1) } @@ -44,7 +44,7 @@ export class ShrMediator { try { logger.error(`process.on ${type}`) logger.error(e) - await LabWorkflowsBw.shutdownKafkaProducer() + await WorkflowHandler.shutdownKafkaProducer() process.exit(0) } catch (_) { process.exit(1) @@ -55,7 +55,7 @@ export class ShrMediator { signalTraps.map(type => { process.once(type, async () => { try { - await LabWorkflowsBw.shutdownKafkaProducer() + await WorkflowHandler.shutdownKafkaProducer() } finally { process.kill(process.pid, type) } diff --git a/src/workflows/__tests__/labWorkflowsBw.ts b/src/workflows/__tests__/labWorkflowsBw.ts index ca337cf..de6f9cf 100644 --- a/src/workflows/__tests__/labWorkflowsBw.ts +++ b/src/workflows/__tests__/labWorkflowsBw.ts @@ -3,7 +3,10 @@ import { MllpServer } from '@i-tech-uw/mllp-server' import got from 'got' import nock from 'nock' import config from '../../lib/config' -import { LabWorkflowsBw } from '../labWorkflowsBw' +import { mapConcepts } from '../botswana/terminologyWorkflows' +import { getTaskStatus, setTaskStatus } from '../botswana/helpers' +import { sendAdtToIpms } from '../botswana/IpmsWorkflows' + const IG_URL = 'https://b-techbw.github.io/bw-lab-ig' let patient: R4.IPatient @@ -13,13 +16,9 @@ import fs from 'fs' describe('lab Workflows for Botswana should', () => { describe('translatePimsCoding', () => { it('should translate a given lab test PIMS coding to ciel, loinc, and IPMS', async () => { - const serviceRequest = ( - await got.get(IG_URL + '/ServiceRequest-example-bw-pims-service-request-1.json').json() - ) - - serviceRequest.code!.coding![0].code! = '3' - const result = await LabWorkflowsBw.translateCoding(serviceRequest) + const bundle = await got.get(IG_URL + '/Bundle-example-bw-lab-bundle.json').json() + const result = await mapConcepts(bundle) expect(result).toBeDefined }) }) @@ -28,8 +27,7 @@ describe('lab Workflows for Botswana should', () => { it('should get a Task status from a Bundle', async () => { const bundle = await got.get(IG_URL + '/Bundle-example-bw-lab-bundle.json').json() - const result: R4.TaskStatusKind | undefined = - LabWorkflowsBw.getTaskStatus(bundle) ?? undefined + const result: R4.TaskStatusKind | undefined = getTaskStatus(bundle) ?? undefined expect(result).toEqual(R4.TaskStatusKind._requested) }) @@ -41,9 +39,9 @@ describe('lab Workflows for Botswana should', () => { const status = R4.TaskStatusKind._accepted - const result: R4.IBundle = LabWorkflowsBw.setTaskStatus(bundle, status) + const result: R4.IBundle = setTaskStatus(bundle, status) - expect(LabWorkflowsBw.getTaskStatus(result)).toEqual(status) + expect(getTaskStatus(result)).toEqual(status) }) }) @@ -92,7 +90,7 @@ describe('lab Workflows for Botswana should', () => { (bundle.entry[taskIndex].resource!).status = R4.TaskStatusKind._draft } - const result: R4.IBundle = await LabWorkflowsBw.sendAdtToIpms(bundle) + const result: R4.IBundle = await sendAdtToIpms(bundle) }) }) }) diff --git a/src/workflows/botswana/IpmsWorkflows.ts b/src/workflows/botswana/IpmsWorkflows.ts new file mode 100644 index 0000000..e239c28 --- /dev/null +++ b/src/workflows/botswana/IpmsWorkflows.ts @@ -0,0 +1,370 @@ +import { R4 } from '@ahryman40k/ts-fhir-types' +import config from '../../lib/config' +import logger from '../../lib/winston' +import { getTaskStatus, setTaskStatus } from './helpers' +import Hl7MllpSender from '../../lib/hl7MllpSender' +import Hl7WorkflowsBw from '../botswana/hl7Workflows' +import got from 'got' +import { + BundleTypeKind, + Bundle_RequestMethodKind, + IBundle, + IDiagnosticReport, + IObservation, + IPatient, + IReference, + IServiceRequest, + TaskStatusKind, +} from '@ahryman40k/ts-fhir-types/lib/R4' +import { saveBundle } from '../../hapi/lab' + +// New Error Type for IPMS Workflow Errors +export class IpmsWorkflowError extends Error { + constructor(message: string) { + super(message) + this.name = 'IpmsWorkflowError' + } +} + +/** + * Sends an ADT message to IPMS. + * @param labBundle The lab bundle to send. + * @returns The updated lab bundle. + */ +export async function sendAdtToIpms(labBundle: R4.IBundle): Promise { + const status = getTaskStatus(labBundle) + + if (status && status === R4.TaskStatusKind._requested) { + logger.info('Sending ADT message to IPMS!') + + const sender = new Hl7MllpSender( + config.get('bwConfig:mllp:targetIp'), + config.get('bwConfig:mllp:targetAdtPort'), + ) + + const adtMessage = await Hl7WorkflowsBw.getFhirTranslation( + labBundle, + config.get('bwConfig:toIpmsAdtTemplate'), + ) + + logger.info(`adt:\n${adtMessage}`) + + const adtResult: string = await sender.send(adtMessage) + + if (adtResult.includes && adtResult.includes('AA')) { + labBundle = setTaskStatus(labBundle, R4.TaskStatusKind._accepted) + } + } else { + logger.info('Order not ready for IPMS.') + } + return labBundle +} + +export async function sendOrmToIpms(bundles: any): Promise { + const srBundle: R4.IBundle = { resourceType: 'Bundle', entry: [] } + let labBundle = bundles.taskBundle + const patient = bundles.patient + + try { + // Replace PIMS/OpenMRS Patient Resource with one From IPMS Lab System + const pindex = labBundle.entry!.findIndex((entry: any) => { + return entry.resource && entry.resource.resourceType == 'Patient' + }) + + labBundle.entry[pindex].resource = patient + + const options = { + timeout: config.get('bwConfig:requestTimeout'), + searchParams: {}, + } + + const sendBundle = { ...labBundle } + sendBundle.entry = [] + srBundle.entry = [] + + // Compile sendBundle.entry from labBundle + // TODO: Outline Logic for mapping between Panels and sending multiple tests + for (const entry of labBundle.entry) { + // Isolate and process ServiceRequests + if (entry.resource && entry.resource.resourceType == 'ServiceRequest') { + // For PIMS - check if service request is profile-level and get child service requests: + options.searchParams = { + 'based-on': entry.resource.id, + } + + const fetchedBundle = // TODO: Retry logic + await got.get(`${config.get('fhirServer:baseURL')}/ServiceRequest`, options).json() + + if (fetchedBundle && fetchedBundle.entry && srBundle.entry) { + // Add child ServiceRequests if any exist + srBundle.entry = srBundle.entry.concat(fetchedBundle.entry) + } else if ( + (!fetchedBundle || !(fetchedBundle.entry && fetchedBundle.entry.length > 0)) && + srBundle.entry + ) { + // If no child ServiceRequests, add this one if it has a code entry + if ( + entry.resource.code && + entry.resource.code.coding && + entry.resource.code.coding.length > 0 + ) { + srBundle.entry.push(entry) + } + } + } else { + // Copy over everything else + sendBundle.entry.push(entry) + } + } + + // Send one ORM for each ServiceRequest + // TODO: FIGURE OUT MANAGEMENT OF PANELS/PROFILES + for (const sr of srBundle.entry) { + // Send one ORM for each ServiceRequest + const outBundle = { ...sendBundle } + outBundle.entry.push(sr) + + const ormMessage = await Hl7WorkflowsBw.getFhirTranslation( + outBundle, + config.get('bwConfig:toIpmsOrmTemplate'), + ) + + const sender = new Hl7MllpSender( + config.get('bwConfig:mllp:targetIp'), + config.get('bwConfig:mllp:targetOrmPort'), + ) + + logger.info('Sending ORM message to IPMS!') + + logger.info(`orm:\n${ormMessage}\n`) + + if (ormMessage && ormMessage != '') { + const result: any = await sender.send(ormMessage) + if (result.includes('AA')) { + labBundle = setTaskStatus(labBundle, R4.TaskStatusKind._inProgress) + } + logger.info(`*result:\n${result}\n`) + } + } + } catch (e) { + logger.error(`Could not send ORM message to IPMS!\n${e}`) + throw new IpmsWorkflowError(`Could not send ORM message to IPMS!\n${e}`) + } + return labBundle +} + +/** + * Handles ADT (Admission, Discharge, Transfer) messages received from IPMS (Integrated Patient Management System). + * @param registrationBundle - The registration bundle containing the patient information. + * @returns A Promise that resolves to the registration bundle. + */ +export async function handleAdtFromIpms(adtMessage: string): Promise { + try { + const registrationBundle: R4.IBundle = await Hl7WorkflowsBw.translateBundle( + adtMessage, + 'bwConfig:fromIpmsAdtTemplate', + ) + + if (registrationBundle === Hl7WorkflowsBw.errorBundle) { + throw new Error('Could not translate ADT message!') + } + + const options = { + timeout: config.get('bwConfig:requestTimeout'), + searchParams: {}, + } + + // Get patient from registration Bundle + let patient: R4.IPatient, omang: string + const patEntry = registrationBundle.entry!.find(entry => { + return entry.resource && entry.resource.resourceType == 'Patient' + }) + + if (patEntry && patEntry.resource) { + patient = patEntry.resource + + const omangEntry = patient.identifier?.find( + i => i.system && i.system == config.get('bwConfig:omangSystemUrl'), + ) + + if (omangEntry) { + omang = omangEntry.value! + } else { + logger.error( + 'Missing Omang - currently, only matching on Omang supported, but patient does not have an Omang number.', + ) + return registrationBundle + } + + // Find all patients with this Omang. + options.searchParams = { + identifier: `${config.get('bwConfig:omangSystemUrl')}|${omang}`, + _revinclude: 'Task:patient', + } + + let patientTasks: R4.IBundle + try { + patientTasks = await got.get(`${config.get('fhirServer:baseURL')}/Patient`, options).json() + } catch (e) { + patientTasks = { resourceType: 'Bundle' } + logger.error(e) + } + + if (patientTasks && patientTasks.entry) { + // Get all Tasks with `requested` status + for (const e of patientTasks.entry!) { + if ( + e.resource && + e.resource.resourceType == 'Task' && + e.resource.status == TaskStatusKind._requested + ) { + // Grab bundle for task: + options.searchParams = { + _include: '*', + _id: e.resource.id, + } + + const taskBundle: IBundle = await got + .get(`${config.get('fhirServer:baseURL')}/Task`, options) + .json() + return { patient: patient, taskBundle: taskBundle } + } + } + } + } + } catch (e) { + logger.error('Could not process ADT!\n' + e) + throw new IpmsWorkflowError('Could not process ADT!\n' + e) + return { patient: undefined, taskBundle: undefined } + } +} + +export async function handleOruFromIpms(translatedBundle: R4.IBundle): Promise { + // Get Patient By Omang + + // Get ServiceRequests by status and code + + // Match Results to Service Requests + try { + if (translatedBundle && translatedBundle.entry) { + const patient: IPatient = ( + translatedBundle.entry.find(e => e.resource && e.resource.resourceType == 'Patient')! + .resource! + ) + + const dr: IDiagnosticReport = ( + translatedBundle.entry.find( + e => e.resource && e.resource.resourceType == 'DiagnosticReport', + )!.resource! + ) + + const obs: IObservation = ( + translatedBundle.entry.find(e => e.resource && e.resource.resourceType == 'Observation')! + .resource! + ) + const drCode = + dr.code && dr.code.coding && dr.code.coding.length > 0 ? dr.code.coding[0].code : '' + + let omang + const omangEntry = patient.identifier?.find( + i => i.system && i.system == config.get('bwConfig:omangSystemUrl'), + ) + + if (omangEntry) { + omang = omangEntry.value! + } else { + omang = '' + } + + const options = { + timeout: config.get('bwConfig:requestTimeout'), + searchParams: {}, + } + + // Find all active service requests with dr code with this Omang. + options.searchParams = { + identifier: `${config.get('bwConfig:omangSystemUrl')}|${omang}`, + _revinclude: ['ServiceRequest:patient', 'Task:patient'], + } + + const patientBundle = ( + await got + .get( + `${config.get('fhirServer:baseURL')}/Patient/identifier=${config.get( + 'bwConfig:omangSystemUrl', + )}|${omang}&_revinclude=Task:patient&_revinclude=ServiceRequest:patient`, + ) + .json() + ) + + if (patientBundle && patientBundle.entry && patientBundle.entry.length > 0) { + const candidates: IServiceRequest[] = patientBundle.entry + .filter( + e => + e.resource && + e.resource.resourceType == 'ServiceRequest' && + e.resource.status && + e.resource.status == 'active' && + e.resource.code && + e.resource.code.coding && + e.resource.code.coding.length > 0, + ) + .map(e => e.resource) + + const primaryCandidate: IServiceRequest | undefined = candidates.find(c => { + if (c && c.code && c.code.coding) { + const candidateCode = c.code.coding.find( + co => co.system == config.get('bwConfig:ipmsSystemUrl'), + ) + return candidateCode && candidateCode.code == drCode + } + return false + }) + + // Update DR based on primary candidate details + // Update Obs based on primary candidate details + if (primaryCandidate && primaryCandidate.code && primaryCandidate.code.coding) { + if (dr.code && dr.code.coding) + dr.code.coding = dr.code.coding.concat(primaryCandidate.code.coding) + if (obs.code && obs.code.coding) + obs.code.coding = obs.code.coding.concat(primaryCandidate.code.coding) + + const srRef: IReference = {} + srRef.type = 'ServiceRequest' + srRef.reference = 'ServiceRequest/' + primaryCandidate.id + + dr.basedOn = [srRef] + obs.basedOn = [srRef] + } + } + + // TODO: Only send if valid details available + const sendBundle: R4.IBundle = { + resourceType: 'Bundle', + type: BundleTypeKind._transaction, + entry: [ + { + resource: patient, + request: { method: Bundle_RequestMethodKind._put, url: 'Patient/' + patient.id }, + }, + { + resource: dr, + request: { method: Bundle_RequestMethodKind._put, url: 'DiagnosticReport/' + dr.id }, + }, + { + resource: obs, + request: { method: Bundle_RequestMethodKind._put, url: 'Observation/' + obs.id }, + }, + ], + } + + // Save to SHR + const resultBundle: R4.IBundle = await saveBundle(sendBundle) + return resultBundle + } + } catch (error) { + logger.error(`Could not process ORU!\n${error}`) + } + + return translatedBundle +} diff --git a/src/workflows/botswana/LaboratoryBundle.ts b/src/workflows/botswana/LaboratoryBundle.ts new file mode 100644 index 0000000..4cc1f18 --- /dev/null +++ b/src/workflows/botswana/LaboratoryBundle.ts @@ -0,0 +1,40 @@ +import { R4 } from "@ahryman40k/ts-fhir-types"; + +// Define the type for a Transaction bundle with Task, Encounter, and Practitioner resources +interface ILaboratoryBundle extends R4.IBundle { + type: R4.BundleTypeKind._transaction; // Make sure to only allow 'transaction' as the bundle type + // entry: ILaboratoryBundleEntry[]; // Define the entry array using custom type +} + +// // Define the type for each entry in the Transaction bundle +// interface ILaboratoryBundleEntry extends R4.IBundle_Entry { +// resource: R4.ITask | R4.IEncounter | R4.IPractitioner; // Task, Encounter, or Practitioner +// request: R4.IBundle_Request; // For the transaction request details +// } + + +class LaboratoryBundle { + protected bundle: ILaboratoryBundle; + + constructor(bundle: ILaboratoryBundle) { + this.bundle = bundle; + } + + + + getBundle(): R4.IBundle { + return this.bundle; + } + + +} + + + +class PIMSLaboratoryBundle extends LaboratoryBundle { + +} + +class IPMSLaboratoryBundle extends LaboratoryBundle { + +} diff --git a/src/workflows/botswana/LaboratoryServiceRequest.ts b/src/workflows/botswana/LaboratoryServiceRequest.ts new file mode 100644 index 0000000..55495aa --- /dev/null +++ b/src/workflows/botswana/LaboratoryServiceRequest.ts @@ -0,0 +1,90 @@ + +// class LaboratoryServiceRequest { +// protected serviceRequest: R4.IServiceRequest; + +// constructor(serviceRequest: R4.IServiceRequest) { +// this.serviceRequest = serviceRequest; +// } + +// protected async translateCoding(): Promise { +// let ipmsCoding, cielCoding, loincCoding, pimsCoding + +// try { +// if (serviceRequest && serviceRequest.code && serviceRequest.code.coding && serviceRequest.code.coding.length > 0) { +// pimsCoding = this.getCoding(serviceRequest, config.get('bwConfig:pimsSystemUrl')) +// cielCoding = this.getCoding(serviceRequest, config.get('bwConfig:cielSystemUrl')) + +// logger.info(`PIMS Coding: ${JSON.stringify(pimsCoding)}`) +// logger.info(`CIEL Coding: ${JSON.stringify(cielCoding)}`) + +// if (pimsCoding && pimsCoding.code) { +// // Translate from PIMS to CIEL and IPMS +// ipmsCoding = await this.getIpmsCode( +// `/orgs/I-TECH-UW/sources/IPMSLAB/mappings?toConcept=${pimsCoding.code}&toConceptSource=PIMSLAB`, +// pimsCoding.code, +// ) + +// if (ipmsCoding && ipmsCoding.code) { +// cielCoding = await this.getMappedCode( +// `/orgs/I-TECH-UW/sources/IPMSLAB/mappings/?toConceptSource=CIEL&fromConcept=${ipmsCoding.code}`, +// ) +// } + +// if (cielCoding && cielCoding.code) { +// serviceRequest.code.coding.push({ +// system: config.get('bwConfig:cielSystemUrl'), +// code: cielCoding.code, +// display: cielCoding.display, +// }) +// } +// } else if (cielCoding && cielCoding.code) { +// // Translate from CIEL to IPMS +// ipmsCoding = await this.getIpmsCode( +// `/orgs/I-TECH-UW/sources/IPMSLAB/mappings?toConcept=${cielCoding.code}&toConceptSource=CIEL`, +// cielCoding.code, +// ) +// } + +// // Add IPMS Coding +// if (ipmsCoding && ipmsCoding.code) { +// const ipmsOrderTypeExt = { +// url: config.get('bwConfig:ipmsOrderTypeSystemUrl'), +// valueString: ipmsCoding.hl7Flag, +// } + +// const srCoding = { +// system: config.get('bwConfig:ipmsSystemUrl'), +// code: ipmsCoding.mnemonic, +// display: ipmsCoding.display, +// extension: [ipmsOrderTypeExt], +// } + +// serviceRequest.code.coding.push(srCoding) +// } + +// // Get LOINC Coding +// if (cielCoding && cielCoding.code) { +// loincCoding = await this.getMappedCode( +// `/orgs/CIEL/sources/CIEL/mappings/?toConceptSource=LOINC&fromConcept=${cielCoding.code}`, +// ) +// if (loincCoding && loincCoding.code) { +// serviceRequest.code.coding.push({ +// system: config.get('bwConfig:loincSystemUrl'), +// code: loincCoding.code, +// display: loincCoding.display, +// }) +// } +// } + +// return serviceRequest +// } else { +// logger.error('Could not find coding to translate in:\n' + JSON.stringify(serviceRequest)) +// return serviceRequest +// } +// } catch (e) { +// logger.error(`Could not translate ServiceRequest codings: \n ${e}`) +// return serviceRequest +// } + +// return ServiceRequest; +// } diff --git a/src/workflows/botswana/helpers.ts b/src/workflows/botswana/helpers.ts new file mode 100644 index 0000000..e15be5d --- /dev/null +++ b/src/workflows/botswana/helpers.ts @@ -0,0 +1,109 @@ +import { R4 } from '@ahryman40k/ts-fhir-types' +import logger from '../../lib/winston' +import got, { HTTPError, OptionsOfTextResponseBody, RequestError } from 'got' + +export function getTaskStatus(labBundle: R4.IBundle): R4.TaskStatusKind | undefined { + let taskResult, task + + try { + taskResult = labBundle.entry!.find(entry => { + return entry.resource && entry.resource.resourceType == 'Task' + }) + + if (taskResult) { + task = taskResult.resource! + + return task.status! + } + } catch (error) { + logger.error(`Could not get Task status for task:\n${task}`) + return undefined + } +} + +export function setTaskStatus(labBundle: R4.IBundle, status: R4.TaskStatusKind): R4.IBundle { + let taskIndex, task + + try { + taskIndex = labBundle.entry!.findIndex(entry => { + return entry.resource && entry.resource.resourceType == 'Task' + }) + + if (labBundle.entry && labBundle.entry.length > 0 && taskIndex >= 0) { + (labBundle.entry[taskIndex].resource!).status = status + } + return labBundle + } catch (error) { + logger.error(`Could not get Task status for task:\n${task}`) + return labBundle + } +} + +export function getBundleEntry( + entries: R4.IBundle_Entry[], + type: string, + id?: string, +): R4.IResource | undefined { + const entry = entries.find(entry => { + return entry.resource && entry.resource.resourceType == type && (!id || entry.resource.id == id) + }) + + return entry?.resource +} + +export function getBundleEntries( + entries: R4.IBundle_Entry[], + type: string, + id?: string, +): (R4.IResource | undefined)[] { + return entries + .filter(entry => { + return ( + entry.resource && entry.resource.resourceType == type && (!id || entry.resource.id == id) + ) + }) + .map(entry => { + return entry.resource + }) +} + +// Wrapper function that includes retry logic +export async function postWithRetry( + crUrl: string, + options: OptionsOfTextResponseBody, + retryLimit = 5, + timeout = 1000, +) { + for (let attempt = 1; attempt <= retryLimit; attempt++) { + try { + const response = await got.post(crUrl, options).json() + return response // If request is successful, return the response + } catch (error) { + logger.error(`Attempt ${attempt} failed`) + + // Sleep for a given amount of time + await new Promise(resolve => setTimeout(resolve, timeout)) + + if (error instanceof HTTPError) { + // Handle HTTP errors (4xx and 5xx response codes) + console.error(`HTTP Error: ${error.response.statusCode}`) + } else if (error instanceof RequestError) { + // Handle network errors or other request issues + console.error(`Request Error: ${error.message}`) + } else { + // Handle any other errors that might occur + console.error(`Unknown Error: ${error}`) + } + + // If we are on the last attempt, re-throw the error + if (attempt === retryLimit) { + console.error('All retries failed') + throw error + } + } + } +} + +export async function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)) +} diff --git a/src/workflows/hl7WorkflowsBw.ts b/src/workflows/botswana/hl7Workflows.ts similarity index 78% rename from src/workflows/hl7WorkflowsBw.ts rename to src/workflows/botswana/hl7Workflows.ts index 9967611..3ea3bd6 100644 --- a/src/workflows/hl7WorkflowsBw.ts +++ b/src/workflows/botswana/hl7Workflows.ts @@ -3,9 +3,9 @@ import { R4 } from '@ahryman40k/ts-fhir-types' import { BundleTypeKind, IBundle } from '@ahryman40k/ts-fhir-types/lib/R4' import got from 'got/dist/source' -import config from '../lib/config' -import logger from '../lib/winston' -import { LabWorkflowsBw, topicList } from './labWorkflowsBw' +import config from '../../lib/config' +import logger from '../../lib/winston' +import { WorkflowHandler, topicList } from './workflowHandler' import sleep from 'sleep-promise' export default class Hl7WorkflowsBw { @@ -30,7 +30,7 @@ export default class Hl7WorkflowsBw { ) if (translatedBundle != this.errorBundle && translatedBundle.entry) { - LabWorkflowsBw.sendPayload({ bundle: translatedBundle }, topicList.HANDLE_ORU_FROM_IPMS) + WorkflowHandler.sendPayload({ bundle: translatedBundle }, topicList.HANDLE_ORU_FROM_IPMS) return translatedBundle } else { return this.errorBundle @@ -41,30 +41,16 @@ export default class Hl7WorkflowsBw { } } - static async handleAdtMessage(hl7Msg: string): Promise { + static async handleAdtMessage(hl7Msg: string): Promise { try { - const translatedBundle: R4.IBundle = await Hl7WorkflowsBw.translateBundle( - hl7Msg, - 'bwConfig:fromIpmsAdtTemplate', - ) - - if (translatedBundle != this.errorBundle) { - // Save to SHR?? - // let resultBundle: R4.IBundle = await saveBundle(translatedBundle) - - LabWorkflowsBw.sendPayload({ bundle: translatedBundle }, topicList.SAVE_IPMS_PATIENT) - - return translatedBundle - } else { - return this.errorBundle - } + WorkflowHandler.sendPayload({ message: hl7Msg }, topicList.HANDLE_ADT_FROM_IPMS) } catch (error: any) { + // TODO: Major Error - send to DMQ or handle otherwise logger.error(`Could not translate and save ADT message!\n${JSON.stringify(error)}`) - return this.errorBundle } } - private static async translateBundle(hl7Msg: string, template: string) { + static async translateBundle(hl7Msg: string, template: string) { let tries = 0 let translatedBundle: R4.IBundle = this.errorBundle diff --git a/src/workflows/botswana/locationWorkflows.ts b/src/workflows/botswana/locationWorkflows.ts new file mode 100644 index 0000000..4713de1 --- /dev/null +++ b/src/workflows/botswana/locationWorkflows.ts @@ -0,0 +1,192 @@ +import { R4 } from '@ahryman40k/ts-fhir-types' +import facilityMappings from '../../lib/locationMap' +import logger from '../../lib/winston' +import { getBundleEntries, getBundleEntry } from './helpers' +import * as crypto from 'crypto' +import config from '../../lib/config' +/** + * + * @param labBundle + * @returns + */ +export async function mapLocations(labBundle: R4.IBundle): Promise { + logger.info('Mapping Locations!') + + return await addBwLocations(labBundle) +} + +// * This method adds IPMS - specific location mappings to the order bundle based on the ordering +// * facility +// * @param bundle +// * @returns bundle +// * / +// // +// +// This method assumes that the Task resource has a reference to the recieving facility +// under the `owner` field. This is the facility that the lab order is being sent to. +export async function addBwLocations(bundle: R4.IBundle): Promise { + let mappedLocation: R4.ILocation | undefined + let mappedOrganization: R4.IOrganization | undefined + + try { + logger.info('Adding Location Info to Bundle') + + if (bundle && bundle.entry) { + const task: R4.ITask = getBundleEntry(bundle.entry, 'Task') + const srs: R4.IServiceRequest[] = ( + getBundleEntries(bundle.entry, 'ServiceRequest') + ) + + const orderingLocationRef: R4.IReference | undefined = task.location + + const srOrganizationRefs: (R4.IReference | undefined)[] = srs.map(sr => { + if (sr.requester) { + return sr.requester + } else { + return undefined + } + }) + + const locationId = orderingLocationRef?.reference?.split('/')[1] + const srOrgIds = srOrganizationRefs.map(ref => { + return ref?.reference?.split('/')[1] + }) + + const uniqueOrgIds = Array.from(new Set(srOrgIds)) + + if (uniqueOrgIds.length != 1 || !locationId) { + logger.error( + `Wrong number of ordering Organizations and Locations in this bundle:\n${JSON.stringify( + uniqueOrgIds, + )}\n${JSON.stringify(locationId)}`, + ) + } + + const orderingLocation = getBundleEntry(bundle.entry, 'Location', locationId) + const orderingOrganization = ( + getBundleEntry(bundle.entry, 'Organization', uniqueOrgIds[0]) + ) + + if (orderingLocation && orderingOrganization) { + if ( + !orderingLocation.managingOrganization || + orderingLocation.managingOrganization.reference?.split('/')[1] != orderingOrganization.id + ) { + logger.error('Ordering Organization is not the managing Organziation of Location!') + } + + mappedLocation = await translateLocation(orderingLocation) + mappedOrganization = { + resourceType: 'Organization', + id: crypto + .createHash('md5') + .update('Organization/' + mappedLocation.name) + .digest('hex'), + identifier: mappedLocation.identifier, + name: mappedLocation.name, + } + + const mappedLocationRef: R4.IReference = { + reference: `Location/${mappedLocation.id}`, + } + const mappedOrganizationRef: R4.IReference = { + reference: `Organization/${mappedOrganization.id}`, + } + + mappedLocation.managingOrganization = mappedOrganizationRef + + if (mappedLocation && mappedLocation.id) { + task.location = mappedLocationRef + + bundle.entry.push({ + resource: mappedLocation, + request: { + method: R4.Bundle_RequestMethodKind._put, + url: mappedLocationRef.reference, + }, + }) + } + if (mappedOrganization && mappedOrganization.id) { + task.owner = mappedOrganizationRef + bundle.entry.push({ + resource: mappedOrganization, + request: { + method: R4.Bundle_RequestMethodKind._put, + url: mappedOrganizationRef.reference, + }, + }) + for (const sr of srs) { + sr.performer + ? sr.performer.push(mappedOrganizationRef) + : (sr.performer = [mappedOrganizationRef]) + } + } + } + } + } catch (e) { + logger.error(e) + } + + return bundle +} + +/** + * @param location + * @returns R4.ILocation + */ +export async function translateLocation(location: R4.ILocation): Promise { + logger.info('Translating Location Data') + + const returnLocation: R4.ILocation = { + resourceType: 'Location', + } + const mappings = await facilityMappings + let targetMapping + logger.info('Facility mappings: ' + mappings.length) + + for (const mapping of mappings) { + if (mapping.orderingFacility == location.name) { + targetMapping = mapping + } + } + + if (targetMapping) { + logger.info( + "Mapped location '" + location.name + "' to '" + targetMapping.orderingFacility + "'", + ) + returnLocation.id = crypto + .createHash('md5') + .update('Organization/' + returnLocation.name) + .digest('hex') + returnLocation.identifier = [ + { + system: config.get('bwConfig:ipmsCodeSystemUrl'), + value: targetMapping.receivingFacility, + }, + ] + + returnLocation.name = targetMapping.receivingFacility + returnLocation.extension = [] + returnLocation.extension.push({ + url: config.get('bwConfig:ipmsProviderSystemUrl'), + valueString: targetMapping.provider, + }) + returnLocation.extension.push({ + url: config.get('bwConfig:ipmsPatientTypeSystemUrl'), + valueString: targetMapping.patientType, + }) + returnLocation.extension.push({ + url: config.get('bwConfig:ipmsPatientStatusSystemUrl'), + valueString: targetMapping.patientStatus, + }) + returnLocation.extension.push({ + url: config.get('bwConfig:ipmsXLocationSystemUrl'), + valueString: targetMapping.xLocation, + }) + } else { + logger.error('Could not find a location mapping for:\n' + JSON.stringify(location.name)) + } + + logger.info(`Translated Location:\n${JSON.stringify(returnLocation)}`) + return returnLocation +} diff --git a/src/workflows/botswana/patientIdentityWorkflows.ts b/src/workflows/botswana/patientIdentityWorkflows.ts new file mode 100644 index 0000000..a167c1f --- /dev/null +++ b/src/workflows/botswana/patientIdentityWorkflows.ts @@ -0,0 +1,61 @@ +import { R4 } from "@ahryman40k/ts-fhir-types" +import config from "../../lib/config" +import { postWithRetry } from "./helpers" +import logger from "../../lib/winston" + +/** + * updateCrPatient + * @param labBundle + * @returns + */ +export async function updateCrPatient(bundle: R4.IBundle): Promise { + const crUrl = `${config.get('clientRegistryUrl')}/Patient` + let pat: R4.IPatient + + const patResult = bundle.entry!.find(entry => { + return entry.resource && entry.resource.resourceType == 'Patient' + }) + + const options = { + timeout: config.get('bwConfig:requestTimeout'), + username: config.get('mediator:client:username'), + password: config.get('mediator:client:password'), + json: {}, + } + + if (patResult) { + pat = patResult.resource! + options.json = pat + } + + const crResult = await postWithRetry(crUrl, options, config.get('bwConfig:retryCount'), config.get('bwConfig:retryDelay')) + + logger.debug(`CR Patient Update Result: ${JSON.stringify(crResult)}`) + + return bundle +} + + +/** + * + * @param labBundle + * @returns + */ +export async function savePimsPatient(labBundle: R4.IBundle): Promise { + const resultBundle = updateCrPatient(labBundle) + + return resultBundle +} + +/** + * + * @param labBundle + * @returns + */ +export async function saveIpmsPatient(registrationBundle: R4.IBundle): Promise { + // Save to CR + const resultBundle = updateCrPatient(registrationBundle) + + return resultBundle +} + diff --git a/src/workflows/botswana/terminologyWorkflows.ts b/src/workflows/botswana/terminologyWorkflows.ts new file mode 100644 index 0000000..f06a67b --- /dev/null +++ b/src/workflows/botswana/terminologyWorkflows.ts @@ -0,0 +1,212 @@ +import { R4 } from '@ahryman40k/ts-fhir-types' +import { IBundle } from '@ahryman40k/ts-fhir-types/lib/R4' +import got from 'got' +import logger from '../../lib/winston' +import config from '../../lib/config' +/** + * + * @param labBundle + * @returns + */ +export async function mapConcepts(labBundle: IBundle): Promise { + logger.info('Mapping Concepts!') + + return await addAllCodings(labBundle) +} + +// Add terminology mappings info to Bundle +async function addAllCodings(labBundle: IBundle): Promise { + try { + for (const e of labBundle.entry!) { + if ( + e.resource && + e.resource.resourceType == 'ServiceRequest' && + e.resource.code && + e.resource.code.coding && + e.resource.code.coding.length > 0 + ) { + logger.info`Translating ServiceRequest Codings` + e.resource = await translateCoding(e.resource) + } else { + logger.info`No Codings to Translate` + } + } + } catch (e) { + logger.error(e) + } + return labBundle +} + +async function translateCoding(sr: R4.IServiceRequest): Promise { + let ipmsCoding, cielCoding, loincCoding, pimsCoding + + try { + if (sr && sr.code && sr.code.coding && sr.code.coding.length > 0) { + pimsCoding = getCoding(sr, config.get('bwConfig:pimsSystemUrl')) + cielCoding = getCoding(sr, config.get('bwConfig:cielSystemUrl')) + + logger.info(`PIMS Coding: ${JSON.stringify(pimsCoding)}`) + logger.info(`CIEL Coding: ${JSON.stringify(cielCoding)}`) + + if (pimsCoding && pimsCoding.code) { + // Translate from PIMS to CIEL and IPMS + ipmsCoding = await getIpmsCode( + `/orgs/I-TECH-UW/sources/IPMSLAB/mappings?toConcept=${pimsCoding.code}&toConceptSource=PIMSLAB`, + pimsCoding.code, + ) + + if (ipmsCoding && ipmsCoding.code) { + cielCoding = await getMappedCode( + `/orgs/I-TECH-UW/sources/IPMSLAB/mappings/?toConceptSource=CIEL&fromConcept=${ipmsCoding.code}`, + ) + } + + if (cielCoding && cielCoding.code) { + sr.code.coding.push({ + system: config.get('bwConfig:cielSystemUrl'), + code: cielCoding.code, + display: cielCoding.display, + }) + } + } else if (cielCoding && cielCoding.code) { + // Translate from CIEL to IPMS + ipmsCoding = await getIpmsCode( + `/orgs/I-TECH-UW/sources/IPMSLAB/mappings?toConcept=${cielCoding.code}&toConceptSource=CIEL`, + cielCoding.code, + ) + } + + // Add IPMS Coding + if (ipmsCoding && ipmsCoding.code) { + const ipmsOrderTypeExt = { + url: config.get('bwConfig:ipmsOrderTypeSystemUrl'), + valueString: ipmsCoding.hl7Flag, + } + + const srCoding = { + system: config.get('bwConfig:ipmsSystemUrl'), + code: ipmsCoding.mnemonic, + display: ipmsCoding.display, + extension: [ipmsOrderTypeExt], + } + + sr.code.coding.push(srCoding) + } + + // Get LOINC Coding + if (cielCoding && cielCoding.code) { + loincCoding = await getMappedCode( + `/orgs/CIEL/sources/CIEL/mappings/?toConceptSource=LOINC&fromConcept=${cielCoding.code}`, + ) + if (loincCoding && loincCoding.code) { + sr.code.coding.push({ + system: config.get('bwConfig:loincSystemUrl'), + code: loincCoding.code, + display: loincCoding.display, + }) + } + } + + return sr + } else { + logger.error('Could not find coding to translate in:\n' + JSON.stringify(sr)) + return sr + } + } catch (e) { + logger.error(`Could not translate ServiceRequest codings: \n ${e}`) + return sr + } +} + +async function getIpmsCode(q: string, c = '') { + try { + const ipmsMappings = await getOclMapping(q) + + //logger.info(`IPMS Mappings: ${JSON.stringify(ipmsMappings)}`) + + // Prioritize "Broader Than Mappings" + //TODO: Figure out if this is proper way to handle panels / broad to narrow + let mappingIndex = ipmsMappings.findIndex( + (x: any) => x.map_type == 'BROADER-THAN' && x.to_concept_code == c, + ) + + // Fall back to "SAME AS" + if (mappingIndex < 0) { + mappingIndex = ipmsMappings.findIndex( + (x: any) => x.map_type == 'SAME-AS' && x.to_concept_code == c, + ) + } + + if (mappingIndex >= 0) { + const ipmsCode = ipmsMappings[mappingIndex].from_concept_code + const ipmsDisplay = ipmsMappings[mappingIndex].from_concept_name_resolved + const ipmsCodingInfo: any = await getOclMapping( + `/orgs/I-TECH-UW/sources/IPMSLAB/concepts/${ipmsCode}`, + ) + // logger.info(`IPMS Coding Info: ${JSON.stringify(ipmsCodingInfo)}`) + let ipmsMnemonic, hl7Flag + if (ipmsCodingInfo) { + ipmsMnemonic = ipmsCodingInfo.names.find((x: any) => x.name_type == 'Short').name + hl7Flag = + ipmsCodingInfo.extras && ipmsCodingInfo.extras.IPMS_HL7_ORM_TYPE + ? ipmsCodingInfo.extras.IPMS_HL7_ORM_TYPE + : 'LAB' + } + + return { code: ipmsCode, display: ipmsDisplay, mnemonic: ipmsMnemonic, hl7Flag: hl7Flag } + } else { + return null + } + } catch (e) { + logger.error(e) + return null + } +} +async function getMappedCode(q: string): Promise { + try { + const codeMapping = await getOclMapping(q) + + //logger.info(`Code Mapping: ${JSON.stringify(codeMapping)}`) + + if (codeMapping && codeMapping.length > 0) { + return { + code: codeMapping[0].to_concept_code, + display: codeMapping[0].to_concept_name_resolved, + } + } else { + return {} + } + } catch (e) { + logger.error(e) + return {} + } +} + +async function getOclMapping(queryString: string): Promise { + const options = { timeout: config.get('bwConfig:requestTimeout') | 1000 } + + logger.info(`${config.get('bwConfig:oclUrl')}${queryString}`) + + // TODO: Add retry logic + return got.get(`${config.get('bwConfig:oclUrl')}${queryString}`, options).json() +} + +async function getOclConcept(conceptCode: string): Promise { + const options = { timeout: config.get('bwConfig:requestTimeout') | 1000 } + + // TODO: Add retry logic + return got + .get( + `${config.get('bwConfig:oclUrl')}/orgs/I-TECH-UW/sources/IPMSLAB/concepts/${conceptCode}`, + options, + ) + .json() +} + +function getCoding(sr: R4.IServiceRequest, system: string): R4.ICoding { + if (sr.code && sr.code.coding) { + return sr.code.coding.find(e => e.system && e.system == system) + } else { + return {} + } +} diff --git a/src/workflows/botswana/workflowHandler.ts b/src/workflows/botswana/workflowHandler.ts new file mode 100644 index 0000000..4659d67 --- /dev/null +++ b/src/workflows/botswana/workflowHandler.ts @@ -0,0 +1,285 @@ +'use strict' + +import { R4 } from '@ahryman40k/ts-fhir-types' + +import config from '../../lib/config' +import { KafkaProducerUtil } from '../../lib/kafkaProducerUtil' +import logger from '../../lib/winston' +import { KafkaConfig, ProducerRecord } from 'kafkajs' +import { logLevel } from 'kafkajs' +import { handleAdtFromIpms, handleOruFromIpms, sendAdtToIpms, sendOrmToIpms } from './IpmsWorkflows' +import { mapConcepts } from './terminologyWorkflows' +import { mapLocations } from './locationWorkflows' +import { saveIpmsPatient, updateCrPatient } from './patientIdentityWorkflows' +import { saveBundle } from '../../hapi/lab' +import { sleep } from './helpers' + +// eslint-disable-next-line @typescript-eslint/no-var-requires +const hl7 = require('hl7') + +const brokers = config.get('taskRunner:brokers') || ['kafka:9092'] + +const producerConfig: KafkaConfig = { + clientId: 'shr-producer', + brokers: brokers, + logLevel: config.get('taskRunner:logLevel') || logLevel.ERROR, +} + +class KafkaCallbackError extends Error { + constructor(message: string) { + super(message) + this.name = 'KafkaCallbackError' + + // Maintains proper stack trace for where our error was thrown (only available on V8) + if (Error.captureStackTrace) { + Error.captureStackTrace(this, KafkaCallbackError) + } + } +} + +export const topicList = { + SEND_ADT_TO_IPMS: 'send-adt-to-ipms', + SEND_ORM_TO_IPMS: 'send-orm-to-ipms', + SAVE_PIMS_PATIENT: 'save-pims-patient', + SAVE_IPMS_PATIENT: 'save-ipms-patient', + HANDLE_ORU_FROM_IPMS: 'handle-oru-from-ipms', + HANDLE_ADT_FROM_IPMS: 'handle-adt-from-ipms', +} + +/** + * + * To handle a lab order from the PIMS system (https://www.postman.com/itechuw/workspace/botswana-hie/collection/1525496-db80feab-8a77-42c8-aa7e-fd4beb0ae6a8) + * + * 1. PIMS sends Lab bundle to SHR (postman trigger) + * 2. SHR saves bundle and patient, and sets bundle status --> Requested (postman test) + * 3. SHR saves patient in CR (postman test) + * 3. SHR translates bundle --> ADT04 HL7 Message (postman request - ADT04_to_IPMS.hbs) + * 4. SHR sends HL7 Message to IPMS (Request Interceptor Needed - mllp interceptor?) + * + * ----- async ----- + * 5. IPMS sends registration message to SHR (mllp test trigger) + * 6. SHR translates message --> Patient Resource (postman request) + * 7. Search for patient by some data from patient resource, like Omang / passport # / etc.) (postman request) + * 8. Get all Task Bundles where status is Requested for the patient in the SHR (postman request) + * 9. Send patient resource --> CR (postman test) + * For each Bundle: + * 10. Translate to ORM message (postman request) + * 11. Send ORM HL7 message to IPMS and get back ACK (Request Interceptor Needed - mllp interceptor?) + * 12. Set Task status --> received / accepted / rejected (postman test) + * + * + * + * Ensuring Data Integrity and Consistency + * + * For the following key actions, all of the outlined indicators of success must be met; otherwise, the + * incoming package needs to be marked as "uncommited" and retried later. This is necessary for + * both incoming Bundles and HL7 messages. Basically, if the external client has connectivity to + * the HIE, and if the SHR is running and recieves the package, then the workflow must at some point + * run and result in an ADT message being sent. If the workflow fails at any point, the package must + * be marked as "uncommited" and retried until success, or until a notification is sent out. + * + * 1. Incoming Lab Bundle + * We need to ensure that once a Lab Bundle comes in either from PIMS or BotswanaEMR, + * that eventually an ADT message is sent to IPMS to begin the IPMS side of the workflow. + * + * - Bundle is saved into SHR + * - Patient is saved into CR + * - Bundle is translated to ADT message + * - ADT message is sent to IPMS + */ +export class WorkflowHandler { + private static kafka = new KafkaProducerUtil(producerConfig, report => { + logger.info('Delivery report:', report) + }) + + // Static instance of the Kafka producer. + private static kafkaProducerInitialized = false + + // Initialize Kafka producer when the class is first used. + public static async initKafkaProducer() { + if (!this.kafkaProducerInitialized) { + await this.kafka.init() + this.kafkaProducerInitialized = true + } + } + + // Shutdown Kafka producer when the application terminates. + public static async shutdownKafkaProducer() { + if (this.kafkaProducerInitialized) { + await this.kafka.shutdown() + this.kafkaProducerInitialized = false + } + } + + static async executeTopicWorkflow(topic: string, val: any) { + let response: any + let enrichedBundle + let origBundle + let hl7Message + + // Each of these topics holds a workflow that is atomic. If any required part fails, then + // the entire workflow fails and the message is retried by the Kafka consumer logic. + // + // The SHR will ensure integrity for the following workflows: + // - If an order bundle reaches the HIE from PIMS or OpenMRS, then an ADT message will eventually be sent to IPMS. + // - If an ADT message comes in from IPMS and reaches the HIE, then an ORM message will eventually be sent to IPMS + // - If a results message comes in from IPMS, then the result will eventually be saved to the SHR and the task status updated. + + try { + switch (topic) { + // Retry this kafka message if the ADT message fails to send to IPMS. In other words, + // manage offsets manually, and only update them if the ADT message is successfully sent. + case topicList.SEND_ADT_TO_IPMS: { + origBundle = JSON.parse(val).bundle + + enrichedBundle = await mapConcepts(origBundle) + enrichedBundle = await mapLocations(enrichedBundle) + + this.sendPayloadWithRetryDMQ({ bundle: enrichedBundle }, topicList.SAVE_PIMS_PATIENT) + + enrichedBundle = await sendAdtToIpms(enrichedBundle) + + // Succeed only if this bundle saves successfully + response = await saveBundle(enrichedBundle) + + break + } + case topicList.HANDLE_ADT_FROM_IPMS: { + hl7Message = val + + const adtRes = await handleAdtFromIpms(hl7Message) + + this.sendPayloadWithRetryDMQ(adtRes.patient, topicList.SAVE_IPMS_PATIENT) + + enrichedBundle = await sendOrmToIpms(adtRes) + + // Succeed only if this bundle saves successfully + response = await saveBundle(enrichedBundle) + + break + } + case topicList.HANDLE_ORU_FROM_IPMS: { + hl7Message = val + + response = await handleOruFromIpms(val) + + break + } + case topicList.SAVE_PIMS_PATIENT: { + origBundle = JSON.parse(val).bundle + response = await updateCrPatient(origBundle) + + break + } + case topicList.SAVE_IPMS_PATIENT: { + origBundle = JSON.parse(val).bundle + response = await saveIpmsPatient(origBundle) + break + } + default: { + break + } + } + await new Promise(resolve => setTimeout(resolve, 300)) + + return response + } catch (e) { + logger.error('Could not execute Kafka consumer callback workflow!\nerror: ' + e) + throw new KafkaCallbackError( + 'Could not execute Kafka consumer callback workflow!\nerror: ' + e, + ) + } + } + + /** + * Sends a payload to a Kafka topic. + * @param payload - The payload to send. + * @param topic - The Kafka topic to send the payload to. + * @returns A Promise that resolves when the payload has been sent. + */ + public static async sendPayload(payload: any, topic: string) { + await this.initKafkaProducer() + let val = '' + + if (JSON.parse(payload)) val = JSON.stringify(payload) + else val = payload + + const records: ProducerRecord[] = [ + { + topic: topic, + messages: [{ key: 'body', value: val }], + }, + ] + + try { + logger.info(`Sending payload to topic ${topic}: ${JSON.stringify(payload)}`) + await this.kafka.sendMessageTransactionally(records) + } catch (err) { + console.error(`Error sending payload to topic ${topic}: ${err}`) + throw new Error(`Error sending payload to topic ${topic}: ${err}`) + } + } + + /** + * Sends a payload to a Kafka topic with exponential retry and DMQ logging. + * @param payload - The payload to send. + * @param topic - The Kafka topic to send the payload to. + * @param maxRetries - Maximum number of retries before sending to DMQ. + * @param retryDelay - Initial delay before the first retry, subsequent retries double this delay. + * @returns A Promise that resolves when the payload has been sent or logged to DMQ. + */ + public static async sendPayloadWithRetryDMQ( + payload: any, + topic: string, + maxRetries = 5, + retryDelay = 1000, + ) { + await this.initKafkaProducer() + const val = JSON.parse(payload) ? JSON.stringify(payload) : payload + let error + const records: ProducerRecord[] = [ + { + topic: topic, + messages: [{ key: 'body', value: val }], + }, + ] + + let attempt = 0 + + while (attempt < maxRetries) { + try { + logger.info( + `Attempt ${attempt + 1}: Sending payload to topic ${topic}: ${JSON.stringify(payload)}`, + ) + await this.kafka.sendMessageTransactionally(records) + return // Success, exit the function. + } catch (err) { + error = err + logger.error(`Attempt ${attempt + 1}: Error sending payload to topic ${topic}: ${err}`) + attempt++ + await sleep(retryDelay * Math.pow(2, attempt - 1)) // Exponential back-off. + } + } + + // If all retries fail, send to Dead Message Queue. + if (error && attempt === maxRetries) { + logger.error(`All retries failed. Sending payload to DMQ!`) + try { + logger.error('TODO: Implement DMQ!:\n' + JSON.stringify(payload)) + } catch (dmqError) { + logger.error(`Failed to send payload to DMQ: ${dmqError}`) + throw new Error(`Failed to send payload to DMQ: ${dmqError}`) + } + } + } + + // Entrypoint wrapper function for Lab Order Workflows + static async handleLabOrder(orderBundle: R4.IBundle): Promise { + try { + await this.sendPayload({ bundle: orderBundle }, topicList.SEND_ADT_TO_IPMS) + } catch (e) { + logger.error(`Could not handle lab order!\n${JSON.stringify(e)}`) + throw new Error(`Could not handle lab order!\n${JSON.stringify(e)}`) + } + } +} diff --git a/src/workflows/labWorkflowsBw.ts b/src/workflows/labWorkflowsBw.ts deleted file mode 100644 index c263ecd..0000000 --- a/src/workflows/labWorkflowsBw.ts +++ /dev/null @@ -1,1047 +0,0 @@ -'use strict' - -import { R4 } from '@ahryman40k/ts-fhir-types' -import { - BundleTypeKind, - Bundle_RequestMethodKind, - IBundle, - IBundle_Entry, - IDiagnosticReport, - IObservation, - IPatient, - IReference, - IServiceRequest, - TaskStatusKind, -} from '@ahryman40k/ts-fhir-types/lib/R4' -import got from 'got' -import { saveBundle } from '../hapi/lab' -import config from '../lib/config' -import Hl7MllpSender from '../lib/hl7MllpSender' -import { KafkaProducerUtil } from '../lib/kafkaProducerUtil' -import logger from '../lib/winston' -import Hl7WorkflowsBw from './hl7WorkflowsBw' -import { LabWorkflows } from './labWorkflows' -import facilityMappings from '../lib/locationMap' -import crypto from 'crypto' -import { KafkaConfig, ProducerRecord } from 'kafkajs' -import { logLevel } from 'kafkajs'; - -// eslint-disable-next-line @typescript-eslint/no-var-requires -const hl7 = require('hl7') - -const brokers = config.get('taskRunner:brokers') || ['kafka:9092'] - -const producerConfig: KafkaConfig = { - clientId: 'shr-producer', - brokers: brokers, - logLevel: config.get('taskRunner:logLevel') || logLevel.ERROR -}; - - -export const topicList = { - MAP_CONCEPTS: 'map-concepts', - MAP_LOCATIONS: 'map-locations', - SEND_ADT_TO_IPMS: 'send-adt-to-ipms', - SEND_ORM_TO_IPMS: 'send-orm-to-ipms', - SAVE_PIMS_PATIENT: 'save-pims-patient', - SAVE_IPMS_PATIENT: 'save-ipms-patient', - HANDLE_ORU_FROM_IPMS: 'handle-oru-from-ipms', -} - -export class LabWorkflowsBw extends LabWorkflows { - private static kafka = new KafkaProducerUtil(producerConfig, (report) => { - logger.info('Delivery report:', report); - }); - - // Static instance of the Kafka producer. - private static kafkaProducerInitialized = false; - - // Initialize Kafka producer when the class is first used. - public static async initKafkaProducer() { - if (!this.kafkaProducerInitialized) { - await this.kafka.init(); - this.kafkaProducerInitialized = true; - } - } - - // Shutdown Kafka producer when the application terminates. - public static async shutdownKafkaProducer() { - if (this.kafkaProducerInitialized) { - await this.kafka.shutdown(); - this.kafkaProducerInitialized = false; - } - } - - /** - * - * To handle a lab order from the PIMS system (https://www.postman.com/itechuw/workspace/botswana-hie/collection/1525496-db80feab-8a77-42c8-aa7e-fd4beb0ae6a8) - * - * 1. PIMS sends Lab bundle to SHR (postman trigger) - * 2. SHR saves bundle and patient, and sets bundle status --> Requested (postman test) - * 3. SHR saves patient in CR (postman test) - * 3. SHR translates bundle --> ADT04 HL7 Message (postman request - ADT04_to_IPMS.hbs) - * 4. SHR sends HL7 Message to IPMS (Request Interceptor Needed - mllp interceptor?) - * - * ----- async ----- - * 5. IPMS sends registration message to SHR (mllp test trigger) - * 6. SHR translates message --> Patient Resource (postman request) - * 7. Search for patient by some data from patient resource, like Omang / passport # / etc.) (postman request) - * 8. Get all Task Bundles where status is Requested for the patient in the SHR (postman request) - * 9. Send patient resource --> CR (postman test) - * For each Bundle: - * 10. Translate to ORM message (postman request) - * 11. Send ORM HL7 message to IPMS and get back ACK (Request Interceptor Needed - mllp interceptor?) - * 12. Set Task status --> received / accepted / rejected (postman test) - * - * @param orderBundle - * @param resultBundle - */ - - static async handleBwLabOrder(orderBundle: R4.IBundle, resultBundle: R4.IBundle) { - try { - await this.sendPayload({ bundle: orderBundle, response: resultBundle }, topicList.MAP_CONCEPTS) - } catch (e) { - logger.error(e) - } - } - - static async executeTopicWorkflow(topic: string, val: any) { - let res - try { - switch (topic) { - case topicList.MAP_CONCEPTS: - res = await LabWorkflowsBw.mapConcepts(JSON.parse(val).bundle) - break - case topicList.MAP_LOCATIONS: - res = await LabWorkflowsBw.mapLocations(JSON.parse(val).bundle) - break - case topicList.SAVE_PIMS_PATIENT: - res = await LabWorkflowsBw.updateCrPatient(JSON.parse(val).bundle) - break - case topicList.SEND_ADT_TO_IPMS: - res = await LabWorkflowsBw.sendAdtToIpms(JSON.parse(val).bundle) - break - case topicList.SAVE_IPMS_PATIENT: - res = await LabWorkflowsBw.saveIpmsPatient(JSON.parse(val).bundle) - break - case topicList.SEND_ORM_TO_IPMS: - res = await LabWorkflowsBw.sendOrmToIpms(JSON.parse(val)) - break - case topicList.HANDLE_ORU_FROM_IPMS: - res = await LabWorkflowsBw.handleOruFromIpms(JSON.parse(val).bundle) - break - default: - break - } - await new Promise(resolve => setTimeout(resolve, 300)) - - return res - } catch (e) { - logger.error(e) - } - } - - // Add coding mappings info to bundle - static async addBwCodings(bundle: R4.IBundle): Promise { - try { - for (const e of bundle.entry!) { - if ( - e.resource && - e.resource.resourceType == 'ServiceRequest' && - e.resource.code && - e.resource.code.coding && - e.resource.code.coding.length > 0 - ) { - logger.info`Translating ServiceRequest Codings` - e.resource = await this.translateCoding(e.resource) - } - } - } catch (e) { - logger.error(e) - } - - return bundle - } - - /** - * This method adds IPMS-specific location mappings to the order bundle based on the ordering - * facility - * @param bundle - * @returns bundle - */ - // - // - // This method assumes that the Task resource has a reference to the recieving facility - // under the `owner` field. This is the facility that the lab order is being sent to. - static async addBwLocations(bundle: R4.IBundle): Promise { - let mappedLocation: R4.ILocation | undefined - let mappedOrganization: R4.IOrganization | undefined - - try { - logger.info('Adding Location Info to Bundle') - - if (bundle && bundle.entry) { - const task: R4.ITask = this.getBundleEntry(bundle.entry, 'Task') - const srs: R4.IServiceRequest[] = ( - this.getBundleEntries(bundle.entry, 'ServiceRequest') - ) - - const orderingLocationRef: R4.IReference | undefined = task.location - - const srOrganizationRefs: (R4.IReference | undefined)[] = srs.map(sr => { - if (sr.requester) { - return sr.requester - } else { - return undefined - } - }) - - const locationId = orderingLocationRef?.reference?.split('/')[1] - const srOrgIds = srOrganizationRefs.map(ref => { - return ref?.reference?.split('/')[1] - }) - - const uniqueOrgIds = Array.from(new Set(srOrgIds)) - - if (uniqueOrgIds.length != 1 || !locationId) { - logger.error( - `Wrong number of ordering Organizations and Locations in this bundle:\n${JSON.stringify( - uniqueOrgIds, - )}\n${JSON.stringify(locationId)}`, - ) - } - - const orderingLocation = ( - this.getBundleEntry(bundle.entry, 'Location', locationId) - ) - const orderingOrganization = ( - this.getBundleEntry(bundle.entry, 'Organization', uniqueOrgIds[0]) - ) - - if (orderingLocation && orderingOrganization) { - if ( - !orderingLocation.managingOrganization || - orderingLocation.managingOrganization.reference?.split('/')[1] != - orderingOrganization.id - ) { - logger.error('Ordering Organization is not the managing Organziation of Location!') - } - - mappedLocation = await this.translateLocation(orderingLocation) - mappedOrganization = { - resourceType: 'Organization', - id: crypto - .createHash('md5') - .update('Organization/' + mappedLocation.name) - .digest('hex'), - identifier: mappedLocation.identifier, - name: mappedLocation.name, - } - - const mappedLocationRef: R4.IReference = { - reference: `Location/${mappedLocation.id}`, - } - const mappedOrganizationRef: R4.IReference = { - reference: `Organization/${mappedOrganization.id}`, - } - - mappedLocation.managingOrganization = mappedOrganizationRef - - if (mappedLocation && mappedLocation.id) { - task.location = mappedLocationRef - - bundle.entry.push({ - resource: mappedLocation, - request: { - method: R4.Bundle_RequestMethodKind._put, - url: mappedLocationRef.reference, - }, - }) - } - if (mappedOrganization && mappedOrganization.id) { - task.owner = mappedOrganizationRef - bundle.entry.push({ - resource: mappedOrganization, - request: { - method: R4.Bundle_RequestMethodKind._put, - url: mappedOrganizationRef.reference, - }, - }) - for (const sr of srs) { - sr.performer - ? sr.performer.push(mappedOrganizationRef) - : (sr.performer = [mappedOrganizationRef]) - } - } - } - } - } catch (e) { - logger.error(e) - } - - return bundle - } - - private static getBundleEntry( - entries: IBundle_Entry[], - type: string, - id?: string, - ): R4.IResource | undefined { - const entry = entries.find(entry => { - return ( - entry.resource && entry.resource.resourceType == type && (!id || entry.resource.id == id) - ) - }) - - return entry?.resource - } - - private static getBundleEntries( - entries: IBundle_Entry[], - type: string, - id?: string, - ): (R4.IResource | undefined)[] { - return entries - .filter(entry => { - return ( - entry.resource && entry.resource.resourceType == type && (!id || entry.resource.id == id) - ) - }) - .map(entry => { - return entry.resource - }) - } - - static async translateCoding(sr: R4.IServiceRequest): Promise { - let ipmsCoding, cielCoding, loincCoding, pimsCoding - - try { - if (sr && sr.code && sr.code.coding && sr.code.coding.length > 0) { - pimsCoding = this.getCoding(sr, config.get('bwConfig:pimsSystemUrl')) - cielCoding = this.getCoding(sr, config.get('bwConfig:cielSystemUrl')) - - logger.info(`PIMS Coding: ${JSON.stringify(pimsCoding)}`) - logger.info(`CIEL Coding: ${JSON.stringify(cielCoding)}`) - - if (pimsCoding && pimsCoding.code) { - // Translate from PIMS to CIEL and IPMS - ipmsCoding = await this.getIpmsCode( - `/orgs/I-TECH-UW/sources/IPMSLAB/mappings?toConcept=${pimsCoding.code}&toConceptSource=PIMSLAB`, - pimsCoding.code, - ) - - if (ipmsCoding && ipmsCoding.code) { - cielCoding = await this.getMappedCode( - `/orgs/I-TECH-UW/sources/IPMSLAB/mappings/?toConceptSource=CIEL&fromConcept=${ipmsCoding.code}`, - ) - } - - if (cielCoding && cielCoding.code) { - sr.code.coding.push({ - system: config.get('bwConfig:cielSystemUrl'), - code: cielCoding.code, - display: cielCoding.display, - }) - } - } else if (cielCoding && cielCoding.code) { - // Translate from CIEL to IPMS - ipmsCoding = await this.getIpmsCode( - `/orgs/I-TECH-UW/sources/IPMSLAB/mappings?toConcept=${cielCoding.code}&toConceptSource=CIEL`, - cielCoding.code, - ) - } - - // Add IPMS Coding - if (ipmsCoding && ipmsCoding.code) { - const ipmsOrderTypeExt = { - url: config.get('bwConfig:ipmsOrderTypeSystemUrl'), - valueString: ipmsCoding.hl7Flag, - } - - const srCoding = { - system: config.get('bwConfig:ipmsSystemUrl'), - code: ipmsCoding.mnemonic, - display: ipmsCoding.display, - extension: [ipmsOrderTypeExt], - } - - sr.code.coding.push(srCoding) - } - - // Get LOINC Coding - if (cielCoding && cielCoding.code) { - loincCoding = await this.getMappedCode( - `/orgs/CIEL/sources/CIEL/mappings/?toConceptSource=LOINC&fromConcept=${cielCoding.code}`, - ) - if (loincCoding && loincCoding.code) { - sr.code.coding.push({ - system: config.get('bwConfig:loincSystemUrl'), - code: loincCoding.code, - display: loincCoding.display, - }) - } - } - - return sr - } else { - logger.error('Could not find coding to translate in:\n' + JSON.stringify(sr)) - return sr - } - } catch (e) { - logger.error(`Could not translate ServiceRequest codings: \n ${e}`) - return sr - } - } - - /** - * @param location - * @returns R4.ILocation - */ - static async translateLocation(location: R4.ILocation): Promise { - logger.info('Translating Location Data') - - const returnLocation: R4.ILocation = { - resourceType: 'Location', - } - const mappings = await facilityMappings - let targetMapping - logger.info('Facility mappings: ' + mappings.length) - - for (const mapping of mappings) { - if (mapping.orderingFacility == location.name) { - targetMapping = mapping - } - } - - if (targetMapping) { - logger.info( - "Mapped location '" + location.name + "' to '" + targetMapping.orderingFacility + "'", - ) - returnLocation.id = crypto - .createHash('md5') - .update('Organization/' + returnLocation.name) - .digest('hex') - returnLocation.identifier = [ - { - system: config.get('bwConfig:ipmsCodeSystemUrl'), - value: targetMapping.receivingFacility, - }, - ] - - returnLocation.name = targetMapping.receivingFacility - returnLocation.extension = [] - returnLocation.extension.push({ - url: config.get('bwConfig:ipmsProviderSystemUrl'), - valueString: targetMapping.provider, - }) - returnLocation.extension.push({ - url: config.get('bwConfig:ipmsPatientTypeSystemUrl'), - valueString: targetMapping.patientType, - }) - returnLocation.extension.push({ - url: config.get('bwConfig:ipmsPatientStatusSystemUrl'), - valueString: targetMapping.patientStatus, - }) - returnLocation.extension.push({ - url: config.get('bwConfig:ipmsXLocationSystemUrl'), - valueString: targetMapping.xLocation, - }) - } else { - logger.error('Could not find a location mapping for:\n' + JSON.stringify(location.name)) - } - - logger.info(`Translated Location:\n${JSON.stringify(returnLocation)}`) - return returnLocation - } - - /** - * - * @param labBundle - * @returns - */ - public static async mapConcepts(labBundle: R4.IBundle): Promise { - logger.info('Mapping Concepts!') - - labBundle = await LabWorkflowsBw.addBwCodings(labBundle) - - const response: R4.IBundle = await saveBundle(labBundle) - - await this.sendPayload({ bundle: labBundle }, topicList.MAP_LOCATIONS) - - return response - } - - /** - * - * @param labBundle - * @returns - */ - public static async mapLocations(labBundle: R4.IBundle): Promise { - logger.info('Mapping Locations!') - - labBundle = await LabWorkflowsBw.addBwLocations(labBundle) - const response: R4.IBundle = await saveBundle(labBundle) - - await this.sendPayload({ bundle: labBundle }, topicList.SAVE_PIMS_PATIENT) - await this.sendPayload({ bundle: labBundle }, topicList.SEND_ADT_TO_IPMS) - - logger.debug(`Response: ${JSON.stringify(response)}`) - return response - } - - /** - * - * @param labBundle - * @returns - */ - public static async savePimsPatient(labBundle: R4.IBundle): Promise { - const resultBundle = this.updateCrPatient(labBundle) - - return resultBundle - } - - /** - * - * @param labBundle - * @returns - */ - public static async saveIpmsPatient(registrationBundle: R4.IBundle): Promise { - // Save to CR - const resultBundle = this.updateCrPatient(registrationBundle) - - // Handle order entry - this.handleAdtFromIpms(registrationBundle) - - return resultBundle - } - - /** - * updateCrPatient - * @param labBundle - * @returns - */ - public static async updateCrPatient(bundle: R4.IBundle): Promise { - const crUrl = `${config.get('clientRegistryUrl')}/Patient` - let pat: IPatient - - const patResult = bundle.entry!.find(entry => { - return entry.resource && entry.resource.resourceType == 'Patient' - }) - - const options = { - timeout: config.get('bwConfig:requestTimeout'), - username: config.get('mediator:client:username'), - password: config.get('mediator:client:password'), - json: {}, - } - - if (patResult) { - pat = patResult.resource! - options.json = pat - } - - const crResult = await got.post(`${crUrl}`, options).json() - - logger.debug(`CR Patient Update Result: ${JSON.stringify(crResult)}`) - - return bundle - } - - /** - * IPMS Order Creation Workflow - * @param val - * @returns - */ - public static async sendAdtToIpms(labBundle: R4.IBundle): Promise { - const status = this.getTaskStatus(labBundle) - - if (status && status === TaskStatusKind._requested) { - logger.info('Sending ADT message to IPMS!') - - const sender = new Hl7MllpSender( - config.get('bwConfig:mllp:targetIp'), - config.get('bwConfig:mllp:targetAdtPort'), - ) - - const adtMessage = await Hl7WorkflowsBw.getFhirTranslation( - labBundle, - config.get('bwConfig:toIpmsAdtTemplate'), - ) - - logger.info(`adt:\n${adtMessage}`) - - const adtResult: string = await sender.send(adtMessage) - - if (adtResult.includes && adtResult.includes('AA')) { - labBundle = this.setTaskStatus(labBundle, R4.TaskStatusKind._accepted) - } - } else { - logger.info('Order not ready for IPMS.') - } - return labBundle - } - - public static async sendOrmToIpms(bundles: any): Promise { - const srBundle: IBundle = { resourceType: 'Bundle', entry: [] } - let labBundle = bundles.taskBundle - const patient = bundles.patient - - // logger.info(`task bundle:\n${JSON.stringify(bundles.taskBundle)}\npatient:\n${JSON.stringify(bundles.patient)}`) - - try { - // Replace PIMS/OpenMRS Patient Resource with one From IPMS Lab System - const pindex = labBundle.entry!.findIndex((entry: any) => { - return entry.resource && entry.resource.resourceType == 'Patient' - }) - - labBundle.entry[pindex].resource = patient - - const options = { - timeout: config.get('bwConfig:requestTimeout'), - searchParams: {}, - } - - const sendBundle = { ...labBundle } - sendBundle.entry = [] - srBundle.entry = [] - - // Compile sendBundle.entry from labBundle - // TODO: Outline Logic for mapping between Panels and sending multiple tests - for (const entry of labBundle.entry) { - // Isolate and process ServiceRequests - if (entry.resource && entry.resource.resourceType == 'ServiceRequest') { - // For PIMS - check if service request is profile-level and get child service requests: - options.searchParams = { - 'based-on': entry.resource.id, - } - - const fetchedBundle = ( - await got.get(`${config.get('fhirServer:baseURL')}/ServiceRequest`, options).json() - ) - - if (fetchedBundle && fetchedBundle.entry && srBundle.entry) { - // Add child ServiceRequests if any exist - srBundle.entry = srBundle.entry.concat(fetchedBundle.entry) - } else if ( - (!fetchedBundle || !(fetchedBundle.entry && fetchedBundle.entry.length > 0)) && - srBundle.entry - ) { - // If no child ServiceRequests, add this one if it has a code entry - if ( - entry.resource.code && - entry.resource.code.coding && - entry.resource.code.coding.length > 0 - ) { - srBundle.entry.push(entry) - } - } - } else { - // Copy over everything else - sendBundle.entry.push(entry) - } - } - - // Send one ORM for each ServiceRequest - // TODO: FIGURE OUT MANAGEMENT OF PANELS/PROFILES - for (const sr of srBundle.entry) { - // Send one ORM for each ServiceRequest - const outBundle = { ...sendBundle } - outBundle.entry.push(sr) - - const ormMessage = await Hl7WorkflowsBw.getFhirTranslation( - outBundle, - config.get('bwConfig:toIpmsOrmTemplate'), - ) - - const sender = new Hl7MllpSender( - config.get('bwConfig:mllp:targetIp'), - config.get('bwConfig:mllp:targetOrmPort'), - ) - - logger.info('Sending ORM message to IPMS!') - - logger.info(`orm:\n${ormMessage}\n`) - - if (ormMessage && ormMessage != '') { - const result: any = await sender.send(ormMessage) - if (result.includes('AA')) { - labBundle = this.setTaskStatus(labBundle, R4.TaskStatusKind._inProgress) - } - logger.info(`*result:\n${result}\n`) - } - } - } catch (e) { - logger.error(e) - } - return labBundle - } - - public static async handleAdtFromIpms(registrationBundle: R4.IBundle): Promise { - try { - const options = { - timeout: config.get('bwConfig:requestTimeout'), - searchParams: {}, - } - - let patient: IPatient, omang: string - const patEntry = registrationBundle.entry!.find(entry => { - return entry.resource && entry.resource.resourceType == 'Patient' - }) - - if (patEntry && patEntry.resource) { - patient = patEntry.resource - - const omangEntry = patient.identifier?.find( - i => i.system && i.system == config.get('bwConfig:omangSystemUrl'), - ) - - if (omangEntry) { - omang = omangEntry.value! - } else { - logger.error( - 'Missing Omang - currently, only matching on Omang supported, but patient does not have an Omang number.', - ) - return registrationBundle - } - - // Find all patients with this Omang. - options.searchParams = { - identifier: `${config.get('bwConfig:omangSystemUrl')}|${omang}`, - _revinclude: 'Task:patient', - } - - let patientTasks: IBundle - try { - patientTasks = await got - .get(`${config.get('fhirServer:baseURL')}/Patient`, options) - .json() - } catch (e) { - patientTasks = { resourceType: 'Bundle' } - logger.error(e) - } - - if (patientTasks && patientTasks.entry) { - // Get all Tasks with `requested` status - for (const e of patientTasks.entry!) { - if ( - e.resource && - e.resource.resourceType == 'Task' && - e.resource.status == TaskStatusKind._requested - ) { - // Grab bundle for task: - options.searchParams = { - _include: '*', - _id: e.resource.id, - } - - const taskBundle: IBundle = await got - .get(`${config.get('fhirServer:baseURL')}/Task`, options) - .json() - - await this.sendPayload({ taskBundle: taskBundle, patient: patient }, topicList.SEND_ORM_TO_IPMS) - } - } - } - } - } catch (e) { - logger.error(e) - } - - // let obrMessage = await Hl7WorkflowsBw.getFhirTranslation(labBundle, 'OBR.hbs') - - // let obrResult = await sender.send(obrMessage) - - // logger.info(`obr:\n${obrMessage}\nres:\n${obrResult}`) - - // let response: R4.IBundle = await saveLabBundle(labBundle) - - return registrationBundle - } - - public static async handleOruFromIpms(translatedBundle: R4.IBundle): Promise { - // Get Patient By Omang - - // Get ServiceRequests by status and code - - // Match Results to Service Requests - try { - if (translatedBundle && translatedBundle.entry) { - const patient: IPatient = ( - translatedBundle.entry.find(e => e.resource && e.resource.resourceType == 'Patient')! - .resource! - ) - - const dr: IDiagnosticReport = ( - translatedBundle.entry.find( - e => e.resource && e.resource.resourceType == 'DiagnosticReport', - )!.resource! - ) - - const obs: IObservation = ( - translatedBundle.entry.find(e => e.resource && e.resource.resourceType == 'Observation')! - .resource! - ) - const drCode = - dr.code && dr.code.coding && dr.code.coding.length > 0 ? dr.code.coding[0].code : '' - - let omang - const omangEntry = patient.identifier?.find( - i => i.system && i.system == config.get('bwConfig:omangSystemUrl'), - ) - - if (omangEntry) { - omang = omangEntry.value! - } else { - omang = '' - } - - const options = { - timeout: config.get('bwConfig:requestTimeout'), - searchParams: {}, - } - - // Find all active service requests with dr code with this Omang. - options.searchParams = { - identifier: `${config.get('bwConfig:omangSystemUrl')}|${omang}`, - _revinclude: ['ServiceRequest:patient', 'Task:patient'], - } - - const patientBundle = ( - await got - .get( - `${config.get('fhirServer:baseURL')}/Patient/identifier=${config.get( - 'bwConfig:omangSystemUrl', - )}|${omang}&_revinclude=Task:patient&_revinclude=ServiceRequest:patient`, - ) - .json() - ) - - if (patientBundle && patientBundle.entry && patientBundle.entry.length > 0) { - const candidates: IServiceRequest[] = patientBundle.entry - .filter( - e => - e.resource && - e.resource.resourceType == 'ServiceRequest' && - e.resource.status && - e.resource.status == 'active' && - e.resource.code && - e.resource.code.coding && - e.resource.code.coding.length > 0, - ) - .map(e => e.resource) - - const primaryCandidate: IServiceRequest | undefined = candidates.find(c => { - if (c && c.code && c.code.coding) { - const candidateCode = c.code.coding.find( - co => co.system == config.get('bwConfig:ipmsSystemUrl'), - ) - return candidateCode && candidateCode.code == drCode - } - return false - }) - - // Update DR based on primary candidate details - // Update Obs based on primary candidate details - if (primaryCandidate && primaryCandidate.code && primaryCandidate.code.coding) { - if (dr.code && dr.code.coding) - dr.code.coding = dr.code.coding.concat(primaryCandidate.code.coding) - if (obs.code && obs.code.coding) - obs.code.coding = obs.code.coding.concat(primaryCandidate.code.coding) - - const srRef: IReference = {} - srRef.type = 'ServiceRequest' - srRef.reference = 'ServiceRequest/' + primaryCandidate.id - - dr.basedOn = [srRef] - obs.basedOn = [srRef] - } - } - - // TODO: Only send if valid details available - const sendBundle: R4.IBundle = { - resourceType: 'Bundle', - type: BundleTypeKind._transaction, - entry: [ - { - resource: patient, - request: { method: Bundle_RequestMethodKind._put, url: 'Patient/' + patient.id }, - }, - { - resource: dr, - request: { method: Bundle_RequestMethodKind._put, url: 'DiagnosticReport/' + dr.id }, - }, - { - resource: obs, - request: { method: Bundle_RequestMethodKind._put, url: 'Observation/' + obs.id }, - }, - ], - } - - // Save to SHR - const resultBundle: R4.IBundle = await saveBundle(sendBundle) - return resultBundle - } - } catch (error) { - logger.error(`Could not process ORU!\n${error}`) - } - - return translatedBundle - } - - public static getTaskStatus(labBundle: R4.IBundle): R4.TaskStatusKind | undefined { - let taskResult, task - - try { - taskResult = labBundle.entry!.find(entry => { - return entry.resource && entry.resource.resourceType == 'Task' - }) - - if (taskResult) { - task = taskResult.resource! - - return task.status! - } - } catch (error) { - logger.error(`Could not get Task status for task:\n${task}`) - return undefined - } - } - - public static setTaskStatus(labBundle: R4.IBundle, status: R4.TaskStatusKind): R4.IBundle { - let taskIndex, task - - try { - taskIndex = labBundle.entry!.findIndex(entry => { - return entry.resource && entry.resource.resourceType == 'Task' - }) - - if (labBundle.entry && labBundle.entry.length > 0 && taskIndex >= 0) { - (labBundle.entry[taskIndex].resource!).status = status - } - return labBundle - } catch (error) { - logger.error(`Could not get Task status for task:\n${task}`) - return labBundle - } - } - - private static getCoding(sr: R4.IServiceRequest, system: string): R4.ICoding { - if (sr.code && sr.code.coding) { - return sr.code.coding.find(e => e.system && e.system == system) - } else { - return {} - } - } - - - /** - * Sends a payload to a Kafka topic. - * @param payload - The payload to send. - * @param topic - The Kafka topic to send the payload to. - * @returns A Promise that resolves when the payload has been sent. - */ - public static async sendPayload(payload: any, topic: string) { - await this.initKafkaProducer(); - - const records: ProducerRecord[] = [ - { - topic: topic, - messages: [ - { key: 'body', value: JSON.stringify(payload) } - ], - }, - ]; - - try { - logger.info(`Sending payload to topic ${topic}: ${JSON.stringify(payload)}`); - await this.kafka.sendMessageTransactionally(records); - } catch (err) { - console.error('Failed to send message:', err); - } - } - - private static async getIpmsCode(q: string, c = '') { - try { - const ipmsMappings = await this.getOclMapping(q) - - //logger.info(`IPMS Mappings: ${JSON.stringify(ipmsMappings)}`) - - // Prioritize "Broader Than Mappings" - //TODO: Figure out if this is proper way to handle panels / broad to narrow - let mappingIndex = ipmsMappings.findIndex( - (x: any) => x.map_type == 'BROADER-THAN' && x.to_concept_code == c, - ) - - // Fall back to "SAME AS" - if (mappingIndex < 0) { - mappingIndex = ipmsMappings.findIndex( - (x: any) => x.map_type == 'SAME-AS' && x.to_concept_code == c, - ) - } - - if (mappingIndex >= 0) { - const ipmsCode = ipmsMappings[mappingIndex].from_concept_code - const ipmsDisplay = ipmsMappings[mappingIndex].from_concept_name_resolved - const ipmsCodingInfo: any = await this.getOclMapping( - `/orgs/I-TECH-UW/sources/IPMSLAB/concepts/${ipmsCode}`, - ) - // logger.info(`IPMS Coding Info: ${JSON.stringify(ipmsCodingInfo)}`) - let ipmsMnemonic, hl7Flag - if (ipmsCodingInfo) { - ipmsMnemonic = ipmsCodingInfo.names.find((x: any) => x.name_type == 'Short').name - hl7Flag = - ipmsCodingInfo.extras && ipmsCodingInfo.extras.IPMS_HL7_ORM_TYPE - ? ipmsCodingInfo.extras.IPMS_HL7_ORM_TYPE - : 'LAB' - } - - return { code: ipmsCode, display: ipmsDisplay, mnemonic: ipmsMnemonic, hl7Flag: hl7Flag } - } else { - return null - } - } catch (e) { - logger.error(e) - return null - } - } - - private static async getMappedCode(q: string): Promise { - try { - const codeMapping = await this.getOclMapping(q) - - //logger.info(`Code Mapping: ${JSON.stringify(codeMapping)}`) - - if (codeMapping && codeMapping.length > 0) { - return { - code: codeMapping[0].to_concept_code, - display: codeMapping[0].to_concept_name_resolved, - } - } else { - return {} - } - } catch (e) { - logger.error(e) - return {} - } - } - - private static async getOclMapping(queryString: string): Promise { - const options = { timeout: config.get('bwConfig:requestTimeout') | 1000 } - - logger.info(`${config.get('bwConfig:oclUrl')}${queryString}`) - - return got.get(`${config.get('bwConfig:oclUrl')}${queryString}`, options).json() - } - - private static async getOclConcept(conceptCode: string): Promise { - const options = { timeout: config.get('bwConfig:requestTimeout') | 1000 } - - return got - .get( - `${config.get('bwConfig:oclUrl')}/orgs/I-TECH-UW/sources/IPMSLAB/concepts/${conceptCode}`, - options, - ) - .json() - } -} -