diff --git a/package-lock.json b/package-lock.json index ea9aacc82..c76e84c52 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "openhim-core", - "version": "8.2.0", + "version": "8.3.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 60178dbc7..3ff5d8a1d 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "openhim-core", "description": "The OpenHIM core application that provides logging and routing of http requests", - "version": "8.2.0", + "version": "8.3.0", "main": "./lib/server.js", "bin": { "openhim-core": "./bin/openhim-core.js" diff --git a/src/api/apps.js b/src/api/apps.js index 1e5b308f2..c2e3b041c 100644 --- a/src/api/apps.js +++ b/src/api/apps.js @@ -45,7 +45,9 @@ const createErrorResponse = (ctx, operation, error) => { const validateId = (ctx, id) => { if (!id.match(/^[0-9a-fA-F]{24}$/)) { ctx.statusCode = 400 - throw Error(`App id "${id}" is invalid. ObjectId should contain 24 characters`) + throw Error( + `App id "${id}" is invalid. ObjectId should contain 24 characters` + ) } } diff --git a/src/api/clients.js b/src/api/clients.js index 384118fbf..8a8bc2a16 100644 --- a/src/api/clients.js +++ b/src/api/clients.js @@ -102,15 +102,22 @@ export async function getClient(ctx, clientId, property) { return } - clientId = unescape(clientId) - try { - const result = await ClientModelAPI.findById( - clientId, - projectionRestriction - ) - .lean() - .exec() + let result + if (ctx?.query?.byNamedClientID === 'true') { + result = await ClientModelAPI.findOne( + {clientID: clientId}, + projectionRestriction + ) + .lean() + .exec() + } else { + clientId = unescape(clientId) + result = await ClientModelAPI.findById(clientId, projectionRestriction) + .lean() + .exec() + } + if (result === null) { utils.logAndSetResponse( ctx, diff --git a/src/api/tasks.js b/src/api/tasks.js index 740b891f0..538d0b6a0 100644 --- a/src/api/tasks.js +++ b/src/api/tasks.js @@ -216,13 +216,13 @@ export async function addTask(ctx) { ) // Clear the transactions out of the auto retry queue, in case they're in there - return AutoRetryModelAPI - .deleteMany({transactionID: {$in: transactions.tids}}) - .exec(err => { - if (err) { - return logger.error(err) - } - }) + return AutoRetryModelAPI.deleteMany({ + transactionID: {$in: transactions.tids} + }).exec(err => { + if (err) { + return logger.error(err) + } + }) } else { // rerun task creation not allowed utils.logAndSetResponse( diff --git a/src/middleware/messageStore.js b/src/middleware/messageStore.js index 827d3a35e..a67d2d9c3 100644 --- a/src/middleware/messageStore.js +++ b/src/middleware/messageStore.js @@ -8,7 +8,7 @@ import * as metrics from '../metrics' import * as transactions from '../model/transactions' import * as utils from '../utils' -const { transactionStatus } = transactions +const {transactionStatus} = transactions function copyMapWithEscapedReservedCharacters(map) { const escapedMap = {} @@ -30,7 +30,9 @@ export function storeTransaction(ctx, done) { const headers = copyMapWithEscapedReservedCharacters(ctx.header) const tx = new transactions.TransactionModel({ - status: transactionStatus.PROCESSING, + status: ctx?.matchingChannel?.isAsynchronousProcess + ? transactionStatus.PENDING_ASYNC + : transactionStatus.PROCESSING, clientID: ctx.authenticated != null ? ctx.authenticated._id : undefined, channelID: ctx.authorisedChannel._id, clientIP: ctx.ip, @@ -87,9 +89,18 @@ export function storeTransaction(ctx, done) { export function storeResponse(ctx, done) { const headers = copyMapWithEscapedReservedCharacters(ctx.response.header) + const isAsynchronousProcess = + Boolean(ctx?.matchingChannel?.isAsynchronousProcess) || + Boolean(ctx?.authorisedChannel?.isAsynchronousProcess) + const status = + isAsynchronousProcess && ctx.response.status < 400 + ? 202 + : ctx.response.status + // if the channel is asynchronous and the response is successful, change the status to 202 otherwise use the original status + ctx.response.status = status const res = { - status: ctx.response.status, + status, headers, body: !ctx.response.body ? '' : ctx.response.body.toString(), timestamp: ctx.response.timestamp @@ -202,6 +213,10 @@ export function storeNonPrimaryResponse(ctx, route, done) { * This should only be called once all routes have responded. */ export function setFinalStatus(ctx, callback) { + const isAsynchronousProcess = + Boolean(ctx?.matchingChannel?.isAsynchronousProcess) || + Boolean(ctx?.authorisedChannel?.isAsynchronousProcess) + let transactionId = '' if ( ctx.request != null && @@ -253,7 +268,11 @@ export function setFinalStatus(ctx, callback) { ctx.response.status <= 299 && routeSuccess ) { - tx.status = transactionStatus.SUCCESSFUL + if (isAsynchronousProcess) { + tx.status = transactionStatus.PENDING_ASYNC + } else { + tx.status = transactionStatus.SUCCESSFUL + } } if ( ctx.response.status >= 400 && diff --git a/src/middleware/router.js b/src/middleware/router.js index 75c431529..1633c936d 100644 --- a/src/middleware/router.js +++ b/src/middleware/router.js @@ -656,6 +656,7 @@ function sendKafkaRequest(ctx, route) { const message = { method: ctx.request.method, path: ctx.request.url, + pattern: channel.urlPattern, headers: ctx.request.headers, body: ctx.body && ctx.body.toString() } @@ -669,7 +670,8 @@ function sendKafkaRequest(ctx, route) { resolve({ status: 200, body: JSON.stringify(res), - timestamp: +new Date() + timestamp: +new Date(), + headers: {} }) }) }) diff --git a/src/middleware/sessionStore.js b/src/middleware/sessionStore.js index 886eaab4f..f683facd9 100644 --- a/src/middleware/sessionStore.js +++ b/src/middleware/sessionStore.js @@ -41,7 +41,10 @@ class MongooseStore { async set(id, data) { const {session} = this const record = {_id: id, data, updatedAt: new Date()} - await session.findByIdAndUpdate(id, record, {upsert: true, writeConcern: {w: "majority", wtimeout: 10000}}) + await session.findByIdAndUpdate(id, record, { + upsert: true, + writeConcern: {w: 'majority', wtimeout: 10000} + }) return data } diff --git a/src/model/channels.js b/src/model/channels.js index 7eb247244..1b373cb39 100644 --- a/src/model/channels.js +++ b/src/model/channels.js @@ -112,6 +112,10 @@ const ChannelDef = { type: String, required: true }, + isAsynchronousProcess: { + type: Boolean, + default: false + }, maxBodyAgeDays: { type: Number, min: 1, diff --git a/src/model/tasks.js b/src/model/tasks.js index 805c447c0..f6adc3df1 100644 --- a/src/model/tasks.js +++ b/src/model/tasks.js @@ -8,7 +8,7 @@ const TaskSchema = new Schema({ status: { type: String, required: true, - enum: ['Queued', 'Processing', 'Paused', 'Cancelled', 'Completed'], + enum: ['Queued', 'Processing', 'Paused', 'Cancelled', 'Completed', "Pending Async"], default: 'Queued', index: true }, diff --git a/src/model/transactions.js b/src/model/transactions.js index 9b21fc584..5eee1f192 100644 --- a/src/model/transactions.js +++ b/src/model/transactions.js @@ -6,6 +6,7 @@ import {connectionAPI, connectionDefault} from '../config' export const transactionStatus = { PROCESSING: 'Processing', + PENDING_ASYNC: 'Pending Async', SUCCESSFUL: 'Successful', COMPLETED: 'Completed', COMPLETED_W_ERR: 'Completed with error(s)', @@ -125,21 +126,23 @@ export const TransactionModel = connectionDefault.model( * Resolve a transaction stuck in the processing state * * If OpenHIM crashes with an inflight transaction, that transaction's status will stay in processing - * So we run this function at start up and set all those transactions to failed + * So we run this function at start up and set all those transactions to failed * */ export const resolveStuckProcessingState = async () => { - TransactionModelAPI.find({ status: transactionStatus.PROCESSING }) - .cursor() - .on('data', async (transaction) => { - try { - if (transaction.$isEmpty('response') && transaction.$isEmpty('error')) - TransactionModelAPI.findByIdAndUpdate(transaction.id, { - status: transactionStatus.FAILED, - error: { message: 'OpenHIM crashed while still waiting for a response' }, - }).exec() - } catch (err) { - console.error(`Error updating transaction stuck in processing: ${err}`) - } - }) + TransactionModelAPI.find({status: transactionStatus.PROCESSING}) + .cursor() + .on('data', async transaction => { + try { + if (transaction.$isEmpty('response') && transaction.$isEmpty('error')) + TransactionModelAPI.findByIdAndUpdate(transaction.id, { + status: transactionStatus.FAILED, + error: { + message: 'OpenHIM crashed while still waiting for a response' + } + }).exec() + } catch (err) { + console.error(`Error updating transaction stuck in processing: ${err}`) + } + }) } diff --git a/src/server.js b/src/server.js index 334832abf..1f2c92ec8 100644 --- a/src/server.js +++ b/src/server.js @@ -37,7 +37,7 @@ import * as upgradeDB from './upgradeDB' import {KeystoreModel} from './model/keystore' import {UserModel, createUser, updateTokenUser} from './model/users' import {appRoot, config, connectionAgenda} from './config' -import { resolveStuckProcessingState } from './model/transactions' +import {resolveStuckProcessingState} from './model/transactions' mongoose.Promise = Promise @@ -888,7 +888,7 @@ if (cluster.isMaster && !module.parent) { return Promise.all(promises) .then(() => { resolveStuckProcessingState() - + let audit = atna.construct.appActivityAudit( true, himSourceID, diff --git a/src/tasks.js b/src/tasks.js index 3d188b402..5486bd342 100644 --- a/src/tasks.js +++ b/src/tasks.js @@ -28,11 +28,19 @@ export async function findAndProcessAQueuedTask() { {status: 'Processing'}, {new: true} ) + if (task != null) { activeTasks++ await processNextTaskRound(task) activeTasks-- } + + const asyncTasks = await TaskModel.find({status: 'Pending Async'}); + + asyncTasks.forEach(async task => { + await checkAsyncTaskStatus(task); + }) + } catch (err) { if (task == null) { logger.error(`An error occurred while looking for rerun tasks: ${err}`) @@ -45,6 +53,34 @@ export async function findAndProcessAQueuedTask() { } } +async function checkAsyncTaskStatus(task) { + const pendingAsyncTransactions = task.transactions.filter(transaction => transaction.rerunStatus === 'Pending Async'); + + let remainingAsyncTransactions = pendingAsyncTransactions.length; + + pendingAsyncTransactions.forEach(async transaction => { + const currentTransactionStatus = await TransactionModel.findById(transaction.rerunID); + + if (["Successful", "Completed with error(s)", "Failed"].includes(currentTransactionStatus.status)) { + transaction.tstatus = 'Completed'; + transaction.rerunStatus = currentTransactionStatus.status; + await task.save(); + remainingAsyncTransactions--; + } + }); + + + if (remainingAsyncTransactions === 0){ + task.status = 'Completed'; + task.completedDate = new Date(); + await task.save() + logger.info(`Async task ${task._id} completed`); + } + + return; + +} + function rerunTaskProcessor() { if (live) { findAndProcessAQueuedTask() @@ -139,6 +175,8 @@ async function processNextTaskRound(task) { return } + let taskHasAsyncTransactions = false + const promises = transactions.map(transaction => { task.remainingTransactions-- @@ -158,7 +196,11 @@ async function processNextTaskRound(task) { logger.error( `An error occurred while rerunning transaction ${transaction.tid} for task ${task._id}: ${err}` ) - } else { + }else if(response.status === 202){ + transaction.tstatus = 'Processing' + taskHasAsyncTransactions = true + } + else { transaction.tstatus = 'Completed' } return resolve() @@ -177,8 +219,8 @@ async function processNextTaskRound(task) { if (task.remainingTransactions) { await processNextTaskRound(task) } else { - task.status = 'Completed' - task.completedDate = new Date() + task.status = taskHasAsyncTransactions ? 'Pending Async' : 'Completed' + task.completedDate = taskHasAsyncTransactions ? null : new Date() logger.info(`Round completed for rerun task #${task._id} - Task completed`) await task.save().catch(err => { diff --git a/test/integration/channelsAPITests.js b/test/integration/channelsAPITests.js index f0b0664fa..fdaedfc10 100644 --- a/test/integration/channelsAPITests.js +++ b/test/integration/channelsAPITests.js @@ -859,7 +859,8 @@ describe('API Integration Tests', () => { convertedPatch._id = convertedPatch._id.toString() convertedPatch.ref = convertedPatch.ref.toString() convertedPatch.date = convertedPatch.date.toISOString() - convertedPatch.updatedBy._id = convertedPatch.updatedBy._id.toString() + convertedPatch.updatedBy._id = + convertedPatch.updatedBy._id.toString() return convertedPatch }) }) diff --git a/test/integration/clientsAPITests.js b/test/integration/clientsAPITests.js index 37f9a92f9..58a53b1a7 100644 --- a/test/integration/clientsAPITests.js +++ b/test/integration/clientsAPITests.js @@ -162,6 +162,46 @@ describe('API Integration Tests', () => { }) }) + describe('Getting a client ID by the named "clientID" field on the document in the database', () => { + const clientTest = { + clientID: 'testClient', + clientDomain: 'www.zedmusic-unique.co.zw', + name: 'OpenHIE NodeJs', + roles: ['test_role_PoC', 'monitoring'], + passwordHash: + '$2a$10$w8GyqInkl72LMIQNpMM/fenF6VsVukyya.c6fh/GRtrKq05C2.Zgy' + } + + let clientId + + beforeEach(async () => { + const client = await new ClientModelAPI(clientTest).save() + clientId = clientTest.clientID + }) + + it('should return the client ID if it exists', async () => { + const client = await new ClientModelAPI(testAppDoc).save() + const res = await request(BASE_URL) + .get(`/clients/${clientId}?byNamedClientID=true`) + .set('Cookie', rootCookie) + .expect(200) + }) + + it('should return 404 if the client ID does not exist', async () => { + await request(BASE_URL) + .get('/clients/nonExistentClientID?byNamedClientID=true') + .set('Cookie', rootCookie) + .expect(404) + }) + + it('should fail when sending clientID with "byNamedClientID" param set to false', async () => { + await request(BASE_URL) + .get(`/clients/${clientId}?byNamedClientID=false`) + .set('Cookie', rootCookie) + .expect(500) + }) + }) + describe('*getClient(_id)', () => { const clientTest = { clientID: 'testClient', diff --git a/test/integration/httpTests.js b/test/integration/httpTests.js index 78c8958e4..a18b23c99 100644 --- a/test/integration/httpTests.js +++ b/test/integration/httpTests.js @@ -223,13 +223,34 @@ describe('HTTP tests', () => { } }) + const asyncChannel = new ChannelModelAPI({ + name: 'TEST DATA - Mock async endpoint async', + urlPattern: '/async', + allow: ['PoC'], + methods: ['POST', 'PUT'], + routes: [ + { + name: 'test route async', + host: 'localhost', + port: constants.MEDIATOR_PORT, + primary: true + } + ], + isAsynchronousProcess: true, + updatedBy: { + id: new ObjectId(), + name: 'Test' + } + }) + await Promise.all([ channel1.save(), channel2.save(), channel3.save(), channel4.save(), channel5.save(), - channel6.save() + channel6.save(), + asyncChannel.save() ]) const testAppDoc = { @@ -345,6 +366,15 @@ describe('HTTP tests', () => { .expect('content-encoding', 'gzip') .expect(testDoc) }) + + it('should return 202 CREATED on POST Request for Channel Set as Async Process - async', async () => { + await promisify(server.start)({httpPort: SERVER_PORTS.httpPort}) + await request(constants.HTTP_BASE_URL) + .post('/async') + .send(testDoc) + .auth('testApp', 'password') + .expect(202) + }) }) describe('HTTP body content matching - XML', () => { diff --git a/test/unit/contactTest.js b/test/unit/contactTest.js index 70f9e475c..b23d0ed78 100644 --- a/test/unit/contactTest.js +++ b/test/unit/contactTest.js @@ -164,7 +164,7 @@ describe('Contact Users', () => { describe('contactUser', () => { it('should throw if passed the incorrect method type', done => { - contact.contactUser('none', '', '', '', '', (err) => { + contact.contactUser('none', '', '', '', '', err => { should.exist(err) should.equal(err.message, "Unknown contact method 'none'") return done() @@ -173,11 +173,11 @@ describe('Contact Users', () => { it('should throw on sms send if sms provider is not listed', done => { config.smsGateway.provider = 'none' - contact.contactUser('sms', 'test', '', 'test message', '', (err) => { + contact.contactUser('sms', 'test', '', 'test message', '', err => { should.exist(err) should.equal(err.message, "Unknown SMS gateway provider 'none'") - config.smsGateway.provider = 'clickatell' + config.smsGateway.provider = 'clickatell' return done() }) }) diff --git a/test/unit/messageStoreTest.js b/test/unit/messageStoreTest.js index ed1990441..4b3091921 100644 --- a/test/unit/messageStoreTest.js +++ b/test/unit/messageStoreTest.js @@ -57,6 +57,26 @@ describe('MessageStore', () => { } } + const asyncChannel = { + name: 'TestAsyncChannel3', + urlPattern: 'test/sample-async', + allow: ['PoC', 'Test1', 'Test2'], + routes: [ + { + name: 'test route', + host: 'localhost', + port: 9876, + primary: true + } + ], + isAsynchronousProcess: true, + txViewAcl: 'group1', + updatedBy: { + id: new ObjectId(), + name: 'Test' + } + } + const req = {} req.path = '/api/test/request' req.headers = { @@ -154,6 +174,19 @@ describe('MessageStore', () => { }) }) + it('should have a transaction with a status of "Pending Async" if the channel is asynchronous', done => { + ctx.path = '/api/test/sample-async' + ctx.matchingChannel = {isAsynchronousProcess: true} + + messageStore.storeTransaction(ctx, (error, result) => { + should.not.exist(error) + TransactionModel.findOne({_id: result._id}, (error, trans) => { + trans.status.should.equal('Pending Async') + return done() + }) + }) + }) + it('should be able to save the transaction if the headers contain Mongo reserved characters ($ or .)', done => { ctx.header['dot.header'] = '123' ctx.header.dollar$header = '124' diff --git a/test/unit/tasksTest.js b/test/unit/tasksTest.js index 7e5abfc8a..a13dd5fe8 100644 --- a/test/unit/tasksTest.js +++ b/test/unit/tasksTest.js @@ -377,10 +377,10 @@ describe('Rerun Task Tests', () => { await tasks.findAndProcessAQueuedTask().should.not.rejected() }) - it("should return without processing if active tasks is greater than max active", async () => { - const spy = sinon.spy(TaskModel, "findOneAndUpdate") + it('should return without processing if active tasks is greater than max active', async () => { + const spy = sinon.spy(TaskModel, 'findOneAndUpdate') config.rerun.activeConcurrentTasks = -1 - + await tasks.findAndProcessAQueuedTask() spy.called.should.be.false diff --git a/test/unit/transactionsTest.js b/test/unit/transactionsTest.js index 308e8c93d..677c00d4c 100644 --- a/test/unit/transactionsTest.js +++ b/test/unit/transactionsTest.js @@ -3,7 +3,11 @@ /* eslint-env mocha */ import * as transactions from '../../src/api/transactions' -import {TaskModel, TransactionModel, resolveStuckProcessingState} from '../../src/model' +import { + TaskModel, + TransactionModel, + resolveStuckProcessingState +} from '../../src/model' import {ObjectId} from 'mongodb' describe('calculateTransactionBodiesByteLength()', () => { @@ -102,7 +106,7 @@ describe('TransactionModel tests', () => { name: 'Test' } }) - + const validProcessingTransaction = Object.freeze({ status: 'Processing', request: { @@ -132,11 +136,11 @@ describe('TransactionModel tests', () => { name: 'Test' } }) - + beforeEach(async () => { await TransactionModel.deleteMany({}) }) - + afterEach(async () => { await TransactionModel.deleteMany({}) }) @@ -145,10 +149,14 @@ describe('TransactionModel tests', () => { await TransactionModel(midFlightTransaction).save() resolveStuckProcessingState() - await new Promise((resolve) => setTimeout(() => { resolve() }, 500)) - - const transactions = await TransactionModel.find({ status: 'Processing' }); - transactions.length.should.be.exactly(0); + await new Promise(resolve => + setTimeout(() => { + resolve() + }, 500) + ) + + const transactions = await TransactionModel.find({status: 'Processing'}) + transactions.length.should.be.exactly(0) }) it('should not update a transaction processing state if response or error set', async () => { @@ -156,10 +164,14 @@ describe('TransactionModel tests', () => { await TransactionModel(errorProcessingTransaction).save() resolveStuckProcessingState() - await new Promise((resolve) => setTimeout(() => { resolve() }, 500)) - - const transactions = await TransactionModel.find({ status: 'Processing' }); - transactions.length.should.be.exactly(2); + await new Promise(resolve => + setTimeout(() => { + resolve() + }, 500) + ) + + const transactions = await TransactionModel.find({status: 'Processing'}) + transactions.length.should.be.exactly(2) }) }) })