Skip to content

Commit

Permalink
feat: draft of using subscriptions to manage outgoing requests to
Browse files Browse the repository at this point in the history
openmrs
  • Loading branch information
witash committed Jan 23, 2025
1 parent 539f03e commit 870061e
Show file tree
Hide file tree
Showing 9 changed files with 475 additions and 7 deletions.
8 changes: 5 additions & 3 deletions mediator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import organizationRoutes from './src/routes/organization';
import endpointRoutes from './src/routes/endpoint';
import chtRoutes from './src/routes/cht';
import openMRSRoutes from './src/routes/openmrs';
import { registerMediatorCallback } from './src/utils/openhim';
import { registerMediatorCallback, registerOpenMRSMediatorCallback } from './src/utils/openhim';
import os from 'os';

const {registerMediator} = require('openhim-mediator-utils');
Expand Down Expand Up @@ -41,14 +41,16 @@ app.use('/cht', chtRoutes);
app.use('/openmrs', openMRSRoutes);

if (process.env.NODE_ENV !== 'test') {
app.listen(PORT, () => logger.info(`Server listening on port ${PORT}`));
app.listen(PORT, () => {
logger.info(`Server listening on port ${PORT}`);
});

// TODO => inject the 'port' and 'http scheme' into 'mediatorConfig'
registerMediator(OPENHIM, mediatorConfig, registerMediatorCallback);

// if OPENMRS is specified, register its mediator
if (OPENMRS.url) {
registerMediator(OPENHIM, openMRSMediatorConfig, registerMediatorCallback);
registerMediator(OPENHIM, openMRSMediatorConfig, registerOpenMRSMediatorCallback);
}
}

Expand Down
11 changes: 9 additions & 2 deletions mediator/src/controllers/cht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
replaceReference,
updateFhirResource
} from '../utils/fhir';
import { chtEventEmitter, CHT_EVENTS } from '../utils/cht';
import {
buildFhirObservationFromCht,
buildFhirEncounterFromCht,
Expand All @@ -29,7 +30,11 @@ export async function createPatient(chtPatientDoc: any) {
}

const fhirPatient = buildFhirPatientFromCht(chtPatientDoc.doc);
return updateFhirResource({ ...fhirPatient, resourceType: 'Patient' });
const result = await updateFhirResource({ ...fhirPatient, resourceType: 'Patient' });
if (result.status === 200 || result.status === 201) {
chtEventEmitter.emit(CHT_EVENTS.PATIENT_CREATED, { patient: fhirPatient, result });
}
return result;
}

export async function updatePatientIds(chtFormDoc: any) {
Expand Down Expand Up @@ -83,5 +88,7 @@ export async function createEncounter(chtReport: any) {
createFhirResource(observation);
}

return { status: 200, data: {} };
const result = { status: 200, data: {} };
chtEventEmitter.emit(CHT_EVENTS.ENCOUNTER_CREATED, { encounter: fhirEncounter, result });
return result;
}
4 changes: 3 additions & 1 deletion mediator/src/mappers/cht.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getIdType, copyIdToNamedIdentifier } from '../utils/fhir';
import { getIdType, copyIdToNamedIdentifier, addSourceMeta } from '../utils/fhir';

export const chtDocumentIdentifierType: fhir4.CodeableConcept = {
text: 'CHT Document ID'
Expand Down Expand Up @@ -74,6 +74,7 @@ export function buildFhirPatientFromCht(chtPatient: any): fhir4.Patient {
};

copyIdToNamedIdentifier(patient, patient, chtDocumentIdentifierType);
addSourceMeta(patient, chtSource);

return patient;
}
Expand All @@ -98,6 +99,7 @@ export function buildFhirEncounterFromCht(chtReport: any): fhir4.Encounter {
}

copyIdToNamedIdentifier(encounter, encounter, chtDocumentIdentifierType);
addSourceMeta(encounter, chtSource);

return encounter
}
Expand Down
8 changes: 8 additions & 0 deletions mediator/src/utils/cht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import https from 'https';
import path from 'path';
import { buildChtPatientFromFhir, buildChtRecordFromObservations } from '../mappers/cht';
import { logger } from '../../logger';
import { EventEmitter } from 'events';

export const CHT_EVENTS = {
PATIENT_CREATED: 'patient:created',
ENCOUNTER_CREATED: 'encounter:created'
} as const;

export const chtEventEmitter = new EventEmitter();

type CouchDBQuery = {
selector: Record<string, any>;
Expand Down
57 changes: 57 additions & 0 deletions mediator/src/utils/fhir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Fhir } from 'fhir';
import { FHIR } from '../../config';
import axios from 'axios';
import { logger } from '../../logger';
import { EventEmitter } from 'events'

export const VALID_GENDERS = ['male', 'female', 'other', 'unknown'] as const;

Expand All @@ -15,6 +16,8 @@ const axiosOptions = {

const fhir = new Fhir();

export const fhirEventEmitter = new EventEmitter();

export function validateFhirResource(resourceType: string) {
return function wrapper(data: any) {
return fhir.validate({ ...data, resourceType });
Expand Down Expand Up @@ -161,6 +164,9 @@ export async function deleteFhirSubscription(id?: string) {
export async function createFhirResource(doc: fhir4.Resource) {
try {
const res = await axios.post(`${FHIR.url}/${doc.resourceType}`, doc, axiosOptions);
if (res?.status === 201 || res?.status === 200) {
fhirEventEmitter.emit('resourceCreated', doc);
}
return { status: res?.status, data: res?.data };
} catch (error: any) {
logger.error(error);
Expand All @@ -171,6 +177,9 @@ export async function createFhirResource(doc: fhir4.Resource) {
export async function updateFhirResource(doc: fhir4.Resource) {
try {
const res = await axios.put(`${FHIR.url}/${doc.resourceType}/${doc.id}`, doc, axiosOptions);
if (res?.status === 201 || res?.status === 200) {
fhirEventEmitter.emit('resourceUpdated', doc);
}
return { status: res?.status, data: res?.data };
} catch (error: any) {
logger.error(error);
Expand Down Expand Up @@ -261,3 +270,51 @@ export async function getFhirResourceByIdentifier(identifierValue: string, resou
return { status: error.response?.status, data: error.response?.data };
}
}

export async function getActiveSubscriptions(): Promise<fhir4.Subscription[]> {
try {
const res = await axios.get(
`${FHIR.url}/Subscription?status=active`,
axiosOptions
);
if (res.data.entry) {
return res.data.entry.map((entry: any) => entry.resource);
}
return [];
} catch (error: any) {
logger.error('Error fetching active subscriptions:', error);
return [];
}
}

export async function createSubscription(criteria: string, endpoint: string, headers?: string[]): Promise<fhir4.Subscription | null> {
try {
const subscription: fhir4.Subscription = {
resourceType: 'Subscription',
status: 'active',
reason: `Monitor resources matching: ${criteria}`,
criteria,
channel: {
type: 'rest-hook',
endpoint,
payload: 'application/fhir+json',
header: headers || ['Content-Type: application/fhir+json']
}
};

const res = await axios.post(
`${FHIR.url}/Subscription`,
subscription,
axiosOptions
);

if (res.status === 201 || res.status === 200) {
logger.info(`Created subscription for criteria: ${criteria}`);
return res.data;
}
return null;
} catch (error: any) {
logger.error('Error creating subscription:', error);
return null;
}
}
10 changes: 10 additions & 0 deletions mediator/src/utils/openhim.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
import { logger } from '../../logger';
import { registerOpenMRSListeners } from './openmrs-listener';

export const registerOpenMRSMediatorCallback = (err?: string): void => {
if (err) {
throw new Error(`OpenMRS Mediator Registration Failed: Reason ${err}`);
}

logger.info('Successfully registered OpenMRS mediator.');
registerOpenMRSListeners();
};

export const registerMediatorCallback = (err?: string): void => {
if (err) {
Expand Down
104 changes: 104 additions & 0 deletions mediator/src/utils/openmrs-listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { chtEventEmitter, CHT_EVENTS } from './cht';
import { sendPatientToOpenMRS, sendEncounterToOpenMRS } from './openmrs_sync';
import { logger } from '../../logger';
import { createSubscription, fhirEventEmitter } from './fhir';
import { OPENMRS } from '../../config';

// Store active listeners to allow for deregistration
type PatientListener = (patient: fhir4.Patient) => Promise<void>;
type EncounterListener = (data: { encounter: fhir4.Encounter, references: fhir4.Resource[] }) => Promise<void>;
const activeListeners: { [key: string]: PatientListener | EncounterListener } = {};

function registerPatientListener() {
if (activeListeners[CHT_EVENTS.PATIENT_CREATED]) {
return; // Already registered
}
const listener = async (patient: fhir4.Patient) => {
try {
await sendPatientToOpenMRS(patient);
} catch (error) {
logger.error(`Error sending patient to OpenMRS: ${error}`);
}
};
chtEventEmitter.on(CHT_EVENTS.PATIENT_CREATED, listener);
activeListeners[CHT_EVENTS.PATIENT_CREATED] = listener;
logger.info('Patient listener registered');
}

function registerEncounterListener() {
if (activeListeners[CHT_EVENTS.ENCOUNTER_CREATED]) {
return; // Already registered
}
const listener = async (data: {
encounter: fhir4.Encounter,
references: fhir4.Resource[]
}) => {
try {
await sendEncounterToOpenMRS(data.encounter, data.references);
} catch (error) {
logger.error(`Error sending encounter to OpenMRS: ${error}`);
}
};
chtEventEmitter.on(CHT_EVENTS.ENCOUNTER_CREATED, listener);
activeListeners[CHT_EVENTS.ENCOUNTER_CREATED] = listener;
logger.info('Encounter listener registered');
}

function deregisterListener(eventName: string) {
if (activeListeners[eventName]) {
chtEventEmitter.off(eventName, activeListeners[eventName]);
delete activeListeners[eventName];
logger.info(`Deregistered listener for ${eventName}`);
}
}

function handleActiveSubscription(subscription: fhir4.Subscription) {
if (subscription.criteria.startsWith('Patient')) {
registerPatientListener();
} else if (subscription.criteria.startsWith('Encounter')) {
registerEncounterListener();
}
}

// Handle subscription status changes
fhirEventEmitter.on('resourceCreated', (resource: fhir4.Resource) => {
if (resource.resourceType === 'Subscription' && (resource as fhir4.Subscription).status === 'active') {
handleActiveSubscription(resource as fhir4.Subscription);
}
});

fhirEventEmitter.on('resourceUpdated', (resource: fhir4.Resource) => {
if (resource.resourceType === 'Subscription') {
const subscription = resource as fhir4.Subscription;
if (subscription.status === 'active') {
handleActiveSubscription(subscription);
} else {
if (subscription.criteria.startsWith('Patient')) {
deregisterListener(CHT_EVENTS.PATIENT_CREATED);
} else if (subscription.criteria.startsWith('Encounter')) {
deregisterListener(CHT_EVENTS.ENCOUNTER_CREATED);
}
}
}
});

export async function registerOpenMRSListeners() {
try {
// Create subscriptions for patient and encounter events
await createSubscription(
'Patient?',
`${OPENMRS.url}/Patient/`,
['Content-Type: application/fhir+json']
);

await createSubscription(
'Encounter',
`${OPENMRS.url}/Encounter/`,
['Content-Type: application/fhir+json']
);

logger.info('OpenMRS FHIR subscriptions created successfully');
} catch (error) {
logger.error('Error creating OpenMRS FHIR subscriptions:', error);
}
}
2 changes: 1 addition & 1 deletion mediator/src/utils/openmrs_sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async function sendPatientToFhir(patient: fhir4.Patient) {
Send a patient from CHT to OpenMRS
And update OpenMRS Id if successful
*/
async function sendPatientToOpenMRS(patient: fhir4.Patient) {
export async function sendPatientToOpenMRS(patient: fhir4.Patient) {
logger.info(`Sending Patient ${patient.id} to OpenMRS`);
const openMRSPatient = buildOpenMRSPatient(patient);
addSourceMeta(openMRSPatient, chtSource);
Expand Down
Loading

0 comments on commit 870061e

Please sign in to comment.