From 8b857a667b2aff8f51311cd3a8249fa7e58955f6 Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 28 Aug 2024 10:20:16 -0400 Subject: [PATCH] add configuration --- .../src/batch-consumer.js | 30 ++++- .../datadog-plugin-kafkajs/test/index.spec.js | 112 +++++++++++++++++- 2 files changed, 136 insertions(+), 6 deletions(-) diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index 12c87f90e06..d439787ae8a 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -2,17 +2,26 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { extract } = require('./utils') const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') +const { isTrue } = require('../../dd-trace/src/util') +const coalesce = require('koalas') class KafkajsBatchConsumerPlugin extends ConsumerPlugin { static get id () { return 'kafkajs' } static get operation () { return 'consume-batch' } + configure (config) { + super.configure(coalesceConfiguration(config, this.serviceIdentifier)) + } + start ({ topic, partition, messages, groupId }) { let childOf - for (const message of messages) { - childOf = extract(this.tracer, message?.headers) - if (childOf._traceId !== null) { - break + if (this.config.batchedParentPropagationEnabled) { + for (const message of messages) { + // find the first valid context and use this as this span's parent + childOf = extract(this.tracer, message?.headers) + if (childOf._traceId !== null) { + break + } } } @@ -45,4 +54,17 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin { } } +function coalesceConfiguration (config) { + // check if batch propagation is enabled via env variable + config.batchedParentPropagationEnabled = isTrue( + coalesce( + process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED, + config.batchedParentPropagationEnabled, + false + ) + ) + + return config +} + module.exports = KafkajsBatchConsumerPlugin diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index cbe159ec770..88e4b7646e3 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -397,7 +397,7 @@ describe('Plugin', () => { .catch(done) }) - it('should propagate context', async () => { + it('should not propagate context by default', async () => { const expectedSpanPromise = agent.use(traces => { const span = traces[0][0] @@ -410,7 +410,7 @@ describe('Plugin', () => { 'kafka.batch_size': 1 }) - expect(parseInt(span.parent_id.toString())).to.be.gt(0) + expect(parseInt(span.parent_id.toString())).to.equal(0) }) await consumer.run({ eachBatch: () => {} }) @@ -619,6 +619,114 @@ describe('Plugin', () => { }) }) }) + + describe('with api configuration', () => { + const messages = [{ key: 'key1', value: 'test2' }] + + beforeEach(async () => { + tracer = require('../../dd-trace').init({ + kafkajs: { batchedParentPropagationEnabled: true } + }) + await agent.load('kafkajs', { batchedParentPropagationEnabled: true }) + const lib = require(`../../../versions/kafkajs@${version}`).get() + Kafka = lib.Kafka + kafka = new Kafka({ + clientId: `kafkajs-test-${version}`, + brokers: ['127.0.0.1:9092'], + logLevel: lib.logLevel.WARN + }) + }) + + describe('consumer (eachBatch)', () => { + let consumer + + beforeEach(async () => { + consumer = kafka.consumer({ groupId: 'test-group' }) + await consumer.subscribe({ topic: testTopic }) + await consumer.connect() + }) + + afterEach(async () => { + await consumer.disconnect() + }) + + it('should propagate context when configured', async () => { + const expectedSpanPromise = agent.use(traces => { + const span = traces[0][0] + + expect(span).to.include({ + name: 'kafka.consume', + service: 'test-kafka', + resource: testTopic + }) + expect(span.metrics).to.include({ + 'kafka.batch_size': 1 + }) + + expect(parseInt(span.parent_id.toString())).to.be.gt(0) + }) + + await consumer.run({ eachBatch: () => {} }) + await sendMessages(kafka, testTopic, messages) + await expectedSpanPromise + }) + }) + }) + describe('with env variable configuration', () => { + const messages = [{ key: 'key1', value: 'test2' }] + + beforeEach(async () => { + process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED = 'true' + tracer = require('../../dd-trace').init() + await agent.load('kafkajs') + const lib = require(`../../../versions/kafkajs@${version}`).get() + Kafka = lib.Kafka + kafka = new Kafka({ + clientId: `kafkajs-test-${version}`, + brokers: ['127.0.0.1:9092'], + logLevel: lib.logLevel.WARN + }) + }) + + afterEach(() => { + process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED = 'false' + }) + + describe('consumer (eachBatch)', () => { + let consumer + + beforeEach(async () => { + consumer = kafka.consumer({ groupId: 'test-group' }) + await consumer.subscribe({ topic: testTopic }) + await consumer.connect() + }) + + afterEach(async () => { + await consumer.disconnect() + }) + + it('should propagate context when configured', async () => { + const expectedSpanPromise = agent.use(traces => { + const span = traces[0][0] + + expect(span).to.include({ + name: 'kafka.consume', + service: 'test-kafka', + resource: testTopic + }) + expect(span.metrics).to.include({ + 'kafka.batch_size': 1 + }) + + expect(parseInt(span.parent_id.toString())).to.be.gt(0) + }) + + await consumer.run({ eachBatch: () => {} }) + await sendMessages(kafka, testTopic, messages) + await expectedSpanPromise + }) + }) + }) }) }) })