Skip to content

Commit

Permalink
[DI] Add a global max snapshot sample rate of 25/second
Browse files Browse the repository at this point in the history
Each enhanced log probe has a sample rate of one second. However, too
many individual probes might still overload the system, so a global
snapshot sample rate across all enhanced log probes is required.
  • Loading branch information
watson committed Jan 8, 2025
1 parent b36ce05 commit d1d8056
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 9 deletions.
86 changes: 86 additions & 0 deletions integration-tests/debugger/snapshot-global-sample-rate.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
'use strict'

const { assert } = require('chai')
const { setup } = require('./utils')

describe('Dynamic Instrumentation', function () {
const t = setup({
testApp: 'target-app/basic.js'
})

describe('input messages', function () {
describe('with snapshot', function () {
beforeEach(t.triggerBreakpoint)

it('should respect global max snapshot sampling rate', function (done) {
const MAX_SNAPSHOTS_PER_SECOND_GLOBALLY = 25
const snapshotsPerSecond = MAX_SNAPSHOTS_PER_SECOND_GLOBALLY * 2
const probeConf = { captureSnapshot: true, sampling: { snapshotsPerSecond } }
let start = 0
let hitBreakpoints = 0
let isDone = false
let prevTimestamp

const rcConfig1 = t.breakpoints[0].generateRemoteConfig(probeConf)
const rcConfig2 = t.breakpoints[1].generateRemoteConfig(probeConf)

// Two breakpoints, each triggering a request every 10ms, so we should get 200 requests per second
const state = {
[rcConfig1.config.id]: {
tiggerBreakpointContinuously () {
t.axios.get(t.breakpoints[0].url).catch(done)
this.timer = setTimeout(this.tiggerBreakpointContinuously.bind(this), 10)
}
},
[rcConfig2.config.id]: {
tiggerBreakpointContinuously () {
t.axios.get(t.breakpoints[1].url).catch(done)
this.timer = setTimeout(this.tiggerBreakpointContinuously.bind(this), 10)
}
}
}

t.agent.on('debugger-diagnostics', ({ payload }) => {
payload.forEach((event) => {
const { probeId, status } = event.debugger.diagnostics
if (status === 'INSTALLED') {
state[probeId].tiggerBreakpointContinuously()
}
})
})

t.agent.on('debugger-input', ({ payload }) => {
payload.forEach(({ 'debugger.snapshot': { timestamp } }) => {
if (start === 0) start = timestamp
if (isDone) return // the buffer might still contain data that is flushed after the test finished
if (++hitBreakpoints <= MAX_SNAPSHOTS_PER_SECOND_GLOBALLY) {
prevTimestamp = timestamp
} else {
const duration = timestamp - start
const timeSincePrevTimestamp = timestamp - prevTimestamp

// Allow for a variance of +50ms (time will tell if this is enough)
assert.isAtLeast(duration, 1000)
assert.isBelow(duration, 1050)

// A sanity check to make sure we're not saturating the event loop. We expect a lot of snapshots to be
// sampled in the beginning of the sample window and then once the threshold is hit, we expect a "quiet"
// period until the end of the window. If there's no "quiet" period, then we're saturating the event loop
// and this test isn't really testing anything.
assert.isAtLeast(timeSincePrevTimestamp, 250)

clearTimeout(state[rcConfig1.config.id].timer)
clearTimeout(state[rcConfig2.config.id].timer)

isDone = true
done()
}
})
})

t.agent.addRemoteConfig(rcConfig1)
t.agent.addRemoteConfig(rcConfig2)
})
})
})
})
1 change: 1 addition & 0 deletions packages/dd-trace/src/debugger/devtools_client/defaults.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

module.exports = {
MAX_SNAPSHOTS_PER_SECOND_GLOBALLY: 25,
MAX_SNAPSHOTS_PER_SECOND_PER_PROBE: 1,
MAX_NON_SNAPSHOTS_PER_SECOND_PER_PROBE: 5_000
}
36 changes: 27 additions & 9 deletions packages/dd-trace/src/debugger/devtools_client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const send = require('./send')
const { getStackFromCallFrames } = require('./state')
const { ackEmitting, ackError } = require('./status')
const { parentThreadId } = require('./config')
const { MAX_SNAPSHOTS_PER_SECOND_GLOBALLY } = require('./defaults')
const log = require('../../log')
const { version } = require('../../../../../package.json')

Expand All @@ -24,11 +25,14 @@ const expression = `
const threadId = parentThreadId === 0 ? `pid:${process.pid}` : `pid:${process.pid};tid:${parentThreadId}`
const threadName = parentThreadId === 0 ? 'MainThread' : `WorkerThread:${parentThreadId}`

const oneSecondNs = BigInt(1_000_000_000)
let globalSnapshotSamplingRateWindowStart = BigInt(0)
let snapshotsSampledWithinTheLastSecond = 0

// WARNING: The code above the line `await session.post('Debugger.resume')` is highly optimized. Please edit with care!
session.on('Debugger.paused', async ({ params }) => {
const start = process.hrtime.bigint()

let captureSnapshotForProbe = null
let maxReferenceDepth, maxCollectionSize, maxFieldCount, maxLength

// V8 doesn't allow seting more than one breakpoint at a specific location, however, it's possible to set two
Expand All @@ -38,6 +42,9 @@ session.on('Debugger.paused', async ({ params }) => {
let sampled = false
const length = params.hitBreakpoints.length
let probes = new Array(length)
// TODO: Consider reusing this array between pauses and only recreating it if it needs to grow
const snapshotProbeIndex = new Uint8Array(length) // TODO: Is a limit of 256 probes ever going to be a problem?
let numberOfProbesWithSnapshots = 0
for (let i = 0; i < length; i++) {
const id = params.hitBreakpoints[i]
const probe = breakpoints.get(id)
Expand All @@ -46,17 +53,28 @@ session.on('Debugger.paused', async ({ params }) => {
continue
}

sampled = true
probe.lastCaptureNs = start

if (probe.captureSnapshot === true) {
captureSnapshotForProbe = probe
// This algorithm to calculate number of sampled snapshots within the last second is not perfect, as it's not a
// sliding window. But it's quick and easy :)
if (i === 0 && start - globalSnapshotSamplingRateWindowStart > oneSecondNs) {
snapshotsSampledWithinTheLastSecond = 1
globalSnapshotSamplingRateWindowStart = start
} else if (snapshotsSampledWithinTheLastSecond >= MAX_SNAPSHOTS_PER_SECOND_GLOBALLY) {
continue
} else {
snapshotsSampledWithinTheLastSecond++
}

snapshotProbeIndex[numberOfProbesWithSnapshots++] = i
maxReferenceDepth = highestOrUndefined(probe.capture.maxReferenceDepth, maxReferenceDepth)
maxCollectionSize = highestOrUndefined(probe.capture.maxCollectionSize, maxCollectionSize)
maxFieldCount = highestOrUndefined(probe.capture.maxFieldCount, maxFieldCount)
maxLength = highestOrUndefined(probe.capture.maxLength, maxLength)
}

sampled = true
probe.lastCaptureNs = start

probes[i] = probe
}

Expand All @@ -68,17 +86,17 @@ session.on('Debugger.paused', async ({ params }) => {
const dd = await getDD(params.callFrames[0].callFrameId)

let processLocalState
if (captureSnapshotForProbe !== null) {
if (numberOfProbesWithSnapshots !== 0) {
try {
// TODO: Create unique states for each affected probe based on that probes unique `capture` settings (DEBUG-2863)
processLocalState = await getLocalStateForCallFrame(
params.callFrames[0],
{ maxReferenceDepth, maxCollectionSize, maxFieldCount, maxLength }
)
} catch (err) {
// TODO: This error is not tied to a specific probe, but to all probes with `captureSnapshot: true`.
// However, in 99,99% of cases, there will be just a single probe, so I guess this simplification is ok?
ackError(err, captureSnapshotForProbe) // TODO: Ok to continue after sending ackError?
for (let i = 0; i < numberOfProbesWithSnapshots; i++) {
ackError(err, probes[snapshotProbeIndex[i]]) // TODO: Ok to continue after sending ackError?
}
}
}

Expand Down

0 comments on commit d1d8056

Please sign in to comment.