Skip to content

Commit

Permalink
Add filesystem events to the timeline (#4965)
Browse files Browse the repository at this point in the history
  • Loading branch information
szegedi authored and rochdev committed Dec 19, 2024
1 parent 4d5962d commit e13cc40
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 39 deletions.
40 changes: 40 additions & 0 deletions integration-tests/profiler/fstest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const fs = require('fs')
const os = require('os')
const path = require('path')

const tracer = require('dd-trace').init()
tracer.profilerStarted().then(() => {
tracer.trace('x', (_, done) => {
setImmediate(() => {
// Generate 1MB of random data
const buffer = Buffer.alloc(1024 * 1024)
for (let i = 0; i < buffer.length; i++) {
buffer[i] = Math.floor(Math.random() * 256)
}

// Create a temporary file
const tempFilePath = path.join(os.tmpdir(), 'tempfile.txt')

fs.writeFile(tempFilePath, buffer, (err) => {
if (err) throw err

// Read the data back
setImmediate(() => {
fs.readFile(tempFilePath, (err, readData) => {
setImmediate(() => {
// Delete the temporary file
fs.unlink(tempFilePath, (err) => {
if (err) throw err
})
done()
})
if (err) throw err
if (Buffer.compare(buffer, readData) !== 0) {
throw new Error('Data read from file is different from data written to file')
}
})
})
})
})
})
})
175 changes: 141 additions & 34 deletions integration-tests/profiler/profiler.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,108 @@ function expectTimeout (messagePromise, allowErrors = false) {
)
}

class TimelineEventProcessor {
constructor (strings, encoded) {
this.strings = strings
this.encoded = encoded
}
}

class NetworkEventProcessor extends TimelineEventProcessor {
constructor (strings, encoded) {
super(strings, encoded)

this.hostKey = strings.dedup('host')
this.addressKey = strings.dedup('address')
this.portKey = strings.dedup('port')
}

processLabel (label, processedLabels) {
switch (label.key) {
case this.hostKey:
processedLabels.host = label.str
return true
case this.addressKey:
processedLabels.address = label.str
return true
case this.portKey:
processedLabels.port = label.num
return true
default:
return false
}
}

decorateEvent (ev, pl) {
// Exactly one of these is defined
assert.isTrue(!!pl.address !== !!pl.host, this.encoded)
if (pl.address) {
ev.address = this.strings.strings[pl.address]
} else {
ev.host = this.strings.strings[pl.host]
}
if (pl.port) {
ev.port = pl.port
}
}
}

async function gatherNetworkTimelineEvents (cwd, scriptFilePath, eventType, args) {
return gatherTimelineEvents(cwd, scriptFilePath, eventType, args, NetworkEventProcessor)
}

class FilesystemEventProcessor extends TimelineEventProcessor {
constructor (strings, encoded) {
super(strings, encoded)

this.fdKey = strings.dedup('fd')
this.fileKey = strings.dedup('file')
this.flagKey = strings.dedup('flag')
this.modeKey = strings.dedup('mode')
this.pathKey = strings.dedup('path')
}

processLabel (label, processedLabels) {
switch (label.key) {
case this.fdKey:
processedLabels.fd = label.num
return true
case this.fileKey:
processedLabels.file = label.str
return true
case this.flagKey:
processedLabels.flag = label.str
return true
case this.modeKey:
processedLabels.mode = label.str
return true
case this.pathKey:
processedLabels.path = label.str
return true
default:
return false
}
}

decorateEvent (ev, pl) {
ev.fd = pl.fd
ev.file = this.strings.strings[pl.file]
ev.flag = this.strings.strings[pl.flag]
ev.mode = this.strings.strings[pl.mode]
ev.path = this.strings.strings[pl.path]
for (const [k, v] of Object.entries(ev)) {
if (v === undefined) {
delete ev[k]
}
}
}
}

async function gatherFilesystemTimelineEvents (cwd, scriptFilePath) {
return gatherTimelineEvents(cwd, scriptFilePath, 'fs', [], FilesystemEventProcessor)
}

async function gatherTimelineEvents (cwd, scriptFilePath, eventType, args, Processor) {
const procStart = BigInt(Date.now() * 1000000)
const proc = fork(path.join(cwd, scriptFilePath), args, {
cwd,
Expand All @@ -123,60 +224,50 @@ async function gatherNetworkTimelineEvents (cwd, scriptFilePath, eventType, args
const strings = profile.stringTable
const tsKey = strings.dedup('end_timestamp_ns')
const eventKey = strings.dedup('event')
const hostKey = strings.dedup('host')
const addressKey = strings.dedup('address')
const portKey = strings.dedup('port')
const nameKey = strings.dedup('operation')
const operationKey = strings.dedup('operation')
const spanIdKey = strings.dedup('span id')
const localRootSpanIdKey = strings.dedup('local root span id')
const eventValue = strings.dedup(eventType)
const events = []
const processor = new Processor(strings, encoded)
for (const sample of profile.sample) {
let ts, event, host, address, port, name, spanId, localRootSpanId
let ts, event, operation, spanId, localRootSpanId
const processedLabels = {}
const unexpectedLabels = []
for (const label of sample.label) {
switch (label.key) {
case tsKey: ts = label.num; break
case nameKey: name = label.str; break
case operationKey: operation = label.str; break
case eventKey: event = label.str; break
case hostKey: host = label.str; break
case addressKey: address = label.str; break
case portKey: port = label.num; break
case spanIdKey: spanId = label.str; break
case localRootSpanIdKey: localRootSpanId = label.str; break
default: unexpectedLabels.push(label.key)
default:
if (!processor.processLabel(label, processedLabels)) {
unexpectedLabels.push(label.key)
}
}
}
// Gather only DNS events; ignore sporadic GC events
// Timestamp must be defined and be between process start and end time
assert.isDefined(ts, encoded)
assert.isTrue(ts <= procEnd, encoded)
assert.isTrue(ts >= procStart, encoded)
// Gather only tested events
if (event === eventValue) {
// Timestamp must be defined and be between process start and end time
assert.isDefined(ts, encoded)
assert.isTrue(ts <= procEnd, encoded)
assert.isTrue(ts >= procStart, encoded)
if (process.platform !== 'win32') {
assert.isDefined(spanId, encoded)
assert.isDefined(localRootSpanId, encoded)
} else {
assert.isUndefined(spanId, encoded)
assert.isUndefined(localRootSpanId, encoded)
}
assert.isDefined(name, encoded)
assert.isDefined(operation, encoded)
if (unexpectedLabels.length > 0) {
const labelsStr = JSON.stringify(unexpectedLabels)
const labelsStrStr = unexpectedLabels.map(k => strings.strings[k]).join(',')
assert.fail(`Unexpected labels: ${labelsStr}\n${labelsStrStr}\n${encoded}`)
}
// Exactly one of these is defined
assert.isTrue(!!address !== !!host, encoded)
const ev = { name: strings.strings[name] }
if (address) {
ev.address = strings.strings[address]
} else {
ev.host = strings.strings[host]
}
if (port) {
ev.port = port
}
const ev = { operation: strings.strings[operation] }
processor.decorateEvent(ev, processedLabels)
events.push(ev)
}
}
Expand Down Expand Up @@ -323,14 +414,30 @@ describe('profiler', () => {
assert.equal(endpoints.size, 3, encoded)
})

it('fs timeline events work', async () => {
const fsEvents = await gatherFilesystemTimelineEvents(cwd, 'profiler/fstest.js')
assert.equal(fsEvents.length, 6)
const path = fsEvents[0].path
const fd = fsEvents[1].fd
assert(path.endsWith('tempfile.txt'))
assert.sameDeepMembers(fsEvents, [
{ flag: 'w', mode: '', operation: 'open', path },
{ fd, operation: 'write' },
{ fd, operation: 'close' },
{ file: path, operation: 'writeFile' },
{ operation: 'readFile', path },
{ operation: 'unlink', path }
])
})

it('dns timeline events work', async () => {
const dnsEvents = await gatherNetworkTimelineEvents(cwd, 'profiler/dnstest.js', 'dns')
assert.sameDeepMembers(dnsEvents, [
{ name: 'lookup', host: 'example.org' },
{ name: 'lookup', host: 'example.com' },
{ name: 'lookup', host: 'datadoghq.com' },
{ name: 'queryA', host: 'datadoghq.com' },
{ name: 'lookupService', address: '13.224.103.60', port: 80 }
{ operation: 'lookup', host: 'example.org' },
{ operation: 'lookup', host: 'example.com' },
{ operation: 'lookup', host: 'datadoghq.com' },
{ operation: 'queryA', host: 'datadoghq.com' },
{ operation: 'lookupService', address: '13.224.103.60', port: 80 }
])
})

Expand Down Expand Up @@ -366,8 +473,8 @@ describe('profiler', () => {
// The profiled program should have two TCP connection events to the two
// servers.
assert.sameDeepMembers(events, [
{ name: 'connect', host: '127.0.0.1', port: port1 },
{ name: 'connect', host: '127.0.0.1', port: port2 }
{ operation: 'connect', host: '127.0.0.1', port: port1 },
{ operation: 'connect', host: '127.0.0.1', port: port2 }
])
} finally {
server2.close()
Expand Down
3 changes: 3 additions & 0 deletions packages/datadog-instrumentations/src/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ const errorChannel = channel('apm:fs:operation:error')
const ddFhSym = Symbol('ddFileHandle')
let kHandle, kDirReadPromisified, kDirClosePromisified

// Update packages/dd-trace/src/profiling/profilers/event_plugins/fs.js if you make changes to param names in any of
// the following objects.

const paramsByMethod = {
access: ['path', 'mode'],
appendFile: ['path', 'data', 'options'],
Expand Down
10 changes: 7 additions & 3 deletions packages/dd-trace/src/profiling/profilers/event_plugins/event.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ class EventPlugin extends TracingPlugin {
if (!store) return

const { startEvent, startTime, error } = store
if (error) {
return // don't emit perf events for failed operations
if (error || this.ignoreEvent(startEvent)) {
return // don't emit perf events for failed operations or ignored events
}
const duration = performance.now() - startTime

const duration = performance.now() - startTime
const event = {
entryType: this.entryType,
startTime,
Expand All @@ -53,6 +53,10 @@ class EventPlugin extends TracingPlugin {

this.eventHandler(this.extendEvent(event, startEvent))
}

ignoreEvent () {
return false
}
}

module.exports = EventPlugin
49 changes: 49 additions & 0 deletions packages/dd-trace/src/profiling/profilers/event_plugins/fs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const EventPlugin = require('./event')

// Values taken from parameter names in datadog-instrumentations/src/fs.js.
// Known param names that are disallowed because they can be strings and have arbitrary sizes:
// 'data'
// Known param names that are disallowed because they are never a string or number:
// 'buffer', 'buffers', 'listener'
const allowedParams = new Set([
'atime', 'dest',
'existingPath', 'fd', 'file',
'flag', 'gid', 'len',
'length', 'mode', 'mtime',
'newPath', 'offset', 'oldPath',
'operation', 'options', 'path',
'position', 'prefix', 'src',
'target', 'type', 'uid'
])

class FilesystemPlugin extends EventPlugin {
static get id () {
return 'fs'
}

static get operation () {
return 'operation'
}

static get entryType () {
return 'fs'
}

ignoreEvent (event) {
// Don't care about sync events, they show up in the event loop samples anyway
return event.operation?.endsWith('Sync')
}

extendEvent (event, detail) {
const d = { ...detail }
Object.entries(d).forEach(([k, v]) => {
if (!(allowedParams.has(k) && (typeof v === 'string' || typeof v === 'number'))) {
delete d[k]
}
})
event.detail = d

return event
}
}
module.exports = FilesystemPlugin
Loading

0 comments on commit e13cc40

Please sign in to comment.