Merge pull request #1102 from jembi/OHM-699-retrieve-bodies-via-streaming

Ohm 699 retrieve bodies via streaming
Martin Brocker authored Sep 16, 2020
2 parents ba6aefe + eb1d432 commit 1101837
Showing 13 changed files with 340 additions and 154 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -79,9 +79,9 @@ This project uses [mocha](https://mochajs.org/) as a unit testing framework with

- `npm run lint` - ensure the code is lint free, this is also run after an `npm test`
- `npm link` - will symlink your local working directory to the globally installed openhim-core module. Use this so you can use the global openhim-core binary to run your current work in progress. Also, if you build any local changes the server will automatically restart.
- `npm test -- --grep <regex>` - will only run tests with names matching the regex.
- `npm test -- --inspect` - enables the node debugger while running unit tests. Add `debugger` statements and use `node debug localhost:5858` to connect to the debugger instance.
- `npm test -- --bail` - exit on first test failure.
- `npm test:<int|unit> -- --grep <regex>` - will only run tests with names matching the regex (see the examples after this list).
- `npm test:<int|unit> -- --inspect` - enables the node debugger while running unit tests. Add `debugger` statements and use `node debug localhost:5858` to connect to the debugger instance.
- `npm test:<int|unit> -- --bail` - exit on first test failure.
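
For example, a quick sketch of scoped test runs (assuming the package defines `test:unit` and `test:int` scripts; the grep pattern is illustrative):

```sh
# run only unit tests whose names mention "transaction"
npm run test:unit -- --grep "transaction"

# stop an integration test run at the first failure
npm run test:int -- --bail
```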

---

6 changes: 0 additions & 6 deletions config/config.md
@@ -49,13 +49,7 @@ The following config options are provided by the OpenHIM. All of these options ha
// the size of that window in seconds
"authWindowSeconds": 10,
// Max size of a request payload to the API
// Due to the maximum size of a mongo document, the bodies in the request will be truncated if the request is larger than 16MB
"maxPayloadSizeMB": 50,
// Certain API endpoints allow for details to be truncated, e.g. transactions with very large bodies
// This setting sets the size to truncate to (number of characters)
"truncateSize": 15000,
// A message to append to detail strings that have been truncated
"truncateAppend": "\n[truncated ...]",
// The types of authentication to use for the API
// Supported types are "token" and "basic"
"authenicationTypes": ["token"]
2 changes: 0 additions & 2 deletions config/default.json
@@ -37,8 +37,6 @@
"port": 8080,
"authWindowSeconds": 10,
"maxPayloadSizeMB": 50,
"truncateSize": 15000,
"truncateAppend": "\n[truncated ...]",
"authenticationTypes": ["token"]
},
"rerun": {
2 changes: 0 additions & 2 deletions config/test.json
@@ -23,8 +23,6 @@
"port": 8080,
"authWindowSeconds": 50,
"maxPayloadSizeMB": 50,
"truncateSize": 10,
"truncateAppend": "\n[truncated ...]",
"authenticationTypes": ["token", "basic"]
},
"caching": {
139 changes: 85 additions & 54 deletions src/api/transactions.js
@@ -2,17 +2,15 @@

import logger from 'winston'
import { promisify } from 'util'
import { Types } from 'mongoose'

import * as authorisation from './authorisation'
import * as autoRetryUtils from '../autoRetry'
import * as events from '../middleware/events'
import * as utils from '../utils'
import { ChannelModelAPI } from '../model/channels'
import { TransactionModelAPI } from '../model/transactions'
import { config } from '../config'
import { addBodiesToTransactions, extractTransactionPayloadIntoChunks, promisesToRemoveAllTransactionBodies } from '../contentChunk'

const apiConf = config.get('api')
import { addBodiesToTransactions, extractTransactionPayloadIntoChunks, promisesToRemoveAllTransactionBodies, retrieveBody } from '../contentChunk'

function hasError (updates) {
if (updates.error != null) { return true }
@@ -43,17 +41,14 @@ function getProjectionObject (filterRepresentation) {
return {
'request.bodyId': 0,
'response.bodyId': 0,
'routes.request.body': 0,
'routes.response.body': 0,
'orchestrations.request.body': 0,
'orchestrations.response.body': 0
'routes.request.bodyId': 0,
'routes.response.bodyId': 0,
'orchestrations.request.bodyId': 0,
'orchestrations.response.bodyId': 0
}
case 'full':
// view all transaction data
return {}
case 'fulltruncate':
// same as full
return {}
case 'bulkrerun':
// view only 'bulkrerun' properties
return { _id: 1, childIDs: 1, canRerun: 1, channelID: 1 }
@@ -71,30 +66,6 @@
}
}

function truncateTransactionDetails (trx) {
const truncateSize = apiConf.truncateSize != null ? apiConf.truncateSize : 15000
const truncateAppend = apiConf.truncateAppend != null ? apiConf.truncateAppend : '\n[truncated ...]'

function trunc (t) {
if (((t.request != null ? t.request.body : undefined) != null) && (t.request.body.length > truncateSize)) {
t.request.body = t.request.body.slice(0, truncateSize) + truncateAppend
}
if (((t.response != null ? t.response.body : undefined) != null) && (t.response.body.length > truncateSize)) {
t.response.body = t.response.body.slice(0, truncateSize) + truncateAppend
}
}

trunc(trx)

if (trx.routes != null) {
for (const r of Array.from(trx.routes)) { trunc(r) }
}

if (trx.orchestrations != null) {
return Array.from(trx.orchestrations).map((o) => trunc(o))
}
}

/*
* Returns intersection of user and channel roles/permission groups
*/
@@ -250,10 +221,6 @@ export async function getTransactions (ctx) {
const transformedTransactions = await addBodiesToTransactions(transactions)

ctx.body = transformedTransactions

if (filterRepresentation === 'fulltruncate') {
transformedTransactions.map((trx) => truncateTransactionDetails(trx))
}
} catch (e) {
utils.logAndSetResponse(ctx, 500, `Could not retrieve transactions via the API: ${e}`, 'error')
}
@@ -336,31 +303,20 @@ export async function getTransactionById (ctx, transactionId) {
// --------------Check if user has permission to view full content----------------- #
// get projection object
const projectionFiltersObject = getProjectionObject(filterRepresentation)

const transaction = await TransactionModelAPI.findById(transactionId, projectionFiltersObject).exec()

// retrieve transaction request and response bodies
const resultArray = await addBodiesToTransactions([transaction])
const result = resultArray[0]

if (result && (filterRepresentation === 'fulltruncate')) {
truncateTransactionDetails(result)
}

// Test if the result is valid
if (!result) {
if (!transaction) {
ctx.body = `Could not find transaction with ID: ${transactionId}`
ctx.status = 404
// Test if the user is authorised
} else if (!authorisation.inGroup('admin', ctx.authenticated)) {
const channels = await authorisation.getUserViewableChannels(ctx.authenticated)
if (getChannelIDsArray(channels).indexOf(result.channelID.toString()) >= 0) {
ctx.body = result
if (getChannelIDsArray(channels).indexOf(transaction.channelID.toString()) >= 0) {
ctx.body = transaction
} else {
return utils.logAndSetResponse(ctx, 403, `User ${ctx.authenticated.email} is not authenticated to retrieve transaction ${transactionId}`, 'info')
}
} else {
ctx.body = result
ctx.body = transaction
}
} catch (e) {
utils.logAndSetResponse(ctx, 500, `Could not get transaction by ID via the API: ${e}`, 'error')
@@ -501,3 +457,78 @@ export async function removeTransaction (ctx, transactionId) {
utils.logAndSetResponse(ctx, 500, `Could not remove transaction via the API: ${e}`, 'error')
}
}

/*
* Streams a transaction body
*/
export async function getTransactionBodyById (ctx, transactionId, bodyId) {
transactionId = unescape(transactionId)

// Test if the user is authorised
if (!authorisation.inGroup('admin', ctx.authenticated)) {
const transaction = await TransactionModelAPI.findById(transactionId).exec()
const channels = await authorisation.getUserViewableChannels(ctx.authenticated, 'txViewFullAcl')
if (!getChannelIDsArray(channels).includes(transaction.channelID.toString())) {
return utils.logAndSetResponse(ctx, 403, `User ${ctx.authenticated.email} is not authenticated to retrieve transaction ${transactionId}`, 'info')
}
}

// parse range header
const rangeHeader = ctx.request.header.range || ''
const match = rangeHeader.match(/bytes=(?<start>\d+)-(?<end>\d*)/)
const range = match ? match.groups : {}

let gridFsRange
if (rangeHeader) {
if (!range.start) {
return utils.logAndSetResponse(ctx, 416, 'Only accepts single ranges with at least start value', 'info')
}

range.start = Number(range.start)
if (range.end) {
range.end = Number(range.end)
} else {
delete range.end
}

if (range.end !== undefined && range.start > range.end) {
return utils.logAndSetResponse(ctx, 416, `Start range [${range.start}] cannot be greater than end [${range.end}]`, 'info')
}

// gridfs uses an exclusive end value
gridFsRange = Object.assign({}, range)
if (gridFsRange.end !== undefined) {
gridFsRange.end += 1
}
}

let body
try {
body = await retrieveBody(new Types.ObjectId(bodyId), gridFsRange || {})
} catch (err) {
const status = err.status || 400
return utils.logAndSetResponse(ctx, status, err.message, 'info')
}

if (range.start && !range.end) {
range.end = body.fileDetails.length
}

if (range.end && range.end >= body.fileDetails.length) {
range.end = body.fileDetails.length - 1
}

// set response
ctx.status = rangeHeader ? 206 : 200
ctx.set('accept-ranges', 'bytes')
ctx.set('content-type', 'application/text')
if (rangeHeader) {
ctx.set('content-range', `bytes ${range.start}-${range.end}/${body.fileDetails.length}`)
ctx.set('content-length', Math.min((range.end - range.start) + 1, body.fileDetails.length))
} else {
ctx.set('content-length', body.fileDetails.length)
}

// assign body to a stream
ctx.body = body.stream
}
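
As a usage sketch (not part of the commit): the new endpoint honours single byte ranges, answering with `206 Partial Content` and a `content-range` header when a range is supplied. The host, port, and IDs below are placeholders, and API authentication headers are omitted for brevity.

```js
// Sketch: request the first 100 bytes of a stored transaction body.
import http from 'http'

http.get({
  hostname: 'localhost',
  port: 8080,
  path: '/transactions/<transactionId>/bodies/<bodyId>', // hypothetical IDs
  headers: { Range: 'bytes=0-99' }
}, res => {
  console.log(res.statusCode) // 206 when a valid range was supplied, else 200
  console.log(res.headers['content-range']) // e.g. "bytes 0-99/<file length>"
  res.pipe(process.stdout) // the body is streamed, not buffered in memory
})
```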
53 changes: 38 additions & 15 deletions src/contentChunk.js
@@ -2,10 +2,10 @@
import mongodb from 'mongodb'
import zlib from 'zlib'
import { PassThrough } from 'stream'
import { config, connectionDefault } from './config'
import { obtainCharset } from './utils'
import logger from 'winston'

const apiConf = config.get('api')
import { connectionDefault } from './config'
import { obtainCharset } from './utils'

let bucket
export const getGridFSBucket = () => {
@@ -133,10 +133,6 @@ export const retrievePayload = async fileId => {
throw new Error('Payload id not supplied')
}

let payloadSize = 0
// Perhaps the truncateSize should be represented in actual size, and not string length
const truncateSize = apiConf.truncateSize != null ? apiConf.truncateSize : 15000

const fileDetails = await getFileDetails(fileId)

const contentEncoding = fileDetails ? (fileDetails.metadata ? fileDetails.metadata['content-encoding'] : null) : null
@@ -151,14 +147,7 @@

// apply the decompression transformation and start listening for the output chunks
downloadStream.pipe(decompressionStream)
decompressionStream.on('data', (chunk) => {
payloadSize += chunk.length
if (payloadSize >= truncateSize) {
decompressionStream.destroy()
downloadStream.destroy()
}
uncompressedBodyBufs.push(chunk)
})
decompressionStream.on('data', (chunk) => uncompressedBodyBufs.push(chunk))

return new Promise((resolve) => {
decompressionStream.on('end', () => { resolveDecompressionBuffer(uncompressedBodyBufs) })
@@ -181,6 +170,40 @@
})
}

export const retrieveBody = async (bodyId, range) => {
if (!bodyId) {
throw new Error('bodyID not supplied')
}

const fileDetails = await getFileDetails(bodyId)

if (!fileDetails) {
const err = new Error('Could not find specified file')
err.status = 404
throw err
}
if (range.start && range.start >= fileDetails.length) {
const err = new Error('Start range cannot be greater than file length')
err.status = 416
throw err
}
if (range.end && range.end > fileDetails.length) {
range.end = fileDetails.length
}

const contentEncoding = fileDetails ? (fileDetails.metadata ? fileDetails.metadata['content-encoding'] : null) : null
const decompressionStream = getDecompressionStreamByContentEncoding(contentEncoding)

const bucket = getGridFSBucket()
const downloadStream = bucket.openDownloadStream(bodyId, range)
downloadStream.on('error', err => {
logger.error(err)
})

// apply the decompression transformation
return { stream: downloadStream.pipe(decompressionStream), fileDetails }
}

export const addBodiesToTransactions = async (transactions) => {
if (!transactions || !Array.isArray(transactions) || transactions.length < 1) {
return []
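
A minimal consumption sketch for `retrieveBody` (assumed usage, not from this commit): the function resolves to the GridFS file details plus a decompressed stream, so callers can pipe large payloads without buffering them whole. The hex ID below is a placeholder.

```js
// Sketch: stream the first 100 bytes of a stored body to stdout.
// GridFS treats `end` as exclusive, which is why the API layer above
// adds 1 to the requested end offset before calling retrieveBody.
import { Types } from 'mongoose'
import { retrieveBody } from './contentChunk'

async function printBodyRange (bodyIdHex) {
  const { stream, fileDetails } = await retrieveBody(
    new Types.ObjectId(bodyIdHex),
    { start: 0, end: 100 }
  )
  console.log(`stored file length: ${fileDetails.length} bytes`)
  stream.pipe(process.stdout)
}
```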
1 change: 1 addition & 0 deletions src/koaApi.js
@@ -77,6 +77,7 @@ export function setupApp (done) {
app.use(route.get('/transactions/clients/:clientId', transactions.findTransactionByClientId))
app.use(route.put('/transactions/:transactionId', transactions.updateTransaction))
app.use(route.delete('/transactions/:transactionId', transactions.removeTransaction))
app.use(route.get('/transactions/:transactionId/bodies/:bodyId', transactions.getTransactionBodyById))

app.use(route.get('/groups', contactGroups.getContactGroups))
app.use(route.get('/groups/:contactGroupId', contactGroups.getContactGroup))
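
For context, a sketch of how koa-route binds path segments to handler arguments (assuming `koa` and `koa-route` are installed), which is why `getTransactionBodyById` receives `(ctx, transactionId, bodyId)`:

```js
// Sketch: koa-route captures :params in path order and passes them as
// extra arguments after ctx, mirroring the bodies route registered above.
import Koa from 'koa'
import route from 'koa-route'

const app = new Koa()
app.use(route.get('/transactions/:transactionId/bodies/:bodyId',
  (ctx, transactionId, bodyId) => {
    ctx.body = { transactionId, bodyId }
  }
))
app.listen(3000)
```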
2 changes: 1 addition & 1 deletion src/model/transactions.js
@@ -98,7 +98,7 @@ const TransactionSchema = new Schema({
})

export const compactTransactionCollection = async () => {
return (await connectionAPI).db.command({compact: 'transactions', force: true})
return (await connectionAPI).db.command({ compact: 'transactions', force: true })
}

TransactionSchema.index('request.timestamp')
2 changes: 1 addition & 1 deletion src/upgradeDB.js
@@ -209,7 +209,7 @@
async func (batchSize = 100, concurrency = 5) {
const totalTransactions = await TransactionModel.countDocuments().exec()
let batchNum = 0
let currentlyExecuting = []
const currentlyExecuting = []
const totalBatches = Math.ceil(totalTransactions / batchSize)
const startTime = new Date()

2 changes: 1 addition & 1 deletion src/utils.js
@@ -156,7 +156,7 @@ export function makeQuerablePromise (promise) {
let isRejected = false

// Observe the promise, saving the fulfillment in a closure scope.
let result = promise.then(
const result = promise.then(
val => {
isResolved = true
return val