diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js index 1671f897126..395c69de057 100644 --- a/packages/datadog-instrumentations/src/kafkajs.js +++ b/packages/datadog-instrumentations/src/kafkajs.js @@ -100,6 +100,17 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf return createConsumer.apply(this, arguments) } + const eachMessageExtractor = (args) => { + const { topic, partition, message } = args[0] + return { topic, partition, message, groupId } + } + + const eachBatchExtractor = (args) => { + const { batch } = args[0] + const { topic, partition, messages } = batch + return { topic, partition, messages, groupId } + } + const consumer = createConsumer.apply(this, arguments) consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent) @@ -108,75 +119,63 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf const groupId = arguments[0].groupId consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) { - return run({ - eachMessage: typeof eachMessage === 'function' - ? function (...eachMessageArgs) { - const innerAsyncResource = new AsyncResource('bound-anonymous-fn') - return innerAsyncResource.runInAsyncScope(() => { - const { topic, partition, message } = eachMessageArgs[0] - consumerStartCh.publish({ topic, partition, message, groupId }) - try { - const result = eachMessage.apply(this, eachMessageArgs) - if (result && typeof result.then === 'function') { - result.then( - innerAsyncResource.bind(() => consumerFinishCh.publish(undefined)), - innerAsyncResource.bind(err => { - if (err) { - consumerErrorCh.publish(err) - } - consumerFinishCh.publish(undefined) - }) - ) - } else { - consumerFinishCh.publish(undefined) - } + eachMessage = wrapFunction( + eachMessage, + consumerStartCh, + consumerFinishCh, + consumerErrorCh, + eachMessageExtractor + ) + + eachBatch = wrapFunction( + eachBatch, + batchConsumerStartCh, + batchConsumerFinishCh, + batchConsumerErrorCh, + eachBatchExtractor + ) - return result - } catch (e) { - consumerErrorCh.publish(e) - consumerFinishCh.publish(undefined) - throw e - } - }) - } - : eachMessage, - eachBatch: - typeof eachBatch === 'function' - ? function (...eachBatchArgs) { - const innerAsyncResource = new AsyncResource('bound-anonymous-fn') - return innerAsyncResource.runInAsyncScope(() => { - const { batch } = eachBatchArgs[0] - const { topic, partition, messages } = batch - batchConsumerStartCh.publish({ topic, partition, messages, groupId }) - try { - const result = eachBatch.apply(this, eachBatchArgs) - if (result && typeof result.then === 'function') { - result.then( - innerAsyncResource.bind(() => batchConsumerFinishCh.publish()), - innerAsyncResource.bind((err) => { - if (err) { - batchConsumerErrorCh.publish(err) - } - batchConsumerFinishCh.publish() - }) - ) - } else { - batchConsumerFinishCh.publish() - } - - return result - } catch (error) { - batchConsumerErrorCh.publish(error) - batchConsumerFinishCh.publish() - throw error - } - }) - } - : eachBatch, + return run({ + eachMessage, + eachBatch, ...runArgs }) } + return consumer }) return Kafka }) + +const wrapFunction = (fn, startCh, finishCh, errorCh, extractArgs) => { + return typeof fn === 'function' + ? function (...args) { + const innerAsyncResource = new AsyncResource('bound-anonymous-fn') + return innerAsyncResource.runInAsyncScope(() => { + const extractedArgs = extractArgs(args) + startCh.publish(extractedArgs) + try { + const result = fn.apply(this, args) + if (result && typeof result.then === 'function') { + result.then( + innerAsyncResource.bind(() => finishCh.publish(undefined)), + innerAsyncResource.bind(err => { + if (err) { + errorCh.publish(err) + } + finishCh.publish(undefined) + }) + ) + } else { + finishCh.publish(undefined) + } + return result + } catch (e) { + errorCh.publish(e) + finishCh.publish(undefined) + throw e + } + }) + } + : fn +} diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index f9937aa1295..5a531267e9b 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -7,15 +7,13 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin { static get operation () { return 'consume-batch' } start ({ topic, partition, messages, groupId }) { - if (this.config.dsmEnabled) { - for (const message of messages) { - if (message?.headers && DsmPathwayCodec.contextExists(message.headers)) { - const payloadSize = getMessageSize(message) - this.tracer.decodeDataStreamsContext(message.headers) - this.tracer - .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize) - } - } + if (!this.config.dsmEnabled) return + for (const message of messages) { + if (!message || !message.headers || !DsmPathwayCodec.contextExists(message.headers)) continue + const payloadSize = getMessageSize(message) + this.tracer.decodeDataStreamsContext(message.headers) + this.tracer + .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize) } } }