From 2e5e9e4afb6d75d6d297617734ef240774075bac Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 26 Aug 2024 11:23:34 -0400 Subject: [PATCH 1/8] support batch operations --- .../datadog-instrumentations/src/kafkajs.js | 95 ++++++++---- .../src/batch-consumer.js | 39 +++++ .../datadog-plugin-kafkajs/src/consumer.js | 20 +++ packages/datadog-plugin-kafkajs/src/index.js | 4 +- .../datadog-plugin-kafkajs/test/index.spec.js | 135 +++++++++++++++++- 5 files changed, 258 insertions(+), 35 deletions(-) create mode 100644 packages/datadog-plugin-kafkajs/src/batch-consumer.js diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js index 1fb5316becd..1671f897126 100644 --- a/packages/datadog-instrumentations/src/kafkajs.js +++ b/packages/datadog-instrumentations/src/kafkajs.js @@ -17,6 +17,10 @@ const consumerCommitCh = channel('apm:kafkajs:consume:commit') const consumerFinishCh = channel('apm:kafkajs:consume:finish') const consumerErrorCh = channel('apm:kafkajs:consume:error') +const batchConsumerStartCh = channel('apm:kafkajs:consume-batch:start') +const batchConsumerFinishCh = channel('apm:kafkajs:consume-batch:finish') +const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error') + function commitsFromEvent (event) { const { payload: { groupId, topics } } = event const commitList = [] @@ -103,39 +107,72 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf const run = consumer.run const groupId = arguments[0].groupId - consumer.run = function ({ eachMessage, ...runArgs }) { - if (typeof eachMessage !== 'function') return run({ eachMessage, ...runArgs }) - + consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) { return run({ - eachMessage: function (...eachMessageArgs) { - const innerAsyncResource = new AsyncResource('bound-anonymous-fn') - return innerAsyncResource.runInAsyncScope(() => { - const { topic, partition, message } = eachMessageArgs[0] - consumerStartCh.publish({ topic, partition, message, groupId }) - try { - const result = eachMessage.apply(this, eachMessageArgs) - if (result && typeof result.then === 'function') { - result.then( - innerAsyncResource.bind(() => consumerFinishCh.publish(undefined)), - innerAsyncResource.bind(err => { - if (err) { - consumerErrorCh.publish(err) - } - consumerFinishCh.publish(undefined) - }) - ) - } else { + eachMessage: typeof eachMessage === 'function' + ? function (...eachMessageArgs) { + const innerAsyncResource = new AsyncResource('bound-anonymous-fn') + return innerAsyncResource.runInAsyncScope(() => { + const { topic, partition, message } = eachMessageArgs[0] + consumerStartCh.publish({ topic, partition, message, groupId }) + try { + const result = eachMessage.apply(this, eachMessageArgs) + if (result && typeof result.then === 'function') { + result.then( + innerAsyncResource.bind(() => consumerFinishCh.publish(undefined)), + innerAsyncResource.bind(err => { + if (err) { + consumerErrorCh.publish(err) + } + consumerFinishCh.publish(undefined) + }) + ) + } else { + consumerFinishCh.publish(undefined) + } + + return result + } catch (e) { + consumerErrorCh.publish(e) consumerFinishCh.publish(undefined) + throw e } - - return result - } catch (e) { - consumerErrorCh.publish(e) - consumerFinishCh.publish(undefined) - throw e + }) + } + : eachMessage, + eachBatch: + typeof eachBatch === 'function' + ? 
function (...eachBatchArgs) { + const innerAsyncResource = new AsyncResource('bound-anonymous-fn') + return innerAsyncResource.runInAsyncScope(() => { + const { batch } = eachBatchArgs[0] + const { topic, partition, messages } = batch + batchConsumerStartCh.publish({ topic, partition, messages, groupId }) + try { + const result = eachBatch.apply(this, eachBatchArgs) + if (result && typeof result.then === 'function') { + result.then( + innerAsyncResource.bind(() => batchConsumerFinishCh.publish()), + innerAsyncResource.bind((err) => { + if (err) { + batchConsumerErrorCh.publish(err) + } + batchConsumerFinishCh.publish() + }) + ) + } else { + batchConsumerFinishCh.publish() + } + + return result + } catch (error) { + batchConsumerErrorCh.publish(error) + batchConsumerFinishCh.publish() + throw error + } + }) } - }) - }, + : eachBatch, ...runArgs }) } diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js new file mode 100644 index 00000000000..69f9c29f25f --- /dev/null +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -0,0 +1,39 @@ +const ConsumerPlugin = require('dd-trace/packages/dd-trace/src/plugins/consumer') +const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') +const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') + +class KafkajsBatchConsumerPlugin extends ConsumerPlugin { + static get operation () { + return 'consume-batch' + } + + start ({ topic, partition, messages, groupId }) { + const span = this.startSpan({ + resource: topic, + type: 'worker', + meta: { + component: 'kafkajs', + 'kafka.topic': topic, + 'kafka.message.offset': messages[0].offset, + 'kafka.message.offset.last': messages[messages.length - 1].offset + }, + metrics: { + 'kafka.partition': partition, + 'kafka.batch_size': messages.length + } + }) + + if (this.config.dsmEnabled) { + for (const message of messages) { + if (message?.headers && DsmPathwayCodec.contextExists(message.headers)) { + const payloadSize = getMessageSize(message) + this.tracer.decodeDataStreamsContext(message.headers) + this.tracer + .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], span, payloadSize) + } + } + } + } +} + +module.exports = KafkajsBatchConsumerPlugin diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 420fea10902..29f8739a8f8 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -90,6 +90,26 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { } } + startBatch ({ topic, partition, messages, groupId }) { + if (this.config.dsmEnabled) { + this.tracer.setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']) + } + this.startSpan({ + resource: topic, + type: 'worker', + meta: { + component: 'kafkajs', + 'kafka.topic': topic, + 'kafka.message.offset': messages[0].offset, + 'kafka.message.offset.last': messages[messages.length - 1].offset + }, + metrics: { + 'kafka.partition': partition, + 'kafka.batch_size': messages.length + } + }) + } + finish () { if (beforeFinishCh.hasSubscribers) { beforeFinishCh.publish() diff --git a/packages/datadog-plugin-kafkajs/src/index.js b/packages/datadog-plugin-kafkajs/src/index.js index 9e5aec80606..3d20e8af67e 100644 --- a/packages/datadog-plugin-kafkajs/src/index.js +++ b/packages/datadog-plugin-kafkajs/src/index.js @@ -2,6 +2,7 @@ const ProducerPlugin = require('./producer') const 
ConsumerPlugin = require('./consumer') +const BatchConsumerPlugin = require('./batch-consumer') const CompositePlugin = require('../../dd-trace/src/plugins/composite') class KafkajsPlugin extends CompositePlugin { @@ -9,7 +10,8 @@ class KafkajsPlugin extends CompositePlugin { static get plugins () { return { producer: ProducerPlugin, - consumer: ConsumerPlugin + consumer: ConsumerPlugin, + batchConsumer: BatchConsumerPlugin } } } diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index 1e63c8b01c6..b3627fd8555 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -29,6 +29,7 @@ const expectedConsumerHash = computePathwayHash( describe('Plugin', () => { describe('kafkajs', function () { // TODO: remove when new internal trace has landed + let groupId this.timeout(10000) afterEach(() => { @@ -40,6 +41,7 @@ describe('Plugin', () => { let Kafka describe('without configuration', () => { const messages = [{ key: 'key1', value: 'test2' }] + groupId = 'test-group-' + Date.now() beforeEach(async () => { process.env.DD_DATA_STREAMS_ENABLED = 'true' @@ -145,11 +147,11 @@ describe('Plugin', () => { ) }) - describe('consumer', () => { + describe('consumer (eachMessage)', () => { let consumer beforeEach(async () => { - consumer = kafka.consumer({ groupId: 'test-group' }) + consumer = kafka.consumer({ groupId }) await consumer.connect() await consumer.subscribe({ topic: testTopic }) }) @@ -342,13 +344,125 @@ describe('Plugin', () => { ) }) + describe('consumer (eachBatch)', () => { + let consumer + + beforeEach(async () => { + consumer = kafka.consumer({ groupId }) + await consumer.subscribe({ topic: testTopic }) + await consumer.connect() + }) + + afterEach(async () => { + await consumer.disconnect() + }) + + it('should be instrumented', async () => { + const expectedSpanPromise = expectSpanWithDefaults({ + name: expectedSchema.receive.opName, + service: expectedSchema.receive.serviceName, + meta: { + 'span.kind': 'consumer', + component: 'kafkajs' + }, + resource: testTopic, + error: 0, + type: 'worker' + }) + + await consumer.run({ + eachBatch: () => {} + }) + await sendMessages(kafka, testTopic, messages) + + return expectedSpanPromise + }) + + it('should run the consumer in the context of the consumer span', done => { + const firstSpan = tracer.scope().active() + let eachBatch = async ({ batch }) => { + const currentSpan = tracer.scope().active() + + try { + expect(currentSpan).to.not.equal(firstSpan) + expect(currentSpan.context()._name).to.equal(expectedSchema.receive.opName) + done() + } catch (e) { + done(e) + } finally { + eachBatch = () => {} // avoid being called for each message + } + } + + consumer.run({ eachBatch: (...args) => eachBatch(...args) }) + .then(() => sendMessages(kafka, testTopic, messages)) + .catch(done) + }) + + it('should propagate context', async () => { + const expectedSpanPromise = agent.use(traces => { + const span = traces[0][0] + + expect(span).to.include({ + name: 'kafka.consume-batch', + service: 'test-kafka', + resource: testTopic + }) + + expect(parseInt(span.parent_id.toString())).to.be.gt(0) + }) + + await consumer.run({ eachBatch: () => {} }) + await sendMessages(kafka, testTopic, messages) + await expectedSpanPromise + }) + + it('should be instrumented w/ error', async () => { + const fakeError = new Error('Oh No!') + const expectedSpanPromise = expectSpanWithDefaults({ + name: expectedSchema.receive.opName, + service: 
expectedSchema.receive.serviceName, + meta: { + [ERROR_TYPE]: fakeError.name, + [ERROR_MESSAGE]: fakeError.message, + [ERROR_STACK]: fakeError.stack, + component: 'kafkajs' + }, + resource: testTopic, + error: 1, + type: 'worker' + + }) + + await consumer.subscribe({ topic: testTopic, fromBeginning: true }) + await consumer.run({ + eachBatch: async ({ batch }) => { + throw fakeError + } + }) + await sendMessages(kafka, testTopic, messages) + + return expectedSpanPromise + }) + + withNamingSchema( + async () => { + await consumer.run({ eachBatch: () => {} }) + await sendMessages(kafka, testTopic, messages) + }, + () => expectedSchema.send.opName, + () => expectedSchema.send.serviceName, + 'test' + ) + }) + describe('data stream monitoring', () => { let consumer beforeEach(async () => { tracer.init() tracer.use('kafkajs', { dsmEnabled: true }) - consumer = kafka.consumer({ groupId: 'test-group' }) + consumer = kafka.consumer({ groupId }) await consumer.connect() await consumer.subscribe({ topic: testTopic }) }) @@ -377,7 +491,7 @@ describe('Plugin', () => { const expectedConsumerHash = computePathwayHash( 'test', 'tester', - ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka'], + ['direction:in', 'group:' + groupId, 'topic:' + testTopic, 'type:kafka'], expectedProducerHash ) @@ -387,7 +501,7 @@ describe('Plugin', () => { expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash) }) - it('Should set a checkpoint on consume', async () => { + it('Should set a checkpoint on consume (eachMessage)', async () => { const runArgs = [] await consumer.run({ eachMessage: async () => { @@ -401,6 +515,17 @@ describe('Plugin', () => { } }) + it('Should set a checkpoint on consume (eachBatch)', async () => { + await sendMessages(kafka, testTopic, messages) + const setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext') + await consumer.run({ + eachBatch: async ({ batch, heartbeat, pause }) => { + expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedConsumerHash) + } + }) + setDataStreamsContextSpy.restore() + }) + it('Should set a message payload size when producing a message', async () => { const messages = [{ key: 'key1', value: 'test2' }] if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { From 59026f51a11c35284a256d431202e419e7a31d1e Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 26 Aug 2024 15:07:38 -0400 Subject: [PATCH 2/8] fix tests --- .../src/batch-consumer.js | 17 ++++-- .../datadog-plugin-kafkajs/src/consumer.js | 53 +++++++------------ packages/datadog-plugin-kafkajs/src/utils.js | 17 ++++++ .../datadog-plugin-kafkajs/test/index.spec.js | 32 +++++------ 4 files changed, 67 insertions(+), 52 deletions(-) create mode 100644 packages/datadog-plugin-kafkajs/src/utils.js diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index 69f9c29f25f..12c87f90e06 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -1,14 +1,23 @@ -const ConsumerPlugin = require('dd-trace/packages/dd-trace/src/plugins/consumer') +const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') +const { extract } = require('./utils') const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') class KafkajsBatchConsumerPlugin extends ConsumerPlugin { - static get operation () { - 
return 'consume-batch' - } + static get id () { return 'kafkajs' } + static get operation () { return 'consume-batch' } start ({ topic, partition, messages, groupId }) { + let childOf + for (const message of messages) { + childOf = extract(this.tracer, message?.headers) + if (childOf._traceId !== null) { + break + } + } + const span = this.startSpan({ + childOf, resource: topic, type: 'worker', meta: { diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 29f8739a8f8..5c1598bbfd5 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -1,6 +1,7 @@ 'use strict' const dc = require('dc-polyfill') +const { extract } = require('./utils') const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') @@ -90,25 +91,25 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { } } - startBatch ({ topic, partition, messages, groupId }) { - if (this.config.dsmEnabled) { - this.tracer.setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']) - } - this.startSpan({ - resource: topic, - type: 'worker', - meta: { - component: 'kafkajs', - 'kafka.topic': topic, - 'kafka.message.offset': messages[0].offset, - 'kafka.message.offset.last': messages[messages.length - 1].offset - }, - metrics: { - 'kafka.partition': partition, - 'kafka.batch_size': messages.length - } - }) - } + // startBatch ({ topic, partition, messages, groupId }) { + // if (this.config.dsmEnabled) { + // this.tracer.setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']) + // } + // this.startSpan({ + // resource: topic, + // type: 'worker', + // meta: { + // component: 'kafkajs', + // 'kafka.topic': topic, + // 'kafka.message.offset': messages[0].offset, + // 'kafka.message.offset.last': messages[messages.length - 1].offset + // }, + // metrics: { + // 'kafka.partition': partition, + // 'kafka.batch_size': messages.length + // } + // }) + // } finish () { if (beforeFinishCh.hasSubscribers) { @@ -119,18 +120,4 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { } } -function extract (tracer, bufferMap) { - if (!bufferMap) return null - - const textMap = {} - - for (const key of Object.keys(bufferMap)) { - if (bufferMap[key] === null || bufferMap[key] === undefined) continue - - textMap[key] = bufferMap[key].toString() - } - - return tracer.extract('text_map', textMap) -} - module.exports = KafkajsConsumerPlugin diff --git a/packages/datadog-plugin-kafkajs/src/utils.js b/packages/datadog-plugin-kafkajs/src/utils.js new file mode 100644 index 00000000000..6a1eb1bf6ad --- /dev/null +++ b/packages/datadog-plugin-kafkajs/src/utils.js @@ -0,0 +1,17 @@ +function extract (tracer, bufferMap) { + if (!bufferMap) return null + + const textMap = {} + + for (const key of Object.keys(bufferMap)) { + if (bufferMap[key] === null || bufferMap[key] === undefined) continue + + textMap[key] = bufferMap[key].toString() + } + + return tracer.extract('text_map', textMap) +} + +module.exports = { + extract +} diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index b3627fd8555..cbe159ec770 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -29,7 +29,6 @@ const expectedConsumerHash = 
computePathwayHash( describe('Plugin', () => { describe('kafkajs', function () { // TODO: remove when new internal trace has landed - let groupId this.timeout(10000) afterEach(() => { @@ -41,7 +40,6 @@ describe('Plugin', () => { let Kafka describe('without configuration', () => { const messages = [{ key: 'key1', value: 'test2' }] - groupId = 'test-group-' + Date.now() beforeEach(async () => { process.env.DD_DATA_STREAMS_ENABLED = 'true' @@ -151,7 +149,7 @@ describe('Plugin', () => { let consumer beforeEach(async () => { - consumer = kafka.consumer({ groupId }) + consumer = kafka.consumer({ groupId: 'test-group' }) await consumer.connect() await consumer.subscribe({ topic: testTopic }) }) @@ -348,7 +346,7 @@ describe('Plugin', () => { let consumer beforeEach(async () => { - consumer = kafka.consumer({ groupId }) + consumer = kafka.consumer({ groupId: 'test-group' }) await consumer.subscribe({ topic: testTopic }) await consumer.connect() }) @@ -404,10 +402,13 @@ describe('Plugin', () => { const span = traces[0][0] expect(span).to.include({ - name: 'kafka.consume-batch', + name: 'kafka.consume', service: 'test-kafka', resource: testTopic }) + expect(span.metrics).to.include({ + 'kafka.batch_size': 1 + }) expect(parseInt(span.parent_id.toString())).to.be.gt(0) }) @@ -450,9 +451,7 @@ describe('Plugin', () => { await consumer.run({ eachBatch: () => {} }) await sendMessages(kafka, testTopic, messages) }, - () => expectedSchema.send.opName, - () => expectedSchema.send.serviceName, - 'test' + rawExpectedSchema.receive ) }) @@ -462,7 +461,7 @@ describe('Plugin', () => { beforeEach(async () => { tracer.init() tracer.use('kafkajs', { dsmEnabled: true }) - consumer = kafka.consumer({ groupId }) + consumer = kafka.consumer({ groupId: 'test-group' }) await consumer.connect() await consumer.subscribe({ topic: testTopic }) }) @@ -491,7 +490,7 @@ describe('Plugin', () => { const expectedConsumerHash = computePathwayHash( 'test', 'tester', - ['direction:in', 'group:' + groupId, 'topic:' + testTopic, 'type:kafka'], + ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka'], expectedProducerHash ) @@ -516,14 +515,17 @@ describe('Plugin', () => { }) it('Should set a checkpoint on consume (eachBatch)', async () => { - await sendMessages(kafka, testTopic, messages) - const setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext') + const runArgs = [] await consumer.run({ - eachBatch: async ({ batch, heartbeat, pause }) => { - expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedConsumerHash) + eachBatch: async () => { + runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) } }) - setDataStreamsContextSpy.restore() + await sendMessages(kafka, testTopic, messages) + await consumer.disconnect() + for (const runArg of runArgs) { + expect(runArg.hash).to.equal(expectedConsumerHash) + } }) it('Should set a message payload size when producing a message', async () => { From 8b857a667b2aff8f51311cd3a8249fa7e58955f6 Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 28 Aug 2024 10:20:16 -0400 Subject: [PATCH 3/8] add configuration --- .../src/batch-consumer.js | 30 ++++- .../datadog-plugin-kafkajs/test/index.spec.js | 112 +++++++++++++++++- 2 files changed, 136 insertions(+), 6 deletions(-) diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index 12c87f90e06..d439787ae8a 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ 
b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -2,17 +2,26 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { extract } = require('./utils') const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') +const { isTrue } = require('../../dd-trace/src/util') +const coalesce = require('koalas') class KafkajsBatchConsumerPlugin extends ConsumerPlugin { static get id () { return 'kafkajs' } static get operation () { return 'consume-batch' } + configure (config) { + super.configure(coalesceConfiguration(config, this.serviceIdentifier)) + } + start ({ topic, partition, messages, groupId }) { let childOf - for (const message of messages) { - childOf = extract(this.tracer, message?.headers) - if (childOf._traceId !== null) { - break + if (this.config.batchedParentPropagationEnabled) { + for (const message of messages) { + // find the first valid context and use this as this span's parent + childOf = extract(this.tracer, message?.headers) + if (childOf._traceId !== null) { + break + } } } @@ -45,4 +54,17 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin { } } +function coalesceConfiguration (config) { + // check if batch propagation is enabled via env variable + config.batchedParentPropagationEnabled = isTrue( + coalesce( + process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED, + config.batchedParentPropagationEnabled, + false + ) + ) + + return config +} + module.exports = KafkajsBatchConsumerPlugin diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index cbe159ec770..88e4b7646e3 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -397,7 +397,7 @@ describe('Plugin', () => { .catch(done) }) - it('should propagate context', async () => { + it('should not propagate context by default', async () => { const expectedSpanPromise = agent.use(traces => { const span = traces[0][0] @@ -410,7 +410,7 @@ describe('Plugin', () => { 'kafka.batch_size': 1 }) - expect(parseInt(span.parent_id.toString())).to.be.gt(0) + expect(parseInt(span.parent_id.toString())).to.equal(0) }) await consumer.run({ eachBatch: () => {} }) @@ -619,6 +619,114 @@ describe('Plugin', () => { }) }) }) + + describe('with api configuration', () => { + const messages = [{ key: 'key1', value: 'test2' }] + + beforeEach(async () => { + tracer = require('../../dd-trace').init({ + kafkajs: { batchedParentPropagationEnabled: true } + }) + await agent.load('kafkajs', { batchedParentPropagationEnabled: true }) + const lib = require(`../../../versions/kafkajs@${version}`).get() + Kafka = lib.Kafka + kafka = new Kafka({ + clientId: `kafkajs-test-${version}`, + brokers: ['127.0.0.1:9092'], + logLevel: lib.logLevel.WARN + }) + }) + + describe('consumer (eachBatch)', () => { + let consumer + + beforeEach(async () => { + consumer = kafka.consumer({ groupId: 'test-group' }) + await consumer.subscribe({ topic: testTopic }) + await consumer.connect() + }) + + afterEach(async () => { + await consumer.disconnect() + }) + + it('should propagate context when configured', async () => { + const expectedSpanPromise = agent.use(traces => { + const span = traces[0][0] + + expect(span).to.include({ + name: 'kafka.consume', + service: 'test-kafka', + resource: testTopic + }) + expect(span.metrics).to.include({ + 'kafka.batch_size': 1 + }) + + 
expect(parseInt(span.parent_id.toString())).to.be.gt(0) + }) + + await consumer.run({ eachBatch: () => {} }) + await sendMessages(kafka, testTopic, messages) + await expectedSpanPromise + }) + }) + }) + describe('with env variable configuration', () => { + const messages = [{ key: 'key1', value: 'test2' }] + + beforeEach(async () => { + process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED = 'true' + tracer = require('../../dd-trace').init() + await agent.load('kafkajs') + const lib = require(`../../../versions/kafkajs@${version}`).get() + Kafka = lib.Kafka + kafka = new Kafka({ + clientId: `kafkajs-test-${version}`, + brokers: ['127.0.0.1:9092'], + logLevel: lib.logLevel.WARN + }) + }) + + afterEach(() => { + process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED = 'false' + }) + + describe('consumer (eachBatch)', () => { + let consumer + + beforeEach(async () => { + consumer = kafka.consumer({ groupId: 'test-group' }) + await consumer.subscribe({ topic: testTopic }) + await consumer.connect() + }) + + afterEach(async () => { + await consumer.disconnect() + }) + + it('should propagate context when configured', async () => { + const expectedSpanPromise = agent.use(traces => { + const span = traces[0][0] + + expect(span).to.include({ + name: 'kafka.consume', + service: 'test-kafka', + resource: testTopic + }) + expect(span.metrics).to.include({ + 'kafka.batch_size': 1 + }) + + expect(parseInt(span.parent_id.toString())).to.be.gt(0) + }) + + await consumer.run({ eachBatch: () => {} }) + await sendMessages(kafka, testTopic, messages) + await expectedSpanPromise + }) + }) + }) }) }) }) From f330fe41ebeb61bf35890c01cac13928dab7edf1 Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 28 Aug 2024 10:25:01 -0400 Subject: [PATCH 4/8] add env var --- packages/datadog-plugin-kafkajs/test/index.spec.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index 88e4b7646e3..8b9e0cb2ef9 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -406,7 +406,7 @@ describe('Plugin', () => { service: 'test-kafka', resource: testTopic }) - expect(span.metrics).to.include({ + expect(span.metrics).to.be.gt({ 'kafka.batch_size': 1 }) @@ -659,7 +659,7 @@ describe('Plugin', () => { service: 'test-kafka', resource: testTopic }) - expect(span.metrics).to.include({ + expect(span.metrics).to.be.gt({ 'kafka.batch_size': 1 }) @@ -714,7 +714,7 @@ describe('Plugin', () => { service: 'test-kafka', resource: testTopic }) - expect(span.metrics).to.include({ + expect(span.metrics).to.be.gt({ 'kafka.batch_size': 1 }) From 4e89d0c10a386ff004774d504ae87b5a1a34bdd5 Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 28 Aug 2024 11:06:53 -0400 Subject: [PATCH 5/8] fix tests --- .../datadog-plugin-kafkajs/test/index.spec.js | 65 +------------------ 1 file changed, 3 insertions(+), 62 deletions(-) diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index 8b9e0cb2ef9..58255b3e1d1 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -406,9 +406,7 @@ describe('Plugin', () => { service: 'test-kafka', resource: testTopic }) - expect(span.metrics).to.be.gt({ - 'kafka.batch_size': 1 - }) + expect(span.metrics['kafka.batch_size']).to.be.at.least(1) 
expect(parseInt(span.parent_id.toString())).to.equal(0) }) @@ -620,7 +618,7 @@ describe('Plugin', () => { }) }) - describe('with api configuration', () => { + describe('with configuration', () => { const messages = [{ key: 'key1', value: 'test2' }] beforeEach(async () => { @@ -659,64 +657,7 @@ describe('Plugin', () => { service: 'test-kafka', resource: testTopic }) - expect(span.metrics).to.be.gt({ - 'kafka.batch_size': 1 - }) - - expect(parseInt(span.parent_id.toString())).to.be.gt(0) - }) - - await consumer.run({ eachBatch: () => {} }) - await sendMessages(kafka, testTopic, messages) - await expectedSpanPromise - }) - }) - }) - describe('with env variable configuration', () => { - const messages = [{ key: 'key1', value: 'test2' }] - - beforeEach(async () => { - process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED = 'true' - tracer = require('../../dd-trace').init() - await agent.load('kafkajs') - const lib = require(`../../../versions/kafkajs@${version}`).get() - Kafka = lib.Kafka - kafka = new Kafka({ - clientId: `kafkajs-test-${version}`, - brokers: ['127.0.0.1:9092'], - logLevel: lib.logLevel.WARN - }) - }) - - afterEach(() => { - process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED = 'false' - }) - - describe('consumer (eachBatch)', () => { - let consumer - - beforeEach(async () => { - consumer = kafka.consumer({ groupId: 'test-group' }) - await consumer.subscribe({ topic: testTopic }) - await consumer.connect() - }) - - afterEach(async () => { - await consumer.disconnect() - }) - - it('should propagate context when configured', async () => { - const expectedSpanPromise = agent.use(traces => { - const span = traces[0][0] - - expect(span).to.include({ - name: 'kafka.consume', - service: 'test-kafka', - resource: testTopic - }) - expect(span.metrics).to.be.gt({ - 'kafka.batch_size': 1 - }) + expect(span.metrics['kafka.batch_size']).to.be.at.least(1) expect(parseInt(span.parent_id.toString())).to.be.gt(0) }) From 95cdc1edab5d52ebdcd40796d92531119c6fc216 Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 30 Aug 2024 15:00:36 -0400 Subject: [PATCH 6/8] initial commit --- .../src/batch-consumer.js | 49 +----- .../datadog-plugin-kafkajs/src/consumer.js | 35 ++-- .../datadog-plugin-kafkajs/test/index.spec.js | 162 ------------------ 3 files changed, 15 insertions(+), 231 deletions(-) diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index d439787ae8a..f9937aa1295 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -1,70 +1,23 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') -const { extract } = require('./utils') const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') -const { isTrue } = require('../../dd-trace/src/util') -const coalesce = require('koalas') class KafkajsBatchConsumerPlugin extends ConsumerPlugin { static get id () { return 'kafkajs' } static get operation () { return 'consume-batch' } - configure (config) { - super.configure(coalesceConfiguration(config, this.serviceIdentifier)) - } - start ({ topic, partition, messages, groupId }) { - let childOf - if (this.config.batchedParentPropagationEnabled) { - for (const message of messages) { - // find the first valid context and use this as this span's parent - childOf = extract(this.tracer, message?.headers) - if (childOf._traceId 
!== null) { - break - } - } - } - - const span = this.startSpan({ - childOf, - resource: topic, - type: 'worker', - meta: { - component: 'kafkajs', - 'kafka.topic': topic, - 'kafka.message.offset': messages[0].offset, - 'kafka.message.offset.last': messages[messages.length - 1].offset - }, - metrics: { - 'kafka.partition': partition, - 'kafka.batch_size': messages.length - } - }) - if (this.config.dsmEnabled) { for (const message of messages) { if (message?.headers && DsmPathwayCodec.contextExists(message.headers)) { const payloadSize = getMessageSize(message) this.tracer.decodeDataStreamsContext(message.headers) this.tracer - .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], span, payloadSize) + .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize) } } } } } -function coalesceConfiguration (config) { - // check if batch propagation is enabled via env variable - config.batchedParentPropagationEnabled = isTrue( - coalesce( - process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED, - config.batchedParentPropagationEnabled, - false - ) - ) - - return config -} - module.exports = KafkajsBatchConsumerPlugin diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 5c1598bbfd5..420fea10902 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -1,7 +1,6 @@ 'use strict' const dc = require('dc-polyfill') -const { extract } = require('./utils') const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') @@ -91,26 +90,6 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { } } - // startBatch ({ topic, partition, messages, groupId }) { - // if (this.config.dsmEnabled) { - // this.tracer.setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']) - // } - // this.startSpan({ - // resource: topic, - // type: 'worker', - // meta: { - // component: 'kafkajs', - // 'kafka.topic': topic, - // 'kafka.message.offset': messages[0].offset, - // 'kafka.message.offset.last': messages[messages.length - 1].offset - // }, - // metrics: { - // 'kafka.partition': partition, - // 'kafka.batch_size': messages.length - // } - // }) - // } - finish () { if (beforeFinishCh.hasSubscribers) { beforeFinishCh.publish() @@ -120,4 +99,18 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { } } +function extract (tracer, bufferMap) { + if (!bufferMap) return null + + const textMap = {} + + for (const key of Object.keys(bufferMap)) { + if (bufferMap[key] === null || bufferMap[key] === undefined) continue + + textMap[key] = bufferMap[key].toString() + } + + return tracer.extract('text_map', textMap) +} + module.exports = KafkajsConsumerPlugin diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index 58255b3e1d1..3df303a95cf 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -342,117 +342,6 @@ describe('Plugin', () => { ) }) - describe('consumer (eachBatch)', () => { - let consumer - - beforeEach(async () => { - consumer = kafka.consumer({ groupId: 'test-group' }) - await consumer.subscribe({ topic: testTopic }) - await consumer.connect() - }) - - afterEach(async () => { - await 
consumer.disconnect() - }) - - it('should be instrumented', async () => { - const expectedSpanPromise = expectSpanWithDefaults({ - name: expectedSchema.receive.opName, - service: expectedSchema.receive.serviceName, - meta: { - 'span.kind': 'consumer', - component: 'kafkajs' - }, - resource: testTopic, - error: 0, - type: 'worker' - }) - - await consumer.run({ - eachBatch: () => {} - }) - await sendMessages(kafka, testTopic, messages) - - return expectedSpanPromise - }) - - it('should run the consumer in the context of the consumer span', done => { - const firstSpan = tracer.scope().active() - let eachBatch = async ({ batch }) => { - const currentSpan = tracer.scope().active() - - try { - expect(currentSpan).to.not.equal(firstSpan) - expect(currentSpan.context()._name).to.equal(expectedSchema.receive.opName) - done() - } catch (e) { - done(e) - } finally { - eachBatch = () => {} // avoid being called for each message - } - } - - consumer.run({ eachBatch: (...args) => eachBatch(...args) }) - .then(() => sendMessages(kafka, testTopic, messages)) - .catch(done) - }) - - it('should not propagate context by default', async () => { - const expectedSpanPromise = agent.use(traces => { - const span = traces[0][0] - - expect(span).to.include({ - name: 'kafka.consume', - service: 'test-kafka', - resource: testTopic - }) - expect(span.metrics['kafka.batch_size']).to.be.at.least(1) - - expect(parseInt(span.parent_id.toString())).to.equal(0) - }) - - await consumer.run({ eachBatch: () => {} }) - await sendMessages(kafka, testTopic, messages) - await expectedSpanPromise - }) - - it('should be instrumented w/ error', async () => { - const fakeError = new Error('Oh No!') - const expectedSpanPromise = expectSpanWithDefaults({ - name: expectedSchema.receive.opName, - service: expectedSchema.receive.serviceName, - meta: { - [ERROR_TYPE]: fakeError.name, - [ERROR_MESSAGE]: fakeError.message, - [ERROR_STACK]: fakeError.stack, - component: 'kafkajs' - }, - resource: testTopic, - error: 1, - type: 'worker' - - }) - - await consumer.subscribe({ topic: testTopic, fromBeginning: true }) - await consumer.run({ - eachBatch: async ({ batch }) => { - throw fakeError - } - }) - await sendMessages(kafka, testTopic, messages) - - return expectedSpanPromise - }) - - withNamingSchema( - async () => { - await consumer.run({ eachBatch: () => {} }) - await sendMessages(kafka, testTopic, messages) - }, - rawExpectedSchema.receive - ) - }) - describe('data stream monitoring', () => { let consumer @@ -617,57 +506,6 @@ describe('Plugin', () => { }) }) }) - - describe('with configuration', () => { - const messages = [{ key: 'key1', value: 'test2' }] - - beforeEach(async () => { - tracer = require('../../dd-trace').init({ - kafkajs: { batchedParentPropagationEnabled: true } - }) - await agent.load('kafkajs', { batchedParentPropagationEnabled: true }) - const lib = require(`../../../versions/kafkajs@${version}`).get() - Kafka = lib.Kafka - kafka = new Kafka({ - clientId: `kafkajs-test-${version}`, - brokers: ['127.0.0.1:9092'], - logLevel: lib.logLevel.WARN - }) - }) - - describe('consumer (eachBatch)', () => { - let consumer - - beforeEach(async () => { - consumer = kafka.consumer({ groupId: 'test-group' }) - await consumer.subscribe({ topic: testTopic }) - await consumer.connect() - }) - - afterEach(async () => { - await consumer.disconnect() - }) - - it('should propagate context when configured', async () => { - const expectedSpanPromise = agent.use(traces => { - const span = traces[0][0] - - expect(span).to.include({ - name: 
'kafka.consume', - service: 'test-kafka', - resource: testTopic - }) - expect(span.metrics['kafka.batch_size']).to.be.at.least(1) - - expect(parseInt(span.parent_id.toString())).to.be.gt(0) - }) - - await consumer.run({ eachBatch: () => {} }) - await sendMessages(kafka, testTopic, messages) - await expectedSpanPromise - }) - }) - }) }) }) }) From 7e34f1f40129ebaeae25ebed5a262f7305136d1b Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 30 Aug 2024 15:13:19 -0400 Subject: [PATCH 7/8] delete utils --- packages/datadog-plugin-kafkajs/src/utils.js | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 packages/datadog-plugin-kafkajs/src/utils.js diff --git a/packages/datadog-plugin-kafkajs/src/utils.js b/packages/datadog-plugin-kafkajs/src/utils.js deleted file mode 100644 index 6a1eb1bf6ad..00000000000 --- a/packages/datadog-plugin-kafkajs/src/utils.js +++ /dev/null @@ -1,17 +0,0 @@ -function extract (tracer, bufferMap) { - if (!bufferMap) return null - - const textMap = {} - - for (const key of Object.keys(bufferMap)) { - if (bufferMap[key] === null || bufferMap[key] === undefined) continue - - textMap[key] = bufferMap[key].toString() - } - - return tracer.extract('text_map', textMap) -} - -module.exports = { - extract -} From 87c414bb8cb9436c103ed6fbbd8f792a95ad69e4 Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 3 Sep 2024 09:11:12 -0400 Subject: [PATCH 8/8] update according to reviewer comments --- .../datadog-instrumentations/src/kafkajs.js | 127 +++++++++--------- .../src/batch-consumer.js | 16 +-- 2 files changed, 70 insertions(+), 73 deletions(-) diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js index 1671f897126..395c69de057 100644 --- a/packages/datadog-instrumentations/src/kafkajs.js +++ b/packages/datadog-instrumentations/src/kafkajs.js @@ -100,6 +100,17 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf return createConsumer.apply(this, arguments) } + const eachMessageExtractor = (args) => { + const { topic, partition, message } = args[0] + return { topic, partition, message, groupId } + } + + const eachBatchExtractor = (args) => { + const { batch } = args[0] + const { topic, partition, messages } = batch + return { topic, partition, messages, groupId } + } + const consumer = createConsumer.apply(this, arguments) consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent) @@ -108,75 +119,63 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf const groupId = arguments[0].groupId consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) { - return run({ - eachMessage: typeof eachMessage === 'function' - ? 
function (...eachMessageArgs) { - const innerAsyncResource = new AsyncResource('bound-anonymous-fn') - return innerAsyncResource.runInAsyncScope(() => { - const { topic, partition, message } = eachMessageArgs[0] - consumerStartCh.publish({ topic, partition, message, groupId }) - try { - const result = eachMessage.apply(this, eachMessageArgs) - if (result && typeof result.then === 'function') { - result.then( - innerAsyncResource.bind(() => consumerFinishCh.publish(undefined)), - innerAsyncResource.bind(err => { - if (err) { - consumerErrorCh.publish(err) - } - consumerFinishCh.publish(undefined) - }) - ) - } else { - consumerFinishCh.publish(undefined) - } + eachMessage = wrapFunction( + eachMessage, + consumerStartCh, + consumerFinishCh, + consumerErrorCh, + eachMessageExtractor + ) + + eachBatch = wrapFunction( + eachBatch, + batchConsumerStartCh, + batchConsumerFinishCh, + batchConsumerErrorCh, + eachBatchExtractor + ) - return result - } catch (e) { - consumerErrorCh.publish(e) - consumerFinishCh.publish(undefined) - throw e - } - }) - } - : eachMessage, - eachBatch: - typeof eachBatch === 'function' - ? function (...eachBatchArgs) { - const innerAsyncResource = new AsyncResource('bound-anonymous-fn') - return innerAsyncResource.runInAsyncScope(() => { - const { batch } = eachBatchArgs[0] - const { topic, partition, messages } = batch - batchConsumerStartCh.publish({ topic, partition, messages, groupId }) - try { - const result = eachBatch.apply(this, eachBatchArgs) - if (result && typeof result.then === 'function') { - result.then( - innerAsyncResource.bind(() => batchConsumerFinishCh.publish()), - innerAsyncResource.bind((err) => { - if (err) { - batchConsumerErrorCh.publish(err) - } - batchConsumerFinishCh.publish() - }) - ) - } else { - batchConsumerFinishCh.publish() - } - - return result - } catch (error) { - batchConsumerErrorCh.publish(error) - batchConsumerFinishCh.publish() - throw error - } - }) - } - : eachBatch, + return run({ + eachMessage, + eachBatch, ...runArgs }) } + return consumer }) return Kafka }) + +const wrapFunction = (fn, startCh, finishCh, errorCh, extractArgs) => { + return typeof fn === 'function' + ? 
function (...args) { + const innerAsyncResource = new AsyncResource('bound-anonymous-fn') + return innerAsyncResource.runInAsyncScope(() => { + const extractedArgs = extractArgs(args) + startCh.publish(extractedArgs) + try { + const result = fn.apply(this, args) + if (result && typeof result.then === 'function') { + result.then( + innerAsyncResource.bind(() => finishCh.publish(undefined)), + innerAsyncResource.bind(err => { + if (err) { + errorCh.publish(err) + } + finishCh.publish(undefined) + }) + ) + } else { + finishCh.publish(undefined) + } + return result + } catch (e) { + errorCh.publish(e) + finishCh.publish(undefined) + throw e + } + }) + } + : fn +} diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index f9937aa1295..5a531267e9b 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -7,15 +7,13 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin { static get operation () { return 'consume-batch' } start ({ topic, partition, messages, groupId }) { - if (this.config.dsmEnabled) { - for (const message of messages) { - if (message?.headers && DsmPathwayCodec.contextExists(message.headers)) { - const payloadSize = getMessageSize(message) - this.tracer.decodeDataStreamsContext(message.headers) - this.tracer - .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize) - } - } + if (!this.config.dsmEnabled) return + for (const message of messages) { + if (!message || !message.headers || !DsmPathwayCodec.contextExists(message.headers)) continue + const payloadSize = getMessageSize(message) + this.tracer.decodeDataStreamsContext(message.headers) + this.tracer + .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize) } } }
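
Usage sketch (illustrative, not part of the series): a minimal application consumer that exercises the eachBatch path wrapped by PATCH 8/8. The broker address and the dsmEnabled plugin option mirror the test setup in index.spec.js; the client, topic, and group names are placeholders.

    'use strict'

    // Sketch only. dd-trace must be initialized before kafkajs is required so
    // the addHook in packages/datadog-instrumentations/src/kafkajs.js can wrap
    // createConsumer and consumer.run.
    const tracer = require('dd-trace').init()
    // dsmEnabled mirrors the option used by the DSM tests in index.spec.js.
    tracer.use('kafkajs', { dsmEnabled: true })

    const { Kafka } = require('kafkajs')

    // Broker address matches the tests in this series; names are placeholders.
    const kafka = new Kafka({
      clientId: 'example-app',
      brokers: ['127.0.0.1:9092']
    })

    async function main () {
      const consumer = kafka.consumer({ groupId: 'example-group' })
      await consumer.connect()
      await consumer.subscribe({ topic: 'example-topic', fromBeginning: true })

      await consumer.run({
        // wrapFunction (PATCH 8/8) publishes apm:kafkajs:consume-batch:start
        // with { topic, partition, messages, groupId } before this handler
        // runs; KafkajsBatchConsumerPlugin then sets one DSM checkpoint per
        // message whose headers carry pathway context.
        eachBatch: async ({ batch, heartbeat }) => {
          for (const message of batch.messages) {
            console.log(`${batch.topic}[${batch.partition}] offset=${message.offset}`)
          }
          await heartbeat()
        }
      })
    }

    main().catch(err => {
      console.error(err)
      process.exitCode = 1
    })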