Skip to content

Commit

Permalink
add configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
wconti27 committed Aug 28, 2024
1 parent 36ae746 commit 8b857a6
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 6 deletions.
30 changes: 26 additions & 4 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { extract } = require('./utils')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { isTrue } = require('../../dd-trace/src/util')
const coalesce = require('koalas')

class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
static get id () { return 'kafkajs' }
static get operation () { return 'consume-batch' }

configure (config) {
super.configure(coalesceConfiguration(config, this.serviceIdentifier))
}

start ({ topic, partition, messages, groupId }) {
let childOf
for (const message of messages) {
childOf = extract(this.tracer, message?.headers)
if (childOf._traceId !== null) {
break
if (this.config.batchedParentPropagationEnabled) {
for (const message of messages) {
// find the first valid context and use this as this span's parent
childOf = extract(this.tracer, message?.headers)
if (childOf._traceId !== null) {
break
}
}
}

Expand Down Expand Up @@ -45,4 +54,17 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
}
}

function coalesceConfiguration (config) {
// check if batch propagation is enabled via env variable
config.batchedParentPropagationEnabled = isTrue(
coalesce(
process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED,
config.batchedParentPropagationEnabled,
false
)
)

return config
}

module.exports = KafkajsBatchConsumerPlugin
112 changes: 110 additions & 2 deletions packages/datadog-plugin-kafkajs/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ describe('Plugin', () => {
.catch(done)
})

it('should propagate context', async () => {
it('should not propagate context by default', async () => {
const expectedSpanPromise = agent.use(traces => {
const span = traces[0][0]

Expand All @@ -410,7 +410,7 @@ describe('Plugin', () => {
'kafka.batch_size': 1
})

expect(parseInt(span.parent_id.toString())).to.be.gt(0)
expect(parseInt(span.parent_id.toString())).to.equal(0)
})

await consumer.run({ eachBatch: () => {} })
Expand Down Expand Up @@ -619,6 +619,114 @@ describe('Plugin', () => {
})
})
})

describe('with api configuration', () => {
const messages = [{ key: 'key1', value: 'test2' }]

beforeEach(async () => {
tracer = require('../../dd-trace').init({
kafkajs: { batchedParentPropagationEnabled: true }
})
await agent.load('kafkajs', { batchedParentPropagationEnabled: true })
const lib = require(`../../../versions/kafkajs@${version}`).get()
Kafka = lib.Kafka
kafka = new Kafka({
clientId: `kafkajs-test-${version}`,
brokers: ['127.0.0.1:9092'],
logLevel: lib.logLevel.WARN
})
})

describe('consumer (eachBatch)', () => {
let consumer

beforeEach(async () => {
consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.subscribe({ topic: testTopic })
await consumer.connect()
})

afterEach(async () => {
await consumer.disconnect()
})

it('should propagate context when configured', async () => {
const expectedSpanPromise = agent.use(traces => {
const span = traces[0][0]

expect(span).to.include({
name: 'kafka.consume',
service: 'test-kafka',
resource: testTopic
})
expect(span.metrics).to.include({
'kafka.batch_size': 1
})

expect(parseInt(span.parent_id.toString())).to.be.gt(0)
})

await consumer.run({ eachBatch: () => {} })
await sendMessages(kafka, testTopic, messages)
await expectedSpanPromise
})
})
})
describe('with env variable configuration', () => {
const messages = [{ key: 'key1', value: 'test2' }]

beforeEach(async () => {
process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED = 'true'
tracer = require('../../dd-trace').init()
await agent.load('kafkajs')
const lib = require(`../../../versions/kafkajs@${version}`).get()
Kafka = lib.Kafka
kafka = new Kafka({
clientId: `kafkajs-test-${version}`,
brokers: ['127.0.0.1:9092'],
logLevel: lib.logLevel.WARN
})
})

afterEach(() => {
process.env.DD_TRACE_KAFKAJS_BATCHED_PARENT_PROPAGATION_ENABLED = 'false'
})

describe('consumer (eachBatch)', () => {
let consumer

beforeEach(async () => {
consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.subscribe({ topic: testTopic })
await consumer.connect()
})

afterEach(async () => {
await consumer.disconnect()
})

it('should propagate context when configured', async () => {
const expectedSpanPromise = agent.use(traces => {
const span = traces[0][0]

expect(span).to.include({
name: 'kafka.consume',
service: 'test-kafka',
resource: testTopic
})
expect(span.metrics).to.include({
'kafka.batch_size': 1
})

expect(parseInt(span.parent_id.toString())).to.be.gt(0)
})

await consumer.run({ eachBatch: () => {} })
await sendMessages(kafka, testTopic, messages)
await expectedSpanPromise
})
})
})
})
})
})
Expand Down

0 comments on commit 8b857a6

Please sign in to comment.