diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js
index 1fb5316becd..395c69de057 100644
--- a/packages/datadog-instrumentations/src/kafkajs.js
+++ b/packages/datadog-instrumentations/src/kafkajs.js
@@ -17,6 +17,10 @@ const consumerCommitCh = channel('apm:kafkajs:consume:commit')
 const consumerFinishCh = channel('apm:kafkajs:consume:finish')
 const consumerErrorCh = channel('apm:kafkajs:consume:error')
 
+const batchConsumerStartCh = channel('apm:kafkajs:consume-batch:start')
+const batchConsumerFinishCh = channel('apm:kafkajs:consume-batch:finish')
+const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error')
+
 function commitsFromEvent (event) {
   const { payload: { groupId, topics } } = event
   const commitList = []
@@ -96,6 +100,17 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
       return createConsumer.apply(this, arguments)
     }
 
+    const eachMessageExtractor = (args) => {
+      const { topic, partition, message } = args[0]
+      return { topic, partition, message, groupId }
+    }
+
+    const eachBatchExtractor = (args) => {
+      const { batch } = args[0]
+      const { topic, partition, messages } = batch
+      return { topic, partition, messages, groupId }
+    }
+
     const consumer = createConsumer.apply(this, arguments)
 
     consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)
@@ -103,43 +118,64 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
     const run = consumer.run
     const groupId = arguments[0].groupId
-    consumer.run = function ({ eachMessage, ...runArgs }) {
-      if (typeof eachMessage !== 'function') return run({ eachMessage, ...runArgs })
+    consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
+      eachMessage = wrapFunction(
+        eachMessage,
+        consumerStartCh,
+        consumerFinishCh,
+        consumerErrorCh,
+        eachMessageExtractor
+      )
+
+      eachBatch = wrapFunction(
+        eachBatch,
+        batchConsumerStartCh,
+        batchConsumerFinishCh,
+        batchConsumerErrorCh,
+        eachBatchExtractor
+      )
 
       return run({
-        eachMessage: function (...eachMessageArgs) {
-          const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
-          return innerAsyncResource.runInAsyncScope(() => {
-            const { topic, partition, message } = eachMessageArgs[0]
-            consumerStartCh.publish({ topic, partition, message, groupId })
-            try {
-              const result = eachMessage.apply(this, eachMessageArgs)
-              if (result && typeof result.then === 'function') {
-                result.then(
-                  innerAsyncResource.bind(() => consumerFinishCh.publish(undefined)),
-                  innerAsyncResource.bind(err => {
-                    if (err) {
-                      consumerErrorCh.publish(err)
-                    }
-                    consumerFinishCh.publish(undefined)
-                  })
-                )
-              } else {
-                consumerFinishCh.publish(undefined)
-              }
-
-              return result
-            } catch (e) {
-              consumerErrorCh.publish(e)
-              consumerFinishCh.publish(undefined)
-              throw e
-            }
-          })
-        },
+        eachMessage,
+        eachBatch,
         ...runArgs
       })
     }
+
     return consumer
   })
 
   return Kafka
 })
+
+const wrapFunction = (fn, startCh, finishCh, errorCh, extractArgs) => {
+  return typeof fn === 'function'
+    ? function (...args) {
+      const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
+      return innerAsyncResource.runInAsyncScope(() => {
+        const extractedArgs = extractArgs(args)
+        startCh.publish(extractedArgs)
+        try {
+          const result = fn.apply(this, args)
+          if (result && typeof result.then === 'function') {
+            result.then(
+              innerAsyncResource.bind(() => finishCh.publish(undefined)),
+              innerAsyncResource.bind(err => {
+                if (err) {
+                  errorCh.publish(err)
+                }
+                finishCh.publish(undefined)
+              })
+            )
+          } else {
+            finishCh.publish(undefined)
+          }
+          return result
+        } catch (e) {
+          errorCh.publish(e)
+          finishCh.publish(undefined)
+          throw e
+        }
+      })
+    }
+    : fn
+}
diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js
new file mode 100644
index 00000000000..5a531267e9b
--- /dev/null
+++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js
@@ -0,0 +1,21 @@
+const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
+const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
+const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
+
+class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
+  static get id () { return 'kafkajs' }
+  static get operation () { return 'consume-batch' }
+
+  start ({ topic, partition, messages, groupId }) {
+    if (!this.config.dsmEnabled) return
+    for (const message of messages) {
+      if (!message || !message.headers || !DsmPathwayCodec.contextExists(message.headers)) continue
+      const payloadSize = getMessageSize(message)
+      this.tracer.decodeDataStreamsContext(message.headers)
+      this.tracer
+        .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize)
+    }
+  }
+}
+
+module.exports = KafkajsBatchConsumerPlugin
diff --git a/packages/datadog-plugin-kafkajs/src/index.js b/packages/datadog-plugin-kafkajs/src/index.js
index 9e5aec80606..3d20e8af67e 100644
--- a/packages/datadog-plugin-kafkajs/src/index.js
+++ b/packages/datadog-plugin-kafkajs/src/index.js
@@ -2,6 +2,7 @@
 
 const ProducerPlugin = require('./producer')
 const ConsumerPlugin = require('./consumer')
+const BatchConsumerPlugin = require('./batch-consumer')
 const CompositePlugin = require('../../dd-trace/src/plugins/composite')
 
 class KafkajsPlugin extends CompositePlugin {
@@ -9,7 +10,8 @@ class KafkajsPlugin extends CompositePlugin {
   static get plugins () {
     return {
       producer: ProducerPlugin,
-      consumer: ConsumerPlugin
+      consumer: ConsumerPlugin,
+      batchConsumer: BatchConsumerPlugin
     }
   }
 }
diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js
index 1e63c8b01c6..3df303a95cf 100644
--- a/packages/datadog-plugin-kafkajs/test/index.spec.js
+++ b/packages/datadog-plugin-kafkajs/test/index.spec.js
@@ -145,7 +145,7 @@ describe('Plugin', () => {
         )
       })
 
-      describe('consumer', () => {
+      describe('consumer (eachMessage)', () => {
         let consumer
 
         beforeEach(async () => {
@@ -387,7 +387,7 @@ describe('Plugin', () => {
          expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash)
        })
 
-        it('Should set a checkpoint on consume', async () => {
+        it('Should set a checkpoint on consume (eachMessage)', async () => {
          const runArgs = []
          await consumer.run({
            eachMessage: async () => {
@@ -401,6 +401,20 @@ describe('Plugin', () => {
          }
        })
 
+        it('Should set a checkpoint on consume (eachBatch)', async () => {
+          const runArgs = []
+          await consumer.run({
+            eachBatch: async () => {
+              runArgs.push(setDataStreamsContextSpy.lastCall.args[0])
+            }
+          })
+          await sendMessages(kafka, testTopic, messages)
+          await consumer.disconnect()
+          for (const runArg of runArgs) {
+            expect(runArg.hash).to.equal(expectedConsumerHash)
+          }
+        })
+
         it('Should set a message payload size when producing a message', async () => {
          const messages = [{ key: 'key1', value: 'test2' }]
          if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) {
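
For reference, a minimal sketch (not part of this diff) of how the new consume-batch channels surface to an application. It assumes dd-trace is initialized so the kafkajs instrumentation loads; the broker address, topic, and group names (localhost:9092, example-topic, example-group) are illustrative assumptions.

'use strict'

// Initialize the tracer first so the kafkajs instrumentation is hooked in.
require('dd-trace').init()

const dc = require('diagnostics_channel')
const { Kafka } = require('kafkajs')

// Observe the batch channel added by this change. The payload fields match
// what eachBatchExtractor publishes: topic, partition, messages, groupId.
dc.channel('apm:kafkajs:consume-batch:start').subscribe(({ topic, partition, messages, groupId }) => {
  console.log(`batch of ${messages.length} message(s) from ${topic}[${partition}] (group ${groupId})`)
})

async function main () {
  const kafka = new Kafka({ clientId: 'example-app', brokers: ['localhost:9092'] })
  const consumer = kafka.consumer({ groupId: 'example-group' })

  await consumer.connect()
  await consumer.subscribe({ topic: 'example-topic', fromBeginning: true })

  // With this change, eachBatch is wrapped by wrapFunction() just like
  // eachMessage, so the subscriber above fires around each batch invocation.
  await consumer.run({
    eachBatch: async ({ batch }) => {
      for (const message of batch.messages) {
        // application-level processing goes here
      }
    }
  })
}

main().catch(console.error)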