Skip to content

Commit

Permalink
tracing(aws-sdk): improve sqs dsm tracing experience (#4425)
Browse files Browse the repository at this point in the history
* ensure datadog attributes are received by sqs and set DSM extraction context in request complete if no callback is used
  • Loading branch information
wconti27 authored and juan-fernandez committed Jul 11, 2024
1 parent 7de24cb commit d37e73d
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 8 deletions.
3 changes: 2 additions & 1 deletion packages/datadog-instrumentations/src/aws-sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ function wrapRequest (send) {

return innerAr.runInAsyncScope(() => {
this.on('complete', innerAr.bind(response => {
channel(`apm:aws:request:complete:${channelSuffix}`).publish({ response })
const cbExists = typeof cb === 'function'
channel(`apm:aws:request:complete:${channelSuffix}`).publish({ response, cbExists })
}))

startCh.publish({
Expand Down
8 changes: 7 additions & 1 deletion packages/datadog-plugin-aws-sdk/src/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,17 @@ class BaseAwsSdkPlugin extends ClientPlugin {
span.setTag('region', region)
})

this.addSub(`apm:aws:request:complete:${this.serviceIdentifier}`, ({ response }) => {
this.addSub(`apm:aws:request:complete:${this.serviceIdentifier}`, ({ response, cbExists = false }) => {
const store = storage.getStore()
if (!store) return
const { span } = store
if (!span) return
// try to extract DSM context from response if no callback exists as extraction normally happens in CB
if (!cbExists && this.serviceIdentifier === 'sqs') {
const params = response.request.params
const operation = response.request.operation
this.responseExtractDSMContext(operation, params, response.data, span)
}
this.addResponseTags(span, response)
this.finish(span, response, response.error)
})
Expand Down
5 changes: 3 additions & 2 deletions packages/datadog-plugin-aws-sdk/src/services/kinesis.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Kinesis extends BaseAwsSdkPlugin {

// extract DSM context after as we might not have a parent-child but may have a DSM context
this.responseExtractDSMContext(
request.operation, response, span || null, streamName
request.operation, request.params, response, span || null, { streamName }
)
}
})
Expand Down Expand Up @@ -100,7 +100,8 @@ class Kinesis extends BaseAwsSdkPlugin {
}
}

responseExtractDSMContext (operation, response, span, streamName) {
responseExtractDSMContext (operation, params, response, span, kwargs = {}) {
const { streamName } = kwargs
if (!this.config.dsmEnabled) return
if (operation !== 'getRecords') return
if (!response || !response.Records || !response.Records[0]) return
Expand Down
21 changes: 17 additions & 4 deletions packages/datadog-plugin-aws-sdk/src/services/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class Sqs extends BaseAwsSdkPlugin {
const plugin = this
const contextExtraction = this.responseExtract(request.params, request.operation, response)
let span
let parsedMessageAttributes
let parsedMessageAttributes = null
if (contextExtraction && contextExtraction.datadogContext) {
obj.needsFinish = true
const options = {
Expand All @@ -39,8 +39,9 @@ class Sqs extends BaseAwsSdkPlugin {
this.enter(span, store)
}
// extract DSM context after as we might not have a parent-child but may have a DSM context

this.responseExtractDSMContext(
request.operation, request.params, response, span || null, parsedMessageAttributes || null
request.operation, request.params, response, span || null, { parsedMessageAttributes }
)
})

Expand Down Expand Up @@ -165,7 +166,8 @@ class Sqs extends BaseAwsSdkPlugin {
}
}

responseExtractDSMContext (operation, params, response, span, parsedAttributes) {
responseExtractDSMContext (operation, params, response, span, kwargs = {}) {
let { parsedAttributes } = kwargs
if (!this.config.dsmEnabled) return
if (operation !== 'receiveMessage') return
if (!response || !response.Messages || !response.Messages[0]) return
Expand All @@ -188,7 +190,7 @@ class Sqs extends BaseAwsSdkPlugin {
// SQS to SQS
}
}
if (message.MessageAttributes && message.MessageAttributes._datadog) {
if (!parsedAttributes && message.MessageAttributes && message.MessageAttributes._datadog) {
parsedAttributes = this.parseDatadogAttributes(message.MessageAttributes._datadog)
}
}
Expand Down Expand Up @@ -219,6 +221,17 @@ class Sqs extends BaseAwsSdkPlugin {
this.injectToMessage(span, params.Entries[i], params.QueueUrl, i === 0)
}
break
case 'receiveMessage':
if (!params.MessageAttributeNames) {
params.MessageAttributeNames = ['_datadog']
} else if (
!params.MessageAttributeNames.includes('_datadog') &&
!params.MessageAttributeNames.includes('.*') &&
!params.MessageAttributeNames.includes('All')
) {
params.MessageAttributeNames.push('_datadog')
}
break
}
}

Expand Down
29 changes: 29 additions & 0 deletions packages/datadog-plugin-aws-sdk/test/sqs.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const sinon = require('sinon')
const agent = require('../../dd-trace/test/plugins/agent')
const { setup } = require('./spec_helpers')
const semver = require('semver')
const { rawExpectedSchema } = require('./sqs-naming')

const queueName = 'SQS_QUEUE_NAME'
Expand Down Expand Up @@ -408,6 +409,34 @@ describe('Plugin', () => {
})
})

if (sqsClientName === 'aws-sdk' && semver.intersects(version, '>=2.3')) {
it('Should set pathway hash tag on a span when consuming and promise() was used over a callback',
async () => {
await sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm })
await sqs.receiveMessage({ QueueUrl: QueueUrlDsm }).promise()

let consumeSpanMeta = {}
return new Promise((resolve, reject) => {
agent.use(traces => {
const span = traces[0][0]

if (span.name === 'aws.request' && span.meta['aws.operation'] === 'receiveMessage') {
consumeSpanMeta = span.meta
}

try {
expect(consumeSpanMeta).to.include({
'pathway.hash': expectedConsumerHash
})
resolve()
} catch (error) {
reject(error)
}
})
})
})
}

it('Should emit DSM stats to the agent when sending a message', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
Expand Down

0 comments on commit d37e73d

Please sign in to comment.