From 771548c2b766d2f9e75faecda4c3094665ca0555 Mon Sep 17 00:00:00 2001 From: brett-onions Date: Wed, 6 Sep 2023 09:42:41 +0200 Subject: [PATCH 01/14] added Pending Async as status for transaction --- src/middleware/messageStore.js | 15 +++++++++++++-- src/model/channels.js | 5 +++++ src/model/transactions.js | 1 + 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/middleware/messageStore.js b/src/middleware/messageStore.js index 60c91869e..29c26ce09 100644 --- a/src/middleware/messageStore.js +++ b/src/middleware/messageStore.js @@ -10,6 +10,7 @@ import * as utils from '../utils' export const transactionStatus = { PROCESSING: 'Processing', + PENDING_ASYNC: 'Pending Async', SUCCESSFUL: 'Successful', COMPLETED: 'Completed', COMPLETED_W_ERR: 'Completed with error(s)', @@ -93,9 +94,13 @@ export function storeTransaction(ctx, done) { export function storeResponse(ctx, done) { const headers = copyMapWithEscapedReservedCharacters(ctx.response.header) + + const status = ctx.matchingChannel.isAsynchronousProcess && ctx.response.status < 400 ? 202 : ctx.response.status; + // updates the status code that appears to for the client + ctx.response.status = status; const res = { - status: ctx.response.status, + status, headers, body: !ctx.response.body ? '' : ctx.response.body.toString(), timestamp: ctx.response.timestamp @@ -259,7 +264,11 @@ export function setFinalStatus(ctx, callback) { ctx.response.status <= 299 && routeSuccess ) { - tx.status = transactionStatus.SUCCESSFUL + if(ctx.matchingChannel.isAsynchronousProcess){ + tx.status = transactionStatus.PENDING_ASYNC + }else{ + tx.status = transactionStatus.SUCCESSFUL + } } if ( ctx.response.status >= 400 && @@ -279,6 +288,8 @@ export function setFinalStatus(ctx, callback) { logger.info(`Final status for transaction ${tx._id} : ${tx.status}`) update.status = tx.status + + } if (ctx.autoRetry != null) { diff --git a/src/model/channels.js b/src/model/channels.js index 7eb247244..500da52e7 100644 --- a/src/model/channels.js +++ b/src/model/channels.js @@ -112,6 +112,11 @@ const ChannelDef = { type: String, required: true }, + isAsynchronousProcess:{ + type: Boolean, + required: false, + default: false + }, maxBodyAgeDays: { type: Number, min: 1, diff --git a/src/model/transactions.js b/src/model/transactions.js index f1e2760b5..b75cfea8e 100644 --- a/src/model/transactions.js +++ b/src/model/transactions.js @@ -96,6 +96,7 @@ const TransactionSchema = new Schema({ required: true, enum: [ 'Processing', + 'Pending Async', 'Failed', 'Completed', 'Successful', From a3255a9bbe578c0d81e3a1dc74497edba9d560e1 Mon Sep 17 00:00:00 2001 From: brett-onions Date: Tue, 19 Sep 2023 08:11:24 +0200 Subject: [PATCH 02/14] fx: typo and bug --- src/middleware/messageStore.js | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/middleware/messageStore.js b/src/middleware/messageStore.js index 29c26ce09..a5d206f33 100644 --- a/src/middleware/messageStore.js +++ b/src/middleware/messageStore.js @@ -37,7 +37,7 @@ export function storeTransaction(ctx, done) { const headers = copyMapWithEscapedReservedCharacters(ctx.header) const tx = new transactions.TransactionModel({ - status: transactionStatus.PROCESSING, + status: ctx?.matchingChannel?.isAsynchronousProcess ? transactionStatus.PENDING_ASYNC:transactionStatus.PROCESSING , clientID: ctx.authenticated != null ? ctx.authenticated._id : undefined, channelID: ctx.authorisedChannel._id, clientIP: ctx.ip, @@ -95,9 +95,9 @@ export function storeTransaction(ctx, done) { export function storeResponse(ctx, done) { const headers = copyMapWithEscapedReservedCharacters(ctx.response.header) - const status = ctx.matchingChannel.isAsynchronousProcess && ctx.response.status < 400 ? 202 : ctx.response.status; - // updates the status code that appears to for the client - ctx.response.status = status; + const status = ctx.matchingChannel?.isAsynchronousProcess && ctx.response.status < 400 ? 202 : ctx.response.status + // if the channel is asynchronous and the response is successful, change the status to 202 otherwise use the original status + ctx.response.status = status const res = { status, @@ -264,7 +264,7 @@ export function setFinalStatus(ctx, callback) { ctx.response.status <= 299 && routeSuccess ) { - if(ctx.matchingChannel.isAsynchronousProcess){ + if(ctx.matchingChannel?.isAsynchronousProcess){ tx.status = transactionStatus.PENDING_ASYNC }else{ tx.status = transactionStatus.SUCCESSFUL @@ -289,7 +289,6 @@ export function setFinalStatus(ctx, callback) { logger.info(`Final status for transaction ${tx._id} : ${tx.status}`) update.status = tx.status - } if (ctx.autoRetry != null) { From cc5bba7551823c9cf711e03c479f9e9cba6a0d31 Mon Sep 17 00:00:00 2001 From: brett-onions Date: Tue, 19 Sep 2023 08:12:24 +0200 Subject: [PATCH 03/14] rm unnecessary line --- src/model/channels.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/model/channels.js b/src/model/channels.js index 500da52e7..e36d3ee2d 100644 --- a/src/model/channels.js +++ b/src/model/channels.js @@ -114,7 +114,6 @@ const ChannelDef = { }, isAsynchronousProcess:{ type: Boolean, - required: false, default: false }, maxBodyAgeDays: { From 33b6fa07c09528cd556d0c7910192b8c9f182efe Mon Sep 17 00:00:00 2001 From: brett-onions Date: Tue, 19 Sep 2023 08:12:53 +0200 Subject: [PATCH 04/14] adding unit and integration tests for new feature --- test/integration/httpTests.js | 33 ++++++++++++++++++++++++++++++++- test/unit/messageStoreTest.js | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/test/integration/httpTests.js b/test/integration/httpTests.js index 78c8958e4..6203ab76e 100644 --- a/test/integration/httpTests.js +++ b/test/integration/httpTests.js @@ -223,13 +223,34 @@ describe('HTTP tests', () => { } }) + const asyncChannel = new ChannelModelAPI({ + name: 'TEST DATA - Mock async endpoint async', + urlPattern: '/async', + allow: ['PoC'], + methods: ['POST', 'PUT'], + routes: [ + { + name: 'test route async', + host: 'localhost', + port: constants.MEDIATOR_PORT, + primary: true + } + ], + isAsynchronousProcess: true, + updatedBy: { + id: new ObjectId(), + name: 'Test' + } + }) + await Promise.all([ channel1.save(), channel2.save(), channel3.save(), channel4.save(), channel5.save(), - channel6.save() + channel6.save(), + asyncChannel.save() ]) const testAppDoc = { @@ -345,6 +366,16 @@ describe('HTTP tests', () => { .expect('content-encoding', 'gzip') .expect(testDoc) }) + + it('should return 202 CREATED on POST Request for Channel Set as Async Process - async', async () => { + await promisify(server.start)({httpPort: SERVER_PORTS.httpPort}) + await request(constants.HTTP_BASE_URL) + .post('/async') + .send(testDoc) + .auth('testApp', 'password') + .expect(202) + }) + }) describe('HTTP body content matching - XML', () => { diff --git a/test/unit/messageStoreTest.js b/test/unit/messageStoreTest.js index ed1990441..a949ee550 100644 --- a/test/unit/messageStoreTest.js +++ b/test/unit/messageStoreTest.js @@ -57,6 +57,27 @@ describe('MessageStore', () => { } } + const asyncChannel = { + name: 'TestAsyncChannel3', + urlPattern: 'test/sample-async', + allow: ['PoC', 'Test1', 'Test2'], + routes: [ + { + name: 'test route', + host: 'localhost', + port: 9876, + primary: true + } + ], + isAsynchronousProcess: true, + txViewAcl: 'group1', + updatedBy: { + id: new ObjectId(), + name: 'Test' + } + } + + const req = {} req.path = '/api/test/request' req.headers = { @@ -154,6 +175,19 @@ describe('MessageStore', () => { }) }) + it('should have a transaction with a status of "Pending Async" if the channel is asynchronous', done => { + ctx.path = '/api/test/sample-async'; + ctx.matchingChannel = { isAsynchronousProcess: true }; + + messageStore.storeTransaction(ctx, (error, result) => { + should.not.exist(error) + TransactionModel.findOne({_id: result._id}, (error, trans) => { + trans.status.should.equal("Pending Async") + return done() + }) + }); + }); + it('should be able to save the transaction if the headers contain Mongo reserved characters ($ or .)', done => { ctx.header['dot.header'] = '123' ctx.header.dollar$header = '124' From 9d08168e3959656fbcece66fe56d52c65c952efa Mon Sep 17 00:00:00 2001 From: brett-onions Date: Fri, 22 Sep 2023 12:08:33 +0200 Subject: [PATCH 05/14] fx with rerunning a transactions --- src/middleware/messageStore.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/middleware/messageStore.js b/src/middleware/messageStore.js index 0ddda1ba7..7f797f29f 100644 --- a/src/middleware/messageStore.js +++ b/src/middleware/messageStore.js @@ -87,8 +87,8 @@ export function storeTransaction(ctx, done) { export function storeResponse(ctx, done) { const headers = copyMapWithEscapedReservedCharacters(ctx.response.header) - - const status = ctx.matchingChannel?.isAsynchronousProcess && ctx.response.status < 400 ? 202 : ctx.response.status + const isAsynchronousProcess = Boolean(ctx?.matchingChannel?.isAsynchronousProcess) || Boolean(ctx?.authorisedChannel?.isAsynchronousProcess); + const status = isAsynchronousProcess && ctx.response.status < 400 ? 202 : ctx.response.status // if the channel is asynchronous and the response is successful, change the status to 202 otherwise use the original status ctx.response.status = status @@ -257,7 +257,7 @@ export function setFinalStatus(ctx, callback) { ctx.response.status <= 299 && routeSuccess ) { - if(ctx.matchingChannel?.isAsynchronousProcess){ + if(ctx?.matchingChannel?.isAsynchronousProcess || !!ctx?.authorisedChannel?.isAsynchronousProcess){ tx.status = transactionStatus.PENDING_ASYNC }else{ tx.status = transactionStatus.SUCCESSFUL From ca1a0c307d60c2c2d57a86e93c70937b6c8d6fef Mon Sep 17 00:00:00 2001 From: brett-onions Date: Tue, 26 Sep 2023 16:48:12 +0200 Subject: [PATCH 06/14] ft: query param to allow search by clientID --- src/api/clients.js | 28 ++++++++++++++------ test/integration/clientsAPITests.js | 40 +++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/src/api/clients.js b/src/api/clients.js index 384118fbf..40ec4c52b 100644 --- a/src/api/clients.js +++ b/src/api/clients.js @@ -102,15 +102,27 @@ export async function getClient(ctx, clientId, property) { return } - clientId = unescape(clientId) - try { - const result = await ClientModelAPI.findById( - clientId, - projectionRestriction - ) - .lean() - .exec() + + let result; + if(ctx?.query?.byNamedClientID === 'true'){ + result = await ClientModelAPI.findOne( + {clientID: clientId}, + projectionRestriction + ) + .lean() + .exec() + } + else{ + clientId = unescape(clientId) + result = await ClientModelAPI.findById( + clientId, + projectionRestriction + ) + .lean() + .exec() + } + if (result === null) { utils.logAndSetResponse( ctx, diff --git a/test/integration/clientsAPITests.js b/test/integration/clientsAPITests.js index 37f9a92f9..e058d1dc6 100644 --- a/test/integration/clientsAPITests.js +++ b/test/integration/clientsAPITests.js @@ -162,6 +162,46 @@ describe('API Integration Tests', () => { }) }) + describe('Getting a client ID by the named "clientID" field on the document in the database', () => { + const clientTest = { + clientID: 'testClient', + clientDomain: 'www.zedmusic-unique.co.zw', + name: 'OpenHIE NodeJs', + roles: ['test_role_PoC', 'monitoring'], + passwordHash: + '$2a$10$w8GyqInkl72LMIQNpMM/fenF6VsVukyya.c6fh/GRtrKq05C2.Zgy' + } + + let clientId; + + beforeEach(async () => { + const client = await new ClientModelAPI(clientTest).save() + clientId = clientTest.clientID; + }) + + it('should return the client ID if it exists', async () => { + const client = await new ClientModelAPI(testAppDoc).save() + const res = await request(BASE_URL) + .get(`/clients/${clientId}?byNamedClientID=true`) + .set('Cookie', rootCookie) + .expect(200) + }) + + it('should return 404 if the client ID does not exist', async () => { + await request(BASE_URL) + .get('/clients/nonExistentClientID?byNamedClientID=true') + .set('Cookie', rootCookie) + .expect(404) + }) + + it('should fail when sending clientID with "byNamedClientID" param set to false', async () => { + await request(BASE_URL) + .get(`/clients/${clientId}?byNamedClientID=false`) + .set('Cookie', rootCookie) + .expect(500) + }) + }); + describe('*getClient(_id)', () => { const clientTest = { clientID: 'testClient', From b2b23435bee7403b60d0735b77e1e428b0bd9368 Mon Sep 17 00:00:00 2001 From: brett-onions Date: Wed, 27 Sep 2023 08:48:27 +0200 Subject: [PATCH 07/14] linting --- src/api/apps.js | 4 +++- src/api/clients.js | 15 ++++-------- src/api/tasks.js | 14 +++++------ src/middleware/messageStore.js | 23 ++++++++++++------ src/middleware/sessionStore.js | 5 +++- src/model/channels.js | 2 +- src/model/transactions.js | 30 ++++++++++++----------- src/server.js | 4 ++-- test/integration/channelsAPITests.js | 3 ++- test/integration/clientsAPITests.js | 6 ++--- test/integration/httpTests.js | 1 - test/unit/contactTest.js | 6 ++--- test/unit/messageStoreTest.js | 11 ++++----- test/unit/tasksTest.js | 6 ++--- test/unit/transactionsTest.js | 36 ++++++++++++++++++---------- 15 files changed, 94 insertions(+), 72 deletions(-) diff --git a/src/api/apps.js b/src/api/apps.js index 1e5b308f2..c2e3b041c 100644 --- a/src/api/apps.js +++ b/src/api/apps.js @@ -45,7 +45,9 @@ const createErrorResponse = (ctx, operation, error) => { const validateId = (ctx, id) => { if (!id.match(/^[0-9a-fA-F]{24}$/)) { ctx.statusCode = 400 - throw Error(`App id "${id}" is invalid. ObjectId should contain 24 characters`) + throw Error( + `App id "${id}" is invalid. ObjectId should contain 24 characters` + ) } } diff --git a/src/api/clients.js b/src/api/clients.js index 40ec4c52b..8a8bc2a16 100644 --- a/src/api/clients.js +++ b/src/api/clients.js @@ -103,26 +103,21 @@ export async function getClient(ctx, clientId, property) { } try { - - let result; - if(ctx?.query?.byNamedClientID === 'true'){ + let result + if (ctx?.query?.byNamedClientID === 'true') { result = await ClientModelAPI.findOne( {clientID: clientId}, projectionRestriction ) .lean() .exec() - } - else{ + } else { clientId = unescape(clientId) - result = await ClientModelAPI.findById( - clientId, - projectionRestriction - ) + result = await ClientModelAPI.findById(clientId, projectionRestriction) .lean() .exec() } - + if (result === null) { utils.logAndSetResponse( ctx, diff --git a/src/api/tasks.js b/src/api/tasks.js index 740b891f0..538d0b6a0 100644 --- a/src/api/tasks.js +++ b/src/api/tasks.js @@ -216,13 +216,13 @@ export async function addTask(ctx) { ) // Clear the transactions out of the auto retry queue, in case they're in there - return AutoRetryModelAPI - .deleteMany({transactionID: {$in: transactions.tids}}) - .exec(err => { - if (err) { - return logger.error(err) - } - }) + return AutoRetryModelAPI.deleteMany({ + transactionID: {$in: transactions.tids} + }).exec(err => { + if (err) { + return logger.error(err) + } + }) } else { // rerun task creation not allowed utils.logAndSetResponse( diff --git a/src/middleware/messageStore.js b/src/middleware/messageStore.js index 7f797f29f..79fbd16b7 100644 --- a/src/middleware/messageStore.js +++ b/src/middleware/messageStore.js @@ -8,7 +8,7 @@ import * as metrics from '../metrics' import * as transactions from '../model/transactions' import * as utils from '../utils' -const { transactionStatus } = transactions +const {transactionStatus} = transactions function copyMapWithEscapedReservedCharacters(map) { const escapedMap = {} @@ -30,7 +30,9 @@ export function storeTransaction(ctx, done) { const headers = copyMapWithEscapedReservedCharacters(ctx.header) const tx = new transactions.TransactionModel({ - status: ctx?.matchingChannel?.isAsynchronousProcess ? transactionStatus.PENDING_ASYNC:transactionStatus.PROCESSING , + status: ctx?.matchingChannel?.isAsynchronousProcess + ? transactionStatus.PENDING_ASYNC + : transactionStatus.PROCESSING, clientID: ctx.authenticated != null ? ctx.authenticated._id : undefined, channelID: ctx.authorisedChannel._id, clientIP: ctx.ip, @@ -87,8 +89,13 @@ export function storeTransaction(ctx, done) { export function storeResponse(ctx, done) { const headers = copyMapWithEscapedReservedCharacters(ctx.response.header) - const isAsynchronousProcess = Boolean(ctx?.matchingChannel?.isAsynchronousProcess) || Boolean(ctx?.authorisedChannel?.isAsynchronousProcess); - const status = isAsynchronousProcess && ctx.response.status < 400 ? 202 : ctx.response.status + const isAsynchronousProcess = + Boolean(ctx?.matchingChannel?.isAsynchronousProcess) || + Boolean(ctx?.authorisedChannel?.isAsynchronousProcess) + const status = + isAsynchronousProcess && ctx.response.status < 400 + ? 202 + : ctx.response.status // if the channel is asynchronous and the response is successful, change the status to 202 otherwise use the original status ctx.response.status = status @@ -257,9 +264,12 @@ export function setFinalStatus(ctx, callback) { ctx.response.status <= 299 && routeSuccess ) { - if(ctx?.matchingChannel?.isAsynchronousProcess || !!ctx?.authorisedChannel?.isAsynchronousProcess){ + if ( + ctx?.matchingChannel?.isAsynchronousProcess || + !!ctx?.authorisedChannel?.isAsynchronousProcess + ) { tx.status = transactionStatus.PENDING_ASYNC - }else{ + } else { tx.status = transactionStatus.SUCCESSFUL } } @@ -281,7 +291,6 @@ export function setFinalStatus(ctx, callback) { logger.info(`Final status for transaction ${tx._id} : ${tx.status}`) update.status = tx.status - } if (ctx.autoRetry != null) { diff --git a/src/middleware/sessionStore.js b/src/middleware/sessionStore.js index 886eaab4f..f683facd9 100644 --- a/src/middleware/sessionStore.js +++ b/src/middleware/sessionStore.js @@ -41,7 +41,10 @@ class MongooseStore { async set(id, data) { const {session} = this const record = {_id: id, data, updatedAt: new Date()} - await session.findByIdAndUpdate(id, record, {upsert: true, writeConcern: {w: "majority", wtimeout: 10000}}) + await session.findByIdAndUpdate(id, record, { + upsert: true, + writeConcern: {w: 'majority', wtimeout: 10000} + }) return data } diff --git a/src/model/channels.js b/src/model/channels.js index e36d3ee2d..1b373cb39 100644 --- a/src/model/channels.js +++ b/src/model/channels.js @@ -112,7 +112,7 @@ const ChannelDef = { type: String, required: true }, - isAsynchronousProcess:{ + isAsynchronousProcess: { type: Boolean, default: false }, diff --git a/src/model/transactions.js b/src/model/transactions.js index 382040076..5eee1f192 100644 --- a/src/model/transactions.js +++ b/src/model/transactions.js @@ -126,21 +126,23 @@ export const TransactionModel = connectionDefault.model( * Resolve a transaction stuck in the processing state * * If OpenHIM crashes with an inflight transaction, that transaction's status will stay in processing - * So we run this function at start up and set all those transactions to failed + * So we run this function at start up and set all those transactions to failed * */ export const resolveStuckProcessingState = async () => { - TransactionModelAPI.find({ status: transactionStatus.PROCESSING }) - .cursor() - .on('data', async (transaction) => { - try { - if (transaction.$isEmpty('response') && transaction.$isEmpty('error')) - TransactionModelAPI.findByIdAndUpdate(transaction.id, { - status: transactionStatus.FAILED, - error: { message: 'OpenHIM crashed while still waiting for a response' }, - }).exec() - } catch (err) { - console.error(`Error updating transaction stuck in processing: ${err}`) - } - }) + TransactionModelAPI.find({status: transactionStatus.PROCESSING}) + .cursor() + .on('data', async transaction => { + try { + if (transaction.$isEmpty('response') && transaction.$isEmpty('error')) + TransactionModelAPI.findByIdAndUpdate(transaction.id, { + status: transactionStatus.FAILED, + error: { + message: 'OpenHIM crashed while still waiting for a response' + } + }).exec() + } catch (err) { + console.error(`Error updating transaction stuck in processing: ${err}`) + } + }) } diff --git a/src/server.js b/src/server.js index 334832abf..1f2c92ec8 100644 --- a/src/server.js +++ b/src/server.js @@ -37,7 +37,7 @@ import * as upgradeDB from './upgradeDB' import {KeystoreModel} from './model/keystore' import {UserModel, createUser, updateTokenUser} from './model/users' import {appRoot, config, connectionAgenda} from './config' -import { resolveStuckProcessingState } from './model/transactions' +import {resolveStuckProcessingState} from './model/transactions' mongoose.Promise = Promise @@ -888,7 +888,7 @@ if (cluster.isMaster && !module.parent) { return Promise.all(promises) .then(() => { resolveStuckProcessingState() - + let audit = atna.construct.appActivityAudit( true, himSourceID, diff --git a/test/integration/channelsAPITests.js b/test/integration/channelsAPITests.js index f0b0664fa..fdaedfc10 100644 --- a/test/integration/channelsAPITests.js +++ b/test/integration/channelsAPITests.js @@ -859,7 +859,8 @@ describe('API Integration Tests', () => { convertedPatch._id = convertedPatch._id.toString() convertedPatch.ref = convertedPatch.ref.toString() convertedPatch.date = convertedPatch.date.toISOString() - convertedPatch.updatedBy._id = convertedPatch.updatedBy._id.toString() + convertedPatch.updatedBy._id = + convertedPatch.updatedBy._id.toString() return convertedPatch }) }) diff --git a/test/integration/clientsAPITests.js b/test/integration/clientsAPITests.js index e058d1dc6..58a53b1a7 100644 --- a/test/integration/clientsAPITests.js +++ b/test/integration/clientsAPITests.js @@ -172,11 +172,11 @@ describe('API Integration Tests', () => { '$2a$10$w8GyqInkl72LMIQNpMM/fenF6VsVukyya.c6fh/GRtrKq05C2.Zgy' } - let clientId; + let clientId beforeEach(async () => { const client = await new ClientModelAPI(clientTest).save() - clientId = clientTest.clientID; + clientId = clientTest.clientID }) it('should return the client ID if it exists', async () => { @@ -200,7 +200,7 @@ describe('API Integration Tests', () => { .set('Cookie', rootCookie) .expect(500) }) - }); + }) describe('*getClient(_id)', () => { const clientTest = { diff --git a/test/integration/httpTests.js b/test/integration/httpTests.js index 6203ab76e..a18b23c99 100644 --- a/test/integration/httpTests.js +++ b/test/integration/httpTests.js @@ -375,7 +375,6 @@ describe('HTTP tests', () => { .auth('testApp', 'password') .expect(202) }) - }) describe('HTTP body content matching - XML', () => { diff --git a/test/unit/contactTest.js b/test/unit/contactTest.js index 70f9e475c..b23d0ed78 100644 --- a/test/unit/contactTest.js +++ b/test/unit/contactTest.js @@ -164,7 +164,7 @@ describe('Contact Users', () => { describe('contactUser', () => { it('should throw if passed the incorrect method type', done => { - contact.contactUser('none', '', '', '', '', (err) => { + contact.contactUser('none', '', '', '', '', err => { should.exist(err) should.equal(err.message, "Unknown contact method 'none'") return done() @@ -173,11 +173,11 @@ describe('Contact Users', () => { it('should throw on sms send if sms provider is not listed', done => { config.smsGateway.provider = 'none' - contact.contactUser('sms', 'test', '', 'test message', '', (err) => { + contact.contactUser('sms', 'test', '', 'test message', '', err => { should.exist(err) should.equal(err.message, "Unknown SMS gateway provider 'none'") - config.smsGateway.provider = 'clickatell' + config.smsGateway.provider = 'clickatell' return done() }) }) diff --git a/test/unit/messageStoreTest.js b/test/unit/messageStoreTest.js index a949ee550..4b3091921 100644 --- a/test/unit/messageStoreTest.js +++ b/test/unit/messageStoreTest.js @@ -77,7 +77,6 @@ describe('MessageStore', () => { } } - const req = {} req.path = '/api/test/request' req.headers = { @@ -176,17 +175,17 @@ describe('MessageStore', () => { }) it('should have a transaction with a status of "Pending Async" if the channel is asynchronous', done => { - ctx.path = '/api/test/sample-async'; - ctx.matchingChannel = { isAsynchronousProcess: true }; + ctx.path = '/api/test/sample-async' + ctx.matchingChannel = {isAsynchronousProcess: true} messageStore.storeTransaction(ctx, (error, result) => { should.not.exist(error) TransactionModel.findOne({_id: result._id}, (error, trans) => { - trans.status.should.equal("Pending Async") + trans.status.should.equal('Pending Async') return done() }) - }); - }); + }) + }) it('should be able to save the transaction if the headers contain Mongo reserved characters ($ or .)', done => { ctx.header['dot.header'] = '123' diff --git a/test/unit/tasksTest.js b/test/unit/tasksTest.js index 7e5abfc8a..a13dd5fe8 100644 --- a/test/unit/tasksTest.js +++ b/test/unit/tasksTest.js @@ -377,10 +377,10 @@ describe('Rerun Task Tests', () => { await tasks.findAndProcessAQueuedTask().should.not.rejected() }) - it("should return without processing if active tasks is greater than max active", async () => { - const spy = sinon.spy(TaskModel, "findOneAndUpdate") + it('should return without processing if active tasks is greater than max active', async () => { + const spy = sinon.spy(TaskModel, 'findOneAndUpdate') config.rerun.activeConcurrentTasks = -1 - + await tasks.findAndProcessAQueuedTask() spy.called.should.be.false diff --git a/test/unit/transactionsTest.js b/test/unit/transactionsTest.js index 308e8c93d..677c00d4c 100644 --- a/test/unit/transactionsTest.js +++ b/test/unit/transactionsTest.js @@ -3,7 +3,11 @@ /* eslint-env mocha */ import * as transactions from '../../src/api/transactions' -import {TaskModel, TransactionModel, resolveStuckProcessingState} from '../../src/model' +import { + TaskModel, + TransactionModel, + resolveStuckProcessingState +} from '../../src/model' import {ObjectId} from 'mongodb' describe('calculateTransactionBodiesByteLength()', () => { @@ -102,7 +106,7 @@ describe('TransactionModel tests', () => { name: 'Test' } }) - + const validProcessingTransaction = Object.freeze({ status: 'Processing', request: { @@ -132,11 +136,11 @@ describe('TransactionModel tests', () => { name: 'Test' } }) - + beforeEach(async () => { await TransactionModel.deleteMany({}) }) - + afterEach(async () => { await TransactionModel.deleteMany({}) }) @@ -145,10 +149,14 @@ describe('TransactionModel tests', () => { await TransactionModel(midFlightTransaction).save() resolveStuckProcessingState() - await new Promise((resolve) => setTimeout(() => { resolve() }, 500)) - - const transactions = await TransactionModel.find({ status: 'Processing' }); - transactions.length.should.be.exactly(0); + await new Promise(resolve => + setTimeout(() => { + resolve() + }, 500) + ) + + const transactions = await TransactionModel.find({status: 'Processing'}) + transactions.length.should.be.exactly(0) }) it('should not update a transaction processing state if response or error set', async () => { @@ -156,10 +164,14 @@ describe('TransactionModel tests', () => { await TransactionModel(errorProcessingTransaction).save() resolveStuckProcessingState() - await new Promise((resolve) => setTimeout(() => { resolve() }, 500)) - - const transactions = await TransactionModel.find({ status: 'Processing' }); - transactions.length.should.be.exactly(2); + await new Promise(resolve => + setTimeout(() => { + resolve() + }, 500) + ) + + const transactions = await TransactionModel.find({status: 'Processing'}) + transactions.length.should.be.exactly(2) }) }) }) From 07b84a5e3a8abb5fc38611a4b874013925081177 Mon Sep 17 00:00:00 2001 From: brett-onions Date: Thu, 28 Sep 2023 16:28:00 +0200 Subject: [PATCH 08/14] Clean Up --- src/middleware/messageStore.js | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/middleware/messageStore.js b/src/middleware/messageStore.js index 79fbd16b7..a67d2d9c3 100644 --- a/src/middleware/messageStore.js +++ b/src/middleware/messageStore.js @@ -213,6 +213,10 @@ export function storeNonPrimaryResponse(ctx, route, done) { * This should only be called once all routes have responded. */ export function setFinalStatus(ctx, callback) { + const isAsynchronousProcess = + Boolean(ctx?.matchingChannel?.isAsynchronousProcess) || + Boolean(ctx?.authorisedChannel?.isAsynchronousProcess) + let transactionId = '' if ( ctx.request != null && @@ -264,10 +268,7 @@ export function setFinalStatus(ctx, callback) { ctx.response.status <= 299 && routeSuccess ) { - if ( - ctx?.matchingChannel?.isAsynchronousProcess || - !!ctx?.authorisedChannel?.isAsynchronousProcess - ) { + if (isAsynchronousProcess) { tx.status = transactionStatus.PENDING_ASYNC } else { tx.status = transactionStatus.SUCCESSFUL From 77c1ac448cce50e7200bde8c4ab7ba4182ca2972 Mon Sep 17 00:00:00 2001 From: brett-onions Date: Thu, 28 Sep 2023 16:30:48 +0200 Subject: [PATCH 09/14] 8.3.0 --- package-lock.json | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index ea9aacc82..c76e84c52 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "openhim-core", - "version": "8.2.0", + "version": "8.3.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 60178dbc7..3ff5d8a1d 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "openhim-core", "description": "The OpenHIM core application that provides logging and routing of http requests", - "version": "8.2.0", + "version": "8.3.0", "main": "./lib/server.js", "bin": { "openhim-core": "./bin/openhim-core.js" From 6a74f7c29bbfbcb41345c2edac8159244924da43 Mon Sep 17 00:00:00 2001 From: Arran Standish Date: Fri, 29 Sep 2023 09:46:34 +0200 Subject: [PATCH 10/14] Add channel's pattern to the kafka bundle to better allow url mangling --- src/middleware/router.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/middleware/router.js b/src/middleware/router.js index 75c431529..8a72c7b79 100644 --- a/src/middleware/router.js +++ b/src/middleware/router.js @@ -656,6 +656,7 @@ function sendKafkaRequest(ctx, route) { const message = { method: ctx.request.method, path: ctx.request.url, + pattern: channel.urlPattern, headers: ctx.request.headers, body: ctx.body && ctx.body.toString() } From fed4c8d1ad79e2a1fd6e72ae5df0b92c4614037d Mon Sep 17 00:00:00 2001 From: brett-onions Date: Wed, 4 Oct 2023 13:18:38 +0200 Subject: [PATCH 11/14] adding empty header obj to pass condition --- src/middleware/router.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/middleware/router.js b/src/middleware/router.js index 8a72c7b79..1633c936d 100644 --- a/src/middleware/router.js +++ b/src/middleware/router.js @@ -670,7 +670,8 @@ function sendKafkaRequest(ctx, route) { resolve({ status: 200, body: JSON.stringify(res), - timestamp: +new Date() + timestamp: +new Date(), + headers: {} }) }) }) From 8992d3a44696de05bf127255028585830fa78070 Mon Sep 17 00:00:00 2001 From: brett-onions Date: Fri, 6 Oct 2023 13:41:42 +0200 Subject: [PATCH 12/14] Pending Async Workflow to Transaction Rerunning --- src/model/tasks.js | 2 +- src/tasks.js | 66 +++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/src/model/tasks.js b/src/model/tasks.js index 805c447c0..f6adc3df1 100644 --- a/src/model/tasks.js +++ b/src/model/tasks.js @@ -8,7 +8,7 @@ const TaskSchema = new Schema({ status: { type: String, required: true, - enum: ['Queued', 'Processing', 'Paused', 'Cancelled', 'Completed'], + enum: ['Queued', 'Processing', 'Paused', 'Cancelled', 'Completed', "Pending Async"], default: 'Queued', index: true }, diff --git a/src/tasks.js b/src/tasks.js index 3d188b402..a6673a6a5 100644 --- a/src/tasks.js +++ b/src/tasks.js @@ -28,11 +28,20 @@ export async function findAndProcessAQueuedTask() { {status: 'Processing'}, {new: true} ) + if (task != null) { activeTasks++ await processNextTaskRound(task) activeTasks-- } + + const asyncTasks = await TaskModel.find({status: 'Pending Async'}); + + asyncTasks.forEach(async task => { + logger.warn(`Processing async task ${task._id}`); + await checkAsyncTaskStatus(task); + }) + } catch (err) { if (task == null) { logger.error(`An error occurred while looking for rerun tasks: ${err}`) @@ -45,6 +54,51 @@ export async function findAndProcessAQueuedTask() { } } +async function checkAsyncTaskStatus(task) { + const pendingAsyncTransactions = task.transactions.reduce((acc, transaction) => { + if (transaction.rerunStatus === 'Pending Async') { + return [...acc, transaction]; + } + return acc; + },[]); + + let remainingAsyncTransactions = pendingAsyncTransactions.length; + + pendingAsyncTransactions.forEach(async transaction => { + const currentTransactionStatus = await TransactionModel.findById(transaction.rerunID); + + if (currentTransactionStatus.status === 'Successful') { + transaction.tstatus = 'Completed'; + transaction.rerunStatus = 'Successful'; + await task.save(); + remainingAsyncTransactions--; + } + else if (currentTransactionStatus.status === 'Completed with error(s)') { + transaction.tstatus = 'Completed'; + transaction.rerunStatus = 'Completed with error(s)'; + await task.save(); + remainingAsyncTransactions--; + } + else if (currentTransactionStatus.status === 'Failed') { + transaction.tstatus = 'Completed'; + transaction.rerunStatus = 'Failed'; + await task.save(); + remainingAsyncTransactions--; + } + }); + + + if (pendingAsyncTransactions.length === 0){ + task.status = 'Completed'; + task.completedDate = new Date(); + await task.save() + logger.info(`Async task ${task._id} completed`); + } + + return; + +} + function rerunTaskProcessor() { if (live) { findAndProcessAQueuedTask() @@ -139,6 +193,8 @@ async function processNextTaskRound(task) { return } + let taskHasAsyncTransactions = false + const promises = transactions.map(transaction => { task.remainingTransactions-- @@ -158,7 +214,11 @@ async function processNextTaskRound(task) { logger.error( `An error occurred while rerunning transaction ${transaction.tid} for task ${task._id}: ${err}` ) - } else { + }else if(response.status === 202){ + transaction.tstatus = 'Processing' + taskHasAsyncTransactions = true + } + else { transaction.tstatus = 'Completed' } return resolve() @@ -177,8 +237,8 @@ async function processNextTaskRound(task) { if (task.remainingTransactions) { await processNextTaskRound(task) } else { - task.status = 'Completed' - task.completedDate = new Date() + task.status = taskHasAsyncTransactions ? 'Pending Async' : 'Completed' + task.completedDate = taskHasAsyncTransactions ? null : new Date() logger.info(`Round completed for rerun task #${task._id} - Task completed`) await task.save().catch(err => { From 73f029b59fab96e67acb47c8b0000fce8ca68f48 Mon Sep 17 00:00:00 2001 From: brett-onions Date: Mon, 9 Oct 2023 14:17:03 +0200 Subject: [PATCH 13/14] clean up --- src/tasks.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/tasks.js b/src/tasks.js index a6673a6a5..522204971 100644 --- a/src/tasks.js +++ b/src/tasks.js @@ -38,7 +38,6 @@ export async function findAndProcessAQueuedTask() { const asyncTasks = await TaskModel.find({status: 'Pending Async'}); asyncTasks.forEach(async task => { - logger.warn(`Processing async task ${task._id}`); await checkAsyncTaskStatus(task); }) From b8870252d27808b705038d0e7bc6be83dc1ca30a Mon Sep 17 00:00:00 2001 From: brett-onions Date: Wed, 11 Oct 2023 11:26:40 +0200 Subject: [PATCH 14/14] changes based on comments --- src/tasks.js | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/src/tasks.js b/src/tasks.js index 522204971..5486bd342 100644 --- a/src/tasks.js +++ b/src/tasks.js @@ -54,40 +54,23 @@ export async function findAndProcessAQueuedTask() { } async function checkAsyncTaskStatus(task) { - const pendingAsyncTransactions = task.transactions.reduce((acc, transaction) => { - if (transaction.rerunStatus === 'Pending Async') { - return [...acc, transaction]; - } - return acc; - },[]); + const pendingAsyncTransactions = task.transactions.filter(transaction => transaction.rerunStatus === 'Pending Async'); let remainingAsyncTransactions = pendingAsyncTransactions.length; pendingAsyncTransactions.forEach(async transaction => { const currentTransactionStatus = await TransactionModel.findById(transaction.rerunID); - if (currentTransactionStatus.status === 'Successful') { - transaction.tstatus = 'Completed'; - transaction.rerunStatus = 'Successful'; - await task.save(); - remainingAsyncTransactions--; - } - else if (currentTransactionStatus.status === 'Completed with error(s)') { - transaction.tstatus = 'Completed'; - transaction.rerunStatus = 'Completed with error(s)'; - await task.save(); - remainingAsyncTransactions--; - } - else if (currentTransactionStatus.status === 'Failed') { + if (["Successful", "Completed with error(s)", "Failed"].includes(currentTransactionStatus.status)) { transaction.tstatus = 'Completed'; - transaction.rerunStatus = 'Failed'; + transaction.rerunStatus = currentTransactionStatus.status; await task.save(); remainingAsyncTransactions--; } }); - if (pendingAsyncTransactions.length === 0){ + if (remainingAsyncTransactions === 0){ task.status = 'Completed'; task.completedDate = new Date(); await task.save()