diff --git a/src/middleware/messageStore.js b/src/middleware/messageStore.js index 60c91869..827d3a35 100644 --- a/src/middleware/messageStore.js +++ b/src/middleware/messageStore.js @@ -8,13 +8,7 @@ import * as metrics from '../metrics' import * as transactions from '../model/transactions' import * as utils from '../utils' -export const transactionStatus = { - PROCESSING: 'Processing', - SUCCESSFUL: 'Successful', - COMPLETED: 'Completed', - COMPLETED_W_ERR: 'Completed with error(s)', - FAILED: 'Failed' -} +const { transactionStatus } = transactions function copyMapWithEscapedReservedCharacters(map) { const escapedMap = {} diff --git a/src/model/transactions.js b/src/model/transactions.js index f1e2760b..9b21fc58 100644 --- a/src/model/transactions.js +++ b/src/model/transactions.js @@ -4,6 +4,14 @@ import {Schema} from 'mongoose' import {connectionAPI, connectionDefault} from '../config' +export const transactionStatus = { + PROCESSING: 'Processing', + SUCCESSFUL: 'Successful', + COMPLETED: 'Completed', + COMPLETED_W_ERR: 'Completed with error(s)', + FAILED: 'Failed' +} + // Request Schema definition const RequestDef = { host: String, @@ -94,13 +102,7 @@ const TransactionSchema = new Schema({ status: { type: String, required: true, - enum: [ - 'Processing', - 'Failed', - 'Completed', - 'Successful', - 'Completed with error(s)' - ] + enum: Object.values(transactionStatus) } }) @@ -118,3 +120,26 @@ export const TransactionModel = connectionDefault.model( 'Transaction', TransactionSchema ) + +/** + * Resolve a transaction stuck in the processing state + * + * If OpenHIM crashes with an inflight transaction, that transaction's status will stay in processing + * So we run this function at start up and set all those transactions to failed + * + */ +export const resolveStuckProcessingState = async () => { + TransactionModelAPI.find({ status: transactionStatus.PROCESSING }) + .cursor() + .on('data', async (transaction) => { + try { + if (transaction.$isEmpty('response') && transaction.$isEmpty('error')) + TransactionModelAPI.findByIdAndUpdate(transaction.id, { + status: transactionStatus.FAILED, + error: { message: 'OpenHIM crashed while still waiting for a response' }, + }).exec() + } catch (err) { + console.error(`Error updating transaction stuck in processing: ${err}`) + } + }) +} diff --git a/src/server.js b/src/server.js index 11267547..334832ab 100644 --- a/src/server.js +++ b/src/server.js @@ -37,6 +37,7 @@ import * as upgradeDB from './upgradeDB' import {KeystoreModel} from './model/keystore' import {UserModel, createUser, updateTokenUser} from './model/users' import {appRoot, config, connectionAgenda} from './config' +import { resolveStuckProcessingState } from './model/transactions' mongoose.Promise = Promise @@ -886,6 +887,8 @@ if (cluster.isMaster && !module.parent) { return Promise.all(promises) .then(() => { + resolveStuckProcessingState() + let audit = atna.construct.appActivityAudit( true, himSourceID, diff --git a/test/unit/transactionsTest.js b/test/unit/transactionsTest.js index da332299..308e8c93 100644 --- a/test/unit/transactionsTest.js +++ b/test/unit/transactionsTest.js @@ -3,7 +3,7 @@ /* eslint-env mocha */ import * as transactions from '../../src/api/transactions' -import {TaskModel, TransactionModel} from '../../src/model' +import {TaskModel, TransactionModel, resolveStuckProcessingState} from '../../src/model' import {ObjectId} from 'mongodb' describe('calculateTransactionBodiesByteLength()', () => { @@ -89,3 +89,77 @@ describe('*createRerunTasks', () => { tasks.length.should.be.exactly(2) }) }) + +describe('TransactionModel tests', () => { + describe('.resolveStuckProcessingState()', () => { + const midFlightTransaction = Object.freeze({ + status: 'Processing', + request: { + timestamp: new Date().toISOString() + }, + updatedBy: { + id: new ObjectId(), + name: 'Test' + } + }) + + const validProcessingTransaction = Object.freeze({ + status: 'Processing', + request: { + timestamp: new Date().toISOString() + }, + response: { + status: 200, + timestamp: new Date().toISOString() + }, + updatedBy: { + id: new ObjectId(), + name: 'Test' + } + }) + + const errorProcessingTransaction = Object.freeze({ + status: 'Processing', + request: { + timestamp: new Date().toISOString() + }, + error: { + message: 'something bad happened', + stack: 'stack trace' + }, + updatedBy: { + id: new ObjectId(), + name: 'Test' + } + }) + + beforeEach(async () => { + await TransactionModel.deleteMany({}) + }) + + afterEach(async () => { + await TransactionModel.deleteMany({}) + }) + + it('should update a processing transaction to failed if no response or error set', async () => { + await TransactionModel(midFlightTransaction).save() + + resolveStuckProcessingState() + await new Promise((resolve) => setTimeout(() => { resolve() }, 500)) + + const transactions = await TransactionModel.find({ status: 'Processing' }); + transactions.length.should.be.exactly(0); + }) + + it('should not update a transaction processing state if response or error set', async () => { + await TransactionModel(validProcessingTransaction).save() + await TransactionModel(errorProcessingTransaction).save() + + resolveStuckProcessingState() + await new Promise((resolve) => setTimeout(() => { resolve() }, 500)) + + const transactions = await TransactionModel.find({ status: 'Processing' }); + transactions.length.should.be.exactly(2); + }) + }) +})