Skip to content

Commit

Permalink
Merge pull request #1207 from jembi/ft-pending-async
Browse files Browse the repository at this point in the history
added Pending Async as status for transaction
  • Loading branch information
MatthewErispe authored Oct 13, 2023
2 parents 6f53407 + d13259b commit 69e71eb
Show file tree
Hide file tree
Showing 20 changed files with 262 additions and 64 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "openhim-core",
"description": "The OpenHIM core application that provides logging and routing of http requests",
"version": "8.2.0",
"version": "8.3.0",
"main": "./lib/server.js",
"bin": {
"openhim-core": "./bin/openhim-core.js"
Expand Down
4 changes: 3 additions & 1 deletion src/api/apps.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ const createErrorResponse = (ctx, operation, error) => {
const validateId = (ctx, id) => {
if (!id.match(/^[0-9a-fA-F]{24}$/)) {
ctx.statusCode = 400
throw Error(`App id "${id}" is invalid. ObjectId should contain 24 characters`)
throw Error(
`App id "${id}" is invalid. ObjectId should contain 24 characters`
)
}
}

Expand Down
23 changes: 15 additions & 8 deletions src/api/clients.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,22 @@ export async function getClient(ctx, clientId, property) {
return
}

clientId = unescape(clientId)

try {
const result = await ClientModelAPI.findById(
clientId,
projectionRestriction
)
.lean()
.exec()
let result
if (ctx?.query?.byNamedClientID === 'true') {
result = await ClientModelAPI.findOne(
{clientID: clientId},
projectionRestriction
)
.lean()
.exec()
} else {
clientId = unescape(clientId)
result = await ClientModelAPI.findById(clientId, projectionRestriction)
.lean()
.exec()
}

if (result === null) {
utils.logAndSetResponse(
ctx,
Expand Down
14 changes: 7 additions & 7 deletions src/api/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,13 @@ export async function addTask(ctx) {
)

// Clear the transactions out of the auto retry queue, in case they're in there
return AutoRetryModelAPI
.deleteMany({transactionID: {$in: transactions.tids}})
.exec(err => {
if (err) {
return logger.error(err)
}
})
return AutoRetryModelAPI.deleteMany({
transactionID: {$in: transactions.tids}
}).exec(err => {
if (err) {
return logger.error(err)
}
})
} else {
// rerun task creation not allowed
utils.logAndSetResponse(
Expand Down
27 changes: 23 additions & 4 deletions src/middleware/messageStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import * as metrics from '../metrics'
import * as transactions from '../model/transactions'
import * as utils from '../utils'

const { transactionStatus } = transactions
const {transactionStatus} = transactions

function copyMapWithEscapedReservedCharacters(map) {
const escapedMap = {}
Expand All @@ -30,7 +30,9 @@ export function storeTransaction(ctx, done) {
const headers = copyMapWithEscapedReservedCharacters(ctx.header)

const tx = new transactions.TransactionModel({
status: transactionStatus.PROCESSING,
status: ctx?.matchingChannel?.isAsynchronousProcess
? transactionStatus.PENDING_ASYNC
: transactionStatus.PROCESSING,
clientID: ctx.authenticated != null ? ctx.authenticated._id : undefined,
channelID: ctx.authorisedChannel._id,
clientIP: ctx.ip,
Expand Down Expand Up @@ -87,9 +89,18 @@ export function storeTransaction(ctx, done) {

export function storeResponse(ctx, done) {
const headers = copyMapWithEscapedReservedCharacters(ctx.response.header)
const isAsynchronousProcess =
Boolean(ctx?.matchingChannel?.isAsynchronousProcess) ||
Boolean(ctx?.authorisedChannel?.isAsynchronousProcess)
const status =
isAsynchronousProcess && ctx.response.status < 400
? 202
: ctx.response.status
// if the channel is asynchronous and the response is successful, change the status to 202 otherwise use the original status
ctx.response.status = status

const res = {
status: ctx.response.status,
status,
headers,
body: !ctx.response.body ? '' : ctx.response.body.toString(),
timestamp: ctx.response.timestamp
Expand Down Expand Up @@ -202,6 +213,10 @@ export function storeNonPrimaryResponse(ctx, route, done) {
* This should only be called once all routes have responded.
*/
export function setFinalStatus(ctx, callback) {
const isAsynchronousProcess =
Boolean(ctx?.matchingChannel?.isAsynchronousProcess) ||
Boolean(ctx?.authorisedChannel?.isAsynchronousProcess)

let transactionId = ''
if (
ctx.request != null &&
Expand Down Expand Up @@ -253,7 +268,11 @@ export function setFinalStatus(ctx, callback) {
ctx.response.status <= 299 &&
routeSuccess
) {
tx.status = transactionStatus.SUCCESSFUL
if (isAsynchronousProcess) {
tx.status = transactionStatus.PENDING_ASYNC
} else {
tx.status = transactionStatus.SUCCESSFUL
}
}
if (
ctx.response.status >= 400 &&
Expand Down
4 changes: 3 additions & 1 deletion src/middleware/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ function sendKafkaRequest(ctx, route) {
const message = {
method: ctx.request.method,
path: ctx.request.url,
pattern: channel.urlPattern,
headers: ctx.request.headers,
body: ctx.body && ctx.body.toString()
}
Expand All @@ -669,7 +670,8 @@ function sendKafkaRequest(ctx, route) {
resolve({
status: 200,
body: JSON.stringify(res),
timestamp: +new Date()
timestamp: +new Date(),
headers: {}
})
})
})
Expand Down
5 changes: 4 additions & 1 deletion src/middleware/sessionStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ class MongooseStore {
async set(id, data) {
const {session} = this
const record = {_id: id, data, updatedAt: new Date()}
await session.findByIdAndUpdate(id, record, {upsert: true, writeConcern: {w: "majority", wtimeout: 10000}})
await session.findByIdAndUpdate(id, record, {
upsert: true,
writeConcern: {w: 'majority', wtimeout: 10000}
})
return data
}

Expand Down
4 changes: 4 additions & 0 deletions src/model/channels.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ const ChannelDef = {
type: String,
required: true
},
isAsynchronousProcess: {
type: Boolean,
default: false
},
maxBodyAgeDays: {
type: Number,
min: 1,
Expand Down
2 changes: 1 addition & 1 deletion src/model/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const TaskSchema = new Schema({
status: {
type: String,
required: true,
enum: ['Queued', 'Processing', 'Paused', 'Cancelled', 'Completed'],
enum: ['Queued', 'Processing', 'Paused', 'Cancelled', 'Completed', "Pending Async"],
default: 'Queued',
index: true
},
Expand Down
31 changes: 17 additions & 14 deletions src/model/transactions.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {connectionAPI, connectionDefault} from '../config'

export const transactionStatus = {
PROCESSING: 'Processing',
PENDING_ASYNC: 'Pending Async',
SUCCESSFUL: 'Successful',
COMPLETED: 'Completed',
COMPLETED_W_ERR: 'Completed with error(s)',
Expand Down Expand Up @@ -125,21 +126,23 @@ export const TransactionModel = connectionDefault.model(
* Resolve a transaction stuck in the processing state
*
* If OpenHIM crashes with an inflight transaction, that transaction's status will stay in processing
* So we run this function at start up and set all those transactions to failed
* So we run this function at start up and set all those transactions to failed
*
*/
export const resolveStuckProcessingState = async () => {
TransactionModelAPI.find({ status: transactionStatus.PROCESSING })
.cursor()
.on('data', async (transaction) => {
try {
if (transaction.$isEmpty('response') && transaction.$isEmpty('error'))
TransactionModelAPI.findByIdAndUpdate(transaction.id, {
status: transactionStatus.FAILED,
error: { message: 'OpenHIM crashed while still waiting for a response' },
}).exec()
} catch (err) {
console.error(`Error updating transaction stuck in processing: ${err}`)
}
})
TransactionModelAPI.find({status: transactionStatus.PROCESSING})
.cursor()
.on('data', async transaction => {
try {
if (transaction.$isEmpty('response') && transaction.$isEmpty('error'))
TransactionModelAPI.findByIdAndUpdate(transaction.id, {
status: transactionStatus.FAILED,
error: {
message: 'OpenHIM crashed while still waiting for a response'
}
}).exec()
} catch (err) {
console.error(`Error updating transaction stuck in processing: ${err}`)
}
})
}
4 changes: 2 additions & 2 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import * as upgradeDB from './upgradeDB'
import {KeystoreModel} from './model/keystore'
import {UserModel, createUser, updateTokenUser} from './model/users'
import {appRoot, config, connectionAgenda} from './config'
import { resolveStuckProcessingState } from './model/transactions'
import {resolveStuckProcessingState} from './model/transactions'

mongoose.Promise = Promise

Expand Down Expand Up @@ -888,7 +888,7 @@ if (cluster.isMaster && !module.parent) {
return Promise.all(promises)
.then(() => {
resolveStuckProcessingState()

let audit = atna.construct.appActivityAudit(
true,
himSourceID,
Expand Down
48 changes: 45 additions & 3 deletions src/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,19 @@ export async function findAndProcessAQueuedTask() {
{status: 'Processing'},
{new: true}
)

if (task != null) {
activeTasks++
await processNextTaskRound(task)
activeTasks--
}

const asyncTasks = await TaskModel.find({status: 'Pending Async'});

asyncTasks.forEach(async task => {
await checkAsyncTaskStatus(task);
})

} catch (err) {
if (task == null) {
logger.error(`An error occurred while looking for rerun tasks: ${err}`)
Expand All @@ -45,6 +53,34 @@ export async function findAndProcessAQueuedTask() {
}
}

async function checkAsyncTaskStatus(task) {
const pendingAsyncTransactions = task.transactions.filter(transaction => transaction.rerunStatus === 'Pending Async');

let remainingAsyncTransactions = pendingAsyncTransactions.length;

pendingAsyncTransactions.forEach(async transaction => {
const currentTransactionStatus = await TransactionModel.findById(transaction.rerunID);

if (["Successful", "Completed with error(s)", "Failed"].includes(currentTransactionStatus.status)) {
transaction.tstatus = 'Completed';
transaction.rerunStatus = currentTransactionStatus.status;
await task.save();
remainingAsyncTransactions--;
}
});


if (remainingAsyncTransactions === 0){
task.status = 'Completed';
task.completedDate = new Date();
await task.save()
logger.info(`Async task ${task._id} completed`);
}

return;

}

function rerunTaskProcessor() {
if (live) {
findAndProcessAQueuedTask()
Expand Down Expand Up @@ -139,6 +175,8 @@ async function processNextTaskRound(task) {
return
}

let taskHasAsyncTransactions = false

const promises = transactions.map(transaction => {
task.remainingTransactions--

Expand All @@ -158,7 +196,11 @@ async function processNextTaskRound(task) {
logger.error(
`An error occurred while rerunning transaction ${transaction.tid} for task ${task._id}: ${err}`
)
} else {
}else if(response.status === 202){
transaction.tstatus = 'Processing'
taskHasAsyncTransactions = true
}
else {
transaction.tstatus = 'Completed'
}
return resolve()
Expand All @@ -177,8 +219,8 @@ async function processNextTaskRound(task) {
if (task.remainingTransactions) {
await processNextTaskRound(task)
} else {
task.status = 'Completed'
task.completedDate = new Date()
task.status = taskHasAsyncTransactions ? 'Pending Async' : 'Completed'
task.completedDate = taskHasAsyncTransactions ? null : new Date()
logger.info(`Round completed for rerun task #${task._id} - Task completed`)

await task.save().catch(err => {
Expand Down
3 changes: 2 additions & 1 deletion test/integration/channelsAPITests.js
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,8 @@ describe('API Integration Tests', () => {
convertedPatch._id = convertedPatch._id.toString()
convertedPatch.ref = convertedPatch.ref.toString()
convertedPatch.date = convertedPatch.date.toISOString()
convertedPatch.updatedBy._id = convertedPatch.updatedBy._id.toString()
convertedPatch.updatedBy._id =
convertedPatch.updatedBy._id.toString()
return convertedPatch
})
})
Expand Down
Loading

0 comments on commit 69e71eb

Please sign in to comment.