-
Notifications
You must be signed in to change notification settings - Fork 309
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This commit performs three operations: 1. Move `integration-tests/helpers.js` to `integration-tests/helpers/index.js`. 2. Move `FakeAgent` from inside `integration-tests/helpers/index.js` to its own file called `integration-tests/helpers/fake_agent.js`. 3. Move the setting up of the Express app inside the `FakeAgent.start` method to a private helper function.
- Loading branch information
Showing
2 changed files
with
245 additions
and
232 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,240 @@ | ||
'use strict' | ||
|
||
const { createHash } = require('crypto') | ||
const EventEmitter = require('events') | ||
const http = require('http') | ||
const uuid = require('crypto-randomuuid') | ||
const express = require('express') | ||
const bodyParser = require('body-parser') | ||
const msgpack = require('msgpack-lite') | ||
const codec = msgpack.createCodec({ int64: true }) | ||
const upload = require('multer')() | ||
|
||
module.exports = class FakeAgent extends EventEmitter { | ||
constructor (port = 0) { | ||
super() | ||
this.port = port | ||
this._rc_files = [] | ||
} | ||
|
||
async start () { | ||
return new Promise((resolve, reject) => { | ||
const timeoutObj = setTimeout(() => { | ||
reject(new Error('agent timed out starting up')) | ||
}, 10000) | ||
this.server = http.createServer(buildExpressServer(this)) | ||
this.server.on('error', reject) | ||
this.server.listen(this.port, () => { | ||
this.port = this.server.address().port | ||
clearTimeout(timeoutObj) | ||
resolve(this) | ||
}) | ||
}) | ||
} | ||
|
||
stop () { | ||
return new Promise((resolve) => { | ||
this.server.on('close', resolve) | ||
this.server.close() | ||
}) | ||
} | ||
|
||
resetRemoteConfig () { | ||
this._rc_files = [] | ||
} | ||
|
||
addRemoteConfig ({ orgId = 2, product, id, name = 'file1', config }) { | ||
this._rc_files.push({ orgId, product, id, name, config: JSON.stringify(config) }) | ||
} | ||
|
||
// **resolveAtFirstSuccess** - specific use case for Next.js (or any other future libraries) | ||
// where multiple payloads are generated, and only one is expected to have the proper span (ie next.request), | ||
// but it't not guaranteed to be the last one (so, expectedMessageCount would not be helpful). | ||
// It can still fail if it takes longer than `timeout` duration or if none pass the assertions (timeout still called) | ||
assertMessageReceived (fn, timeout, expectedMessageCount = 1, resolveAtFirstSuccess) { | ||
timeout = timeout || 30000 | ||
let resultResolve | ||
let resultReject | ||
let msgCount = 0 | ||
const errors = [] | ||
|
||
const timeoutObj = setTimeout(() => { | ||
const errorsMsg = errors.length === 0 ? '' : `, additionally:\n${errors.map(e => e.stack).join('\n')}\n===\n` | ||
resultReject(new Error(`timeout${errorsMsg}`, { cause: { errors } })) | ||
}, timeout) | ||
|
||
const resultPromise = new Promise((resolve, reject) => { | ||
resultResolve = () => { | ||
clearTimeout(timeoutObj) | ||
resolve() | ||
} | ||
resultReject = (e) => { | ||
clearTimeout(timeoutObj) | ||
reject(e) | ||
} | ||
}) | ||
|
||
const messageHandler = msg => { | ||
try { | ||
msgCount += 1 | ||
fn(msg) | ||
if (resolveAtFirstSuccess || msgCount === expectedMessageCount) { | ||
resultResolve() | ||
this.removeListener('message', messageHandler) | ||
} | ||
} catch (e) { | ||
errors.push(e) | ||
} | ||
} | ||
this.on('message', messageHandler) | ||
|
||
return resultPromise | ||
} | ||
|
||
assertTelemetryReceived (fn, timeout, requestType, expectedMessageCount = 1) { | ||
timeout = timeout || 30000 | ||
let resultResolve | ||
let resultReject | ||
let msgCount = 0 | ||
const errors = [] | ||
|
||
const timeoutObj = setTimeout(() => { | ||
const errorsMsg = errors.length === 0 ? '' : `, additionally:\n${errors.map(e => e.stack).join('\n')}\n===\n` | ||
resultReject(new Error(`timeout${errorsMsg}`, { cause: { errors } })) | ||
}, timeout) | ||
|
||
const resultPromise = new Promise((resolve, reject) => { | ||
resultResolve = () => { | ||
clearTimeout(timeoutObj) | ||
resolve() | ||
} | ||
resultReject = (e) => { | ||
clearTimeout(timeoutObj) | ||
reject(e) | ||
} | ||
}) | ||
|
||
const messageHandler = msg => { | ||
if (msg.payload.request_type !== requestType) return | ||
msgCount += 1 | ||
try { | ||
fn(msg) | ||
if (msgCount === expectedMessageCount) { | ||
resultResolve() | ||
} | ||
} catch (e) { | ||
errors.push(e) | ||
} | ||
if (msgCount === expectedMessageCount) { | ||
this.removeListener('telemetry', messageHandler) | ||
} | ||
} | ||
this.on('telemetry', messageHandler) | ||
|
||
return resultPromise | ||
} | ||
} | ||
|
||
function buildExpressServer (agent) { | ||
const app = express() | ||
|
||
app.use(bodyParser.raw({ limit: Infinity, type: 'application/msgpack' })) | ||
app.use(bodyParser.json({ limit: Infinity, type: 'application/json' })) | ||
|
||
app.put('/v0.4/traces', (req, res) => { | ||
if (req.body.length === 0) return res.status(200).send() | ||
res.status(200).send({ rate_by_service: { 'service:,env:': 1 } }) | ||
agent.emit('message', { | ||
headers: req.headers, | ||
payload: msgpack.decode(req.body, { codec }) | ||
}) | ||
}) | ||
|
||
app.post('/v0.7/config', (req, res) => { | ||
const buffers = [] | ||
req.on('data', buffers.push.bind(buffers)) | ||
req.on('end', () => { | ||
const { products } = JSON.parse(Buffer.concat(buffers).toString()).client | ||
|
||
const expires = (new Date(Date.now() + 1000 * 60 * 60 * 24)).toISOString() // in 24 hours | ||
const clientID = uuid() // TODO: What is this? It isn't the runtime-id | ||
|
||
// Currently, only `opaque_backend_state` and `targets` are used by dd-trace-js in the object below | ||
const targets = { | ||
signatures: [], | ||
signed: { | ||
_type: 'targets', | ||
custom: { | ||
agent_refresh_interval: 5, | ||
opaque_backend_state: '' | ||
}, | ||
expires, | ||
spec_version: '1.0.0', | ||
targets: {}, | ||
version: 12345 | ||
} | ||
} | ||
const opaqueBackendState = { | ||
version: 2, | ||
state: { file_hashes: { key: [] } } | ||
} | ||
const targetFiles = [] | ||
const clientConfigs = [] | ||
|
||
const files = agent._rc_files.filter(([product]) => products.includes(product)) | ||
|
||
for (const { orgId, product, id, name, config } of files) { | ||
const path = `datadog/${orgId}/${product}/${id}/${name}` | ||
const fileDigest = createHash('sha256').update(config).digest() | ||
|
||
opaqueBackendState.state.file_hashes.key.push(fileDigest.toString('base64')) | ||
|
||
targets.signed.targets[path] = { | ||
custom: { | ||
c: [clientID], | ||
'tracer-predicates': { tracer_predicates_v1: [{ clientID }] }, | ||
v: 20 | ||
}, | ||
hashes: { sha256: fileDigest.toString('hex') }, | ||
length: config.length | ||
} | ||
|
||
targetFiles.push({ path, raw: base64(config) }) | ||
clientConfigs.push(path) | ||
} | ||
|
||
targets.signed.custom.opaque_backend_state = base64(opaqueBackendState) | ||
|
||
res.send(JSON.stringify({ | ||
roots: [], // Not used by dd-trace-js currently | ||
targets: base64(targets), | ||
target_files: targetFiles, | ||
client_configs: clientConfigs | ||
})) | ||
}) | ||
}) | ||
|
||
app.post('/profiling/v1/input', upload.any(), (req, res) => { | ||
res.status(200).send() | ||
agent.emit('message', { | ||
headers: req.headers, | ||
payload: req.body, | ||
files: req.files | ||
}) | ||
}) | ||
|
||
app.post('/telemetry/proxy/api/v2/apmtelemetry', (req, res) => { | ||
res.status(200).send() | ||
agent.emit('telemetry', { | ||
headers: req.headers, | ||
payload: req.body | ||
}) | ||
}) | ||
|
||
return app | ||
} | ||
|
||
function base64 (strOrObj) { | ||
const str = typeof strOrObj === 'string' ? strOrObj : JSON.stringify(strOrObj) | ||
return Buffer.from(str).toString('base64') | ||
} |
Oops, something went wrong.