diff --git a/packages/datadog-instrumentations/src/aws-sdk.js b/packages/datadog-instrumentations/src/aws-sdk.js index 8ea5552fe1a..5a2efd00681 100644 --- a/packages/datadog-instrumentations/src/aws-sdk.js +++ b/packages/datadog-instrumentations/src/aws-sdk.js @@ -20,7 +20,8 @@ function wrapRequest (send) { return innerAr.runInAsyncScope(() => { this.on('complete', innerAr.bind(response => { - channel(`apm:aws:request:complete:${channelSuffix}`).publish({ response }) + const cbExists = typeof cb === 'function' + channel(`apm:aws:request:complete:${channelSuffix}`).publish({ response, cbExists }) })) startCh.publish({ diff --git a/packages/datadog-plugin-aws-sdk/src/base.js b/packages/datadog-plugin-aws-sdk/src/base.js index 21e4bfa47f6..7254a0a2fa9 100644 --- a/packages/datadog-plugin-aws-sdk/src/base.js +++ b/packages/datadog-plugin-aws-sdk/src/base.js @@ -64,11 +64,17 @@ class BaseAwsSdkPlugin extends ClientPlugin { span.setTag('region', region) }) - this.addSub(`apm:aws:request:complete:${this.serviceIdentifier}`, ({ response }) => { + this.addSub(`apm:aws:request:complete:${this.serviceIdentifier}`, ({ response, cbExists = false }) => { const store = storage.getStore() if (!store) return const { span } = store if (!span) return + // try to extract DSM context from response if no callback exists as extraction normally happens in CB + if (!cbExists && this.serviceIdentifier === 'sqs') { + const params = response.request.params + const operation = response.request.operation + this.responseExtractDSMContext(operation, params, response.data, span) + } this.addResponseTags(span, response) this.finish(span, response, response.error) }) diff --git a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js index f33ec7f5dc8..9e8eb2dcd44 100644 --- a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js +++ b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js @@ -52,7 +52,7 @@ class Kinesis extends BaseAwsSdkPlugin { // extract DSM context after as we might not have a parent-child but may have a DSM context this.responseExtractDSMContext( - request.operation, response, span || null, streamName + request.operation, request.params, response, span || null, { streamName } ) } }) @@ -100,7 +100,8 @@ class Kinesis extends BaseAwsSdkPlugin { } } - responseExtractDSMContext (operation, response, span, streamName) { + responseExtractDSMContext (operation, params, response, span, kwargs = {}) { + const { streamName } = kwargs if (!this.config.dsmEnabled) return if (operation !== 'getRecords') return if (!response || !response.Records || !response.Records[0]) return diff --git a/packages/datadog-plugin-aws-sdk/src/services/sqs.js b/packages/datadog-plugin-aws-sdk/src/services/sqs.js index 769d9fc9ac6..62ede0ae6e4 100644 --- a/packages/datadog-plugin-aws-sdk/src/services/sqs.js +++ b/packages/datadog-plugin-aws-sdk/src/services/sqs.js @@ -23,7 +23,7 @@ class Sqs extends BaseAwsSdkPlugin { const plugin = this const contextExtraction = this.responseExtract(request.params, request.operation, response) let span - let parsedMessageAttributes + let parsedMessageAttributes = null if (contextExtraction && contextExtraction.datadogContext) { obj.needsFinish = true const options = { @@ -39,8 +39,9 @@ class Sqs extends BaseAwsSdkPlugin { this.enter(span, store) } // extract DSM context after as we might not have a parent-child but may have a DSM context + this.responseExtractDSMContext( - request.operation, request.params, response, span || null, parsedMessageAttributes || null + request.operation, request.params, response, span || null, { parsedMessageAttributes } ) }) @@ -165,7 +166,8 @@ class Sqs extends BaseAwsSdkPlugin { } } - responseExtractDSMContext (operation, params, response, span, parsedAttributes) { + responseExtractDSMContext (operation, params, response, span, kwargs = {}) { + let { parsedAttributes } = kwargs if (!this.config.dsmEnabled) return if (operation !== 'receiveMessage') return if (!response || !response.Messages || !response.Messages[0]) return @@ -188,7 +190,7 @@ class Sqs extends BaseAwsSdkPlugin { // SQS to SQS } } - if (message.MessageAttributes && message.MessageAttributes._datadog) { + if (!parsedAttributes && message.MessageAttributes && message.MessageAttributes._datadog) { parsedAttributes = this.parseDatadogAttributes(message.MessageAttributes._datadog) } } @@ -219,6 +221,17 @@ class Sqs extends BaseAwsSdkPlugin { this.injectToMessage(span, params.Entries[i], params.QueueUrl, i === 0) } break + case 'receiveMessage': + if (!params.MessageAttributeNames) { + params.MessageAttributeNames = ['_datadog'] + } else if ( + !params.MessageAttributeNames.includes('_datadog') && + !params.MessageAttributeNames.includes('.*') && + !params.MessageAttributeNames.includes('All') + ) { + params.MessageAttributeNames.push('_datadog') + } + break } } diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.spec.js index fee2a918810..a32f8a51a86 100644 --- a/packages/datadog-plugin-aws-sdk/test/sqs.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/sqs.spec.js @@ -3,6 +3,7 @@ const sinon = require('sinon') const agent = require('../../dd-trace/test/plugins/agent') const { setup } = require('./spec_helpers') +const semver = require('semver') const { rawExpectedSchema } = require('./sqs-naming') const queueName = 'SQS_QUEUE_NAME' @@ -408,6 +409,34 @@ describe('Plugin', () => { }) }) + if (sqsClientName === 'aws-sdk' && semver.intersects(version, '>=2.3')) { + it('Should set pathway hash tag on a span when consuming and promise() was used over a callback', + async () => { + await sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }) + await sqs.receiveMessage({ QueueUrl: QueueUrlDsm }).promise() + + let consumeSpanMeta = {} + return new Promise((resolve, reject) => { + agent.use(traces => { + const span = traces[0][0] + + if (span.name === 'aws.request' && span.meta['aws.operation'] === 'receiveMessage') { + consumeSpanMeta = span.meta + } + + try { + expect(consumeSpanMeta).to.include({ + 'pathway.hash': expectedConsumerHash + }) + resolve() + } catch (error) { + reject(error) + } + }) + }) + }) + } + it('Should emit DSM stats to the agent when sending a message', done => { agent.expectPipelineStats(dsmStats => { let statsPointsReceived = 0