Skip to content

Commit

Permalink
feat: add openmrs listener and remove sync
Browse files Browse the repository at this point in the history
  • Loading branch information
witash committed Jan 23, 2025
1 parent 539f03e commit 4e47d9e
Show file tree
Hide file tree
Showing 11 changed files with 324 additions and 294 deletions.
8 changes: 5 additions & 3 deletions mediator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import organizationRoutes from './src/routes/organization';
import endpointRoutes from './src/routes/endpoint';
import chtRoutes from './src/routes/cht';
import openMRSRoutes from './src/routes/openmrs';
import { registerMediatorCallback } from './src/utils/openhim';
import { registerMediatorCallback, registerOpenMRSMediatorCallback } from './src/utils/openhim';
import os from 'os';

const {registerMediator} = require('openhim-mediator-utils');
Expand Down Expand Up @@ -41,14 +41,16 @@ app.use('/cht', chtRoutes);
app.use('/openmrs', openMRSRoutes);

if (process.env.NODE_ENV !== 'test') {
app.listen(PORT, () => logger.info(`Server listening on port ${PORT}`));
app.listen(PORT, () => {
logger.info(`Server listening on port ${PORT}`);
});

// TODO => inject the 'port' and 'http scheme' into 'mediatorConfig'
registerMediator(OPENHIM, mediatorConfig, registerMediatorCallback);

// if OPENMRS is specified, register its mediator
if (OPENMRS.url) {
registerMediator(OPENHIM, openMRSMediatorConfig, registerMediatorCallback);
registerMediator(OPENHIM, openMRSMediatorConfig, registerOpenMRSMediatorCallback);
}
}

Expand Down
16 changes: 13 additions & 3 deletions mediator/src/controllers/cht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
replaceReference,
updateFhirResource
} from '../utils/fhir';
import { chtEventEmitter, CHT_EVENTS } from '../utils/cht';
import {
buildFhirObservationFromCht,
buildFhirEncounterFromCht,
Expand All @@ -29,7 +30,11 @@ export async function createPatient(chtPatientDoc: any) {
}

const fhirPatient = buildFhirPatientFromCht(chtPatientDoc.doc);
return updateFhirResource({ ...fhirPatient, resourceType: 'Patient' });
const result = await updateFhirResource({ ...fhirPatient, resourceType: 'Patient' });
if (result.status === 200 || result.status === 201) {
chtEventEmitter.emit(CHT_EVENTS.PATIENT_CREATED, fhirPatient);
}
return result;
}

export async function updatePatientIds(chtFormDoc: any) {
Expand Down Expand Up @@ -59,6 +64,7 @@ export async function updatePatientIds(chtFormDoc: any) {

export async function createEncounter(chtReport: any) {
const fhirEncounter = buildFhirEncounterFromCht(chtReport);
const references: fhir4.Resource[] = [];

const patientResponse = await getFHIRPatientResource(chtReport.patient_uuid);
if (patientResponse.status != 200){
Expand All @@ -70,6 +76,7 @@ export async function createEncounter(chtReport: any) {
}

const patient = patientResponse.data.entry[0].resource as fhir4.Patient;
references.push(patient);
replaceReference(fhirEncounter, 'subject', patient);
const response = await updateFhirResource(fhirEncounter);

Expand All @@ -80,8 +87,11 @@ export async function createEncounter(chtReport: any) {

for (const entry of chtReport.observations) {
const observation = buildFhirObservationFromCht(chtReport.patient_uuid, fhirEncounter, entry);
createFhirResource(observation);
await createFhirResource(observation);
references.push(observation);
}

return { status: 200, data: {} };
const result = { status: 200, data: {} };
chtEventEmitter.emit(CHT_EVENTS.ENCOUNTER_CREATED, { encounter: fhirEncounter, references: references });
return result;
}
25 changes: 23 additions & 2 deletions mediator/src/controllers/openmrs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { logger } from '../../logger';
import { syncPatients, syncEncounters } from '../utils/openmrs_sync'
import { SYNC_PERIOD } from '../../config'
import { syncPatients, syncEncounters } from '../utils/openmrs_sync';
import { addListeners, removeListeners } from '../utils/openmrs-listener';
import { SYNC_PERIOD } from '../../config';

export async function sync() {
try {
Expand All @@ -16,3 +17,23 @@ export async function sync() {
return { status: 500, data: { message: `Error during OpenMRS Sync`} };
}
}

export async function startListeners() {
try {
addListeners();
return { status: 200, data: { message: 'OpenMRS listeners started successfully' } };
} catch (error: any) {
logger.error(error);
return { status: 500, data: { message: 'Error starting OpenMRS listeners' } };
}
}

export async function stopListeners() {
try {
removeListeners();
return { status: 200, data: { message: 'OpenMRS listeners stopped successfully' } };
} catch (error: any) {
logger.error(error);
return { status: 500, data: { message: 'Error stopping OpenMRS listeners' } };
}
}
111 changes: 111 additions & 0 deletions mediator/src/controllers/tests/openmrs.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import { sync, startListeners, stopListeners } from '../openmrs';
import * as openmrsSync from '../../utils/openmrs_sync';
import * as openmrsListener from '../../utils/openmrs-listener';
import { logger } from '../../../logger';
import { SYNC_PERIOD } from '../../../config';

jest.mock('../../../logger');
jest.mock('../../utils/openmrs_sync');
jest.mock('../../utils/openmrs-listener');

describe('OpenMRS Controller', () => {
beforeEach(() => {
jest.clearAllMocks();
});

describe('sync', () => {
it('syncs patients and encounters successfully', async () => {
const syncPatientsSpy = jest.spyOn(openmrsSync, 'syncPatients').mockResolvedValueOnce();
const syncEncountersSpy = jest.spyOn(openmrsSync, 'syncEncounters').mockResolvedValueOnce();

const result = await sync();

// Verify sync period calculation
const expectedStartTime = new Date(Date.now() - parseInt(SYNC_PERIOD, 10) * 1000);
expect(syncPatientsSpy).toHaveBeenCalledWith(expect.any(Date));
expect(syncEncountersSpy).toHaveBeenCalledWith(expect.any(Date));

// Verify the timestamps are within 1 second of expected
const patientCallTime = syncPatientsSpy.mock.calls[0][0] as Date;
const encounterCallTime = syncEncountersSpy.mock.calls[0][0] as Date;
expect(Math.abs(patientCallTime.getTime() - expectedStartTime.getTime())).toBeLessThan(1000);
expect(Math.abs(encounterCallTime.getTime() - expectedStartTime.getTime())).toBeLessThan(1000);

expect(result).toEqual({
status: 200,
data: { message: 'OpenMRS sync completed successfully' }
});
});

it('handles sync errors', async () => {
const error = new Error('Sync failed');
jest.spyOn(openmrsSync, 'syncPatients').mockRejectedValueOnce(error);

const result = await sync();

expect(logger.error).toHaveBeenCalledWith(error);
expect(result).toEqual({
status: 500,
data: { message: 'Error during OpenMRS Sync' }
});
});
});

describe('startListeners', () => {
it('starts listeners successfully', async () => {
const addListenersSpy = jest.spyOn(openmrsListener, 'addListeners');

const result = await startListeners();

expect(addListenersSpy).toHaveBeenCalled();
expect(result).toEqual({
status: 200,
data: { message: 'OpenMRS listeners started successfully' }
});
});

it('handles listener start errors', async () => {
const error = new Error('Failed to start listeners');
jest.spyOn(openmrsListener, 'addListeners').mockImplementationOnce(() => {
throw error;
});

const result = await startListeners();

expect(logger.error).toHaveBeenCalledWith(error);
expect(result).toEqual({
status: 500,
data: { message: 'Error starting OpenMRS listeners' }
});
});
});

describe('stopListeners', () => {
it('stops listeners successfully', async () => {
const removeListenersSpy = jest.spyOn(openmrsListener, 'removeListeners');

const result = await stopListeners();

expect(removeListenersSpy).toHaveBeenCalled();
expect(result).toEqual({
status: 200,
data: { message: 'OpenMRS listeners stopped successfully' }
});
});

it('handles listener stop errors', async () => {
const error = new Error('Failed to stop listeners');
jest.spyOn(openmrsListener, 'removeListeners').mockImplementationOnce(() => {
throw error;
});

const result = await stopListeners();

expect(logger.error).toHaveBeenCalledWith(error);
expect(result).toEqual({
status: 500,
data: { message: 'Error stopping OpenMRS listeners' }
});
});
});
});
12 changes: 11 additions & 1 deletion mediator/src/routes/openmrs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Router } from 'express';
import { requestHandler } from '../utils/request';
import { sync } from '../controllers/openmrs'
import { sync, startListeners, stopListeners } from '../controllers/openmrs';

const router = Router();

Expand All @@ -9,4 +9,14 @@ router.get(
requestHandler((req) => sync())
);

router.post(
'/listeners/start',
requestHandler((req) => startListeners())
);

router.post(
'/listeners/stop',
requestHandler((req) => stopListeners())
);

export default router;
8 changes: 8 additions & 0 deletions mediator/src/utils/cht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import https from 'https';
import path from 'path';
import { buildChtPatientFromFhir, buildChtRecordFromObservations } from '../mappers/cht';
import { logger } from '../../logger';
import { EventEmitter } from 'events';

export const CHT_EVENTS = {
PATIENT_CREATED: 'patient:created',
ENCOUNTER_CREATED: 'encounter:created'
} as const;

export const chtEventEmitter = new EventEmitter();

type CouchDBQuery = {
selector: Record<string, any>;
Expand Down
10 changes: 10 additions & 0 deletions mediator/src/utils/openhim.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
import { logger } from '../../logger';
import { addListeners } from './openmrs-listener';

export const registerOpenMRSMediatorCallback = (err?: string): void => {
if (err) {
throw new Error(`OpenMRS Mediator Registration Failed: Reason ${err}`);
}

logger.info('Successfully registered OpenMRS mediator.');
addListeners();
};

export const registerMediatorCallback = (err?: string): void => {
if (err) {
Expand Down
63 changes: 63 additions & 0 deletions mediator/src/utils/openmrs-listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { chtEventEmitter, CHT_EVENTS } from './cht';
import { sendPatientToOpenMRS, sendEncounterToOpenMRS } from './openmrs_sync';
import { logger } from '../../logger';

// Store active listeners to allow for deregistration
type PatientListener = (patient: fhir4.Patient) => Promise<void>;
type EncounterListener = (data: { encounter: fhir4.Encounter, references: fhir4.Resource[] }) => Promise<void>;
const activeListeners: { [key: string]: PatientListener | EncounterListener } = {};

function registerPatientListener() {
if (activeListeners[CHT_EVENTS.PATIENT_CREATED]) {
return; // Already registered
}
const listener = async (patient: fhir4.Patient) => {
try {
await sendPatientToOpenMRS(patient);
} catch (error) {
logger.error(`Error sending patient to OpenMRS: ${error}`);
}
};
chtEventEmitter.on(CHT_EVENTS.PATIENT_CREATED, listener);
activeListeners[CHT_EVENTS.PATIENT_CREATED] = listener;
logger.info('Patient listener registered');
}

function registerEncounterListener() {
if (activeListeners[CHT_EVENTS.ENCOUNTER_CREATED]) {
return; // Already registered
}
const listener = async (data: {
encounter: fhir4.Encounter,
references: fhir4.Resource[]
}) => {
try {
await sendEncounterToOpenMRS(data.encounter, data.references);
} catch (error) {
logger.error(`Error sending encounter to OpenMRS: ${error}`);
}
};
chtEventEmitter.on(CHT_EVENTS.ENCOUNTER_CREATED, listener);
activeListeners[CHT_EVENTS.ENCOUNTER_CREATED] = listener;
logger.info('Encounter listener registered');
}

function deregisterListener(eventName: string) {
if (activeListeners[eventName]) {
chtEventEmitter.off(eventName, activeListeners[eventName]);
delete activeListeners[eventName];
logger.info(`Deregistered listener for ${eventName}`);
}
}

export function addListeners() {
registerPatientListener();
registerEncounterListener();
logger.info('OpenMRS listeners added successfully');
}

export function removeListeners() {
deregisterListener(CHT_EVENTS.PATIENT_CREATED);
deregisterListener(CHT_EVENTS.ENCOUNTER_CREATED);
logger.info('OpenMRS listeners removed successfully');
}
Loading

0 comments on commit 4e47d9e

Please sign in to comment.